Skip to content

Commit 52dda1d

Browse files
tvoinarovskyi88manpreet
authored andcommitted
Added beginning_offsets and end_offsets API's and fixed @jeffwidman review issues
1 parent 98ac7e6 commit 52dda1d

File tree

4 files changed

+142
-17
lines changed

4 files changed

+142
-17
lines changed

kafka/conn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,7 @@ def _handle_api_version_response(self, response):
895895

896896
def _infer_broker_version_from_api_versions(self, api_versions):
897897
# The logic here is to check the list of supported request versions
898-
# in descending order. As soon as we find one that works, return it
898+
# in reverse order. As soon as we find one that works, return it
899899
test_cases = [
900900
# format (<broker verion>, <needed struct>)
901901
((0, 11, 0), MetadataRequest[4]),

kafka/consumer/fetcher.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,21 @@ def get_offsets_by_times(self, timestamps, timeout_ms):
193193
offsets[tp] = OffsetAndTimestamp(offset, timestamp)
194194
return offsets
195195

196+
def beginning_offsets(self, partitions, timeout_ms):
197+
return self.beginning_or_end_offset(
198+
partitions, OffsetResetStrategy.EARLIEST, timeout_ms)
199+
200+
def end_offsets(self, partitions, timeout_ms):
201+
return self.beginning_or_end_offset(
202+
partitions, OffsetResetStrategy.LATEST, timeout_ms)
203+
204+
def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
205+
timestamps = dict([(tp, timestamp) for tp in partitions])
206+
offsets = self._retrieve_offsets(timestamps, timeout_ms)
207+
for tp in timestamps:
208+
offsets[tp] = offsets[tp][0]
209+
return offsets
210+
196211
def _reset_offset(self, partition):
197212
"""Reset offsets for the given partition using the offset reset strategy.
198213
@@ -222,10 +237,10 @@ def _reset_offset(self, partition):
222237
self._subscriptions.seek(partition, offset)
223238

224239
def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
225-
""" Fetch offset for each partition passed in ``timestamps`` map.
240+
"""Fetch offset for each partition passed in ``timestamps`` map.
226241
227242
Blocks until offsets are obtained, a non-retriable exception is raised
228-
or ``timeout_ms`` passed (if it's not ``None``).
243+
or ``timeout_ms`` passed.
229244
230245
Arguments:
231246
timestamps: {TopicPartition: int} dict with timestamps to fetch
@@ -268,7 +283,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
268283
remaining_ms = timeout_ms - elapsed_ms
269284

270285
raise Errors.KafkaTimeoutError(
271-
"Failed to get offsets by times in %s ms" % timeout_ms)
286+
"Failed to get offsets by timestamps in %s ms" % timeout_ms)
272287

273288
def _raise_if_offset_out_of_range(self):
274289
"""Check FetchResponses for offset out of range.
@@ -610,7 +625,7 @@ def _deserialize(self, msg):
610625
return key, value
611626

612627
def _send_offset_requests(self, timestamps):
613-
""" Fetch offsets for each partition in timestamps dict. This may send
628+
"""Fetch offsets for each partition in timestamps dict. This may send
614629
request to multiple nodes, based on who is Leader for partition.
615630
616631
Arguments:

kafka/consumer/group.py

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -862,33 +862,37 @@ def metrics(self, raw=False):
862862
return metrics
863863

864864
def offsets_for_times(self, timestamps):
865-
"""
866-
Look up the offsets for the given partitions by timestamp. The returned
867-
offset for each partition is the earliest offset whose timestamp is
868-
greater than or equal to the given timestamp in the corresponding
869-
partition.
865+
"""Look up the offsets for the given partitions by timestamp. The
866+
returned offset for each partition is the earliest offset whose
867+
timestamp is greater than or equal to the given timestamp in the
868+
corresponding partition.
870869
871870
This is a blocking call. The consumer does not have to be assigned the
872871
partitions.
873872
874873
If the message format version in a partition is before 0.10.0, i.e.
875874
the messages do not have timestamps, ``None`` will be returned for that
876-
partition.
875+
partition. ``None`` will also be returned for the partition if there
876+
are no messages in it.
877877
878878
Note:
879-
Notice that this method may block indefinitely if the partition
880-
does not exist.
879+
This method may block indefinitely if the partition does not exist.
881880
882881
Arguments:
883882
timestamps (dict): ``{TopicPartition: int}`` mapping from partition
884883
to the timestamp to look up. Unit should be milliseconds since
885884
beginning of the epoch (midnight Jan 1, 1970 (UTC))
886885
886+
Returns:
887+
``{TopicPartition: OffsetAndTimestamp}``: mapping from partition
888+
to the timestamp and offset of the first message with timestamp
889+
greater than or equal to the target timestamp.
890+
887891
Raises:
888-
ValueError: if the target timestamp is negative
889-
UnsupportedVersionError: if the broker does not support looking
892+
ValueError: If the target timestamp is negative
893+
UnsupportedVersionError: If the broker does not support looking
890894
up the offsets by timestamp.
891-
KafkaTimeoutError: if fetch failed in request_timeout_ms
895+
KafkaTimeoutError: If fetch failed in request_timeout_ms
892896
"""
893897
if self.config['api_version'] <= (0, 10, 0):
894898
raise UnsupportedVersionError(
@@ -903,6 +907,67 @@ def offsets_for_times(self, timestamps):
903907
return self._fetcher.get_offsets_by_times(
904908
timestamps, self.config['request_timeout_ms'])
905909

910+
def beginning_offsets(self, partitions):
911+
"""Get the first offset for the given partitions.
912+
913+
This method does not change the current consumer position of the
914+
partitions.
915+
916+
Note:
917+
This method may block indefinitely if the partition does not exist.
918+
919+
Arguments:
920+
partitions (list): List of TopicPartition instances to fetch
921+
offsets for.
922+
923+
Returns:
924+
``{TopicPartition: int}``: The earliest available offsets for the
925+
given partitions.
926+
927+
Raises:
928+
UnsupportedVersionError: If the broker does not support looking
929+
up the offsets by timestamp.
930+
KafkaTimeoutError: If fetch failed in request_timeout_ms.
931+
"""
932+
if self.config['api_version'] <= (0, 10, 0):
933+
raise UnsupportedVersionError(
934+
"offsets_for_times API not supported for cluster version {}"
935+
.format(self.config['api_version']))
936+
offsets = self._fetcher.beginning_offsets(
937+
partitions, self.config['request_timeout_ms'])
938+
return offsets
939+
940+
def end_offsets(self, partitions):
941+
"""Get the last offset for the given partitions. The last offset of a
942+
partition is the offset of the upcoming message, i.e. the offset of the
943+
last available message + 1.
944+
945+
This method does not change the current consumer position of the
946+
partitions.
947+
948+
Note:
949+
This method may block indefinitely if the partition does not exist.
950+
951+
Arguments:
952+
partitions (list): List of TopicPartition instances to fetch
953+
offsets for.
954+
955+
Returns:
956+
``{TopicPartition: int}``: The end offsets for the given partitions.
957+
958+
Raises:
959+
UnsupportedVersionError: If the broker does not support looking
960+
up the offsets by timestamp.
961+
KafkaTimeoutError: If fetch failed in request_timeout_ms
962+
"""
963+
if self.config['api_version'] <= (0, 10, 0):
964+
raise UnsupportedVersionError(
965+
"offsets_for_times API not supported for cluster version {}"
966+
.format(self.config['api_version']))
967+
offsets = self._fetcher.end_offsets(
968+
partitions, self.config['request_timeout_ms'])
969+
return offsets
970+
906971
def _use_consumer_group(self):
907972
"""Return True iff this consumer can/should join a broker-coordinated group."""
908973
if self.config['api_version'] < (0, 9):

test/test_consumer_integration.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
)
1313
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
1414
from kafka.errors import (
15-
ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError, UnsupportedVersionError
15+
ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError, UnsupportedVersionError,
16+
KafkaTimeoutError
1617
)
1718
from kafka.structs import (
1819
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -813,6 +814,9 @@ def test_kafka_consumer_offsets_for_time(self):
813814
self.assertEqual(offsets[tp].offset, late_msg.offset)
814815
self.assertEqual(offsets[tp].timestamp, late_time)
815816

817+
offsets = consumer.offsets_for_times({})
818+
self.assertEqual(offsets, {})
819+
816820
# Out of bound timestamps check
817821

818822
offsets = consumer.offsets_for_times({tp: 0})
@@ -822,6 +826,17 @@ def test_kafka_consumer_offsets_for_time(self):
822826
offsets = consumer.offsets_for_times({tp: 9999999999999})
823827
self.assertEqual(offsets[tp], None)
824828

829+
# Beginning/End offsets
830+
831+
offsets = consumer.beginning_offsets([tp])
832+
self.assertEqual(offsets, {
833+
tp: early_msg.offset,
834+
})
835+
offsets = consumer.end_offsets([tp])
836+
self.assertEqual(offsets, {
837+
tp: late_msg.offset + 1
838+
})
839+
825840
@kafka_versions('>=0.10.1')
826841
def test_kafka_consumer_offsets_search_many_partitions(self):
827842
tp0 = TopicPartition(self.topic, 0)
@@ -847,10 +862,40 @@ def test_kafka_consumer_offsets_search_many_partitions(self):
847862
tp1: OffsetAndTimestamp(p1msg.offset, send_time)
848863
})
849864

865+
offsets = consumer.beginning_offsets([tp0, tp1])
866+
self.assertEqual(offsets, {
867+
tp0: p0msg.offset,
868+
tp1: p1msg.offset
869+
})
870+
871+
offsets = consumer.end_offsets([tp0, tp1])
872+
self.assertEqual(offsets, {
873+
tp0: p0msg.offset + 1,
874+
tp1: p1msg.offset + 1
875+
})
876+
850877
@kafka_versions('<0.10.1')
851878
def test_kafka_consumer_offsets_for_time_old(self):
852879
consumer = self.kafka_consumer()
853880
tp = TopicPartition(self.topic, 0)
854881

855882
with self.assertRaises(UnsupportedVersionError):
856883
consumer.offsets_for_times({tp: int(time.time())})
884+
885+
with self.assertRaises(UnsupportedVersionError):
886+
consumer.beginning_offsets([tp])
887+
888+
with self.assertRaises(UnsupportedVersionError):
889+
consumer.end_offsets([tp])
890+
891+
@kafka_versions('<0.10.1')
892+
def test_kafka_consumer_offsets_for_times_errors(self):
893+
consumer = self.kafka_consumer()
894+
tp = TopicPartition(self.topic, 0)
895+
bad_tp = TopicPartition(self.topic, 100)
896+
897+
with self.assertRaises(ValueError):
898+
consumer.offsets_for_times({tp: -1})
899+
900+
with self.assertRaises(KafkaTimeoutError):
901+
consumer.offsets_for_times({bad_tp: 0})

0 commit comments

Comments
 (0)