From b1b6c122fec6529fd23be111ab166f19eff4b99a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Feb 2025 15:53:51 -0800 Subject: [PATCH 1/4] Rename OffsetRequest/Response => ListOffsetsRequest/Response --- kafka/conn.py | 4 +-- kafka/consumer/fetcher.py | 16 +++++------ kafka/protocol/offset.py | 56 +++++++++++++++++++-------------------- test/test_fetcher.py | 10 +++---- 4 files changed, 43 insertions(+), 43 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 347e5000b..c92ea46f7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -26,7 +26,7 @@ from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest, DescribeClientQuotasRequest from kafka.protocol.commit import OffsetFetchRequest -from kafka.protocol.offset import OffsetRequest +from kafka.protocol.offset import ListOffsetsRequest from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.fetch import FetchRequest @@ -1202,7 +1202,7 @@ def _infer_broker_version_from_api_versions(self, api_versions): ((2, 5), DescribeAclsRequest[2]), ((2, 4), ProduceRequest[8]), ((2, 3), FetchRequest[11]), - ((2, 2), OffsetRequest[5]), + ((2, 2), ListOffsetsRequest[5]), ((2, 1), FetchRequest[10]), ((2, 0), FetchRequest[8]), ((1, 1), FetchRequest[7]), diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 512d56dc3..d1f756f2a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -14,7 +14,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest from kafka.protocol.offset import ( - OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET + ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET ) from kafka.record import MemoryRecords from kafka.serializer import Deserializer @@ -570,7 +570,7 @@ def on_fail(err): return list_offsets_future def _send_offset_request(self, node_id, timestamps): - version = self._client.api_version(OffsetRequest, max_version=1) + version = self._client.api_version(ListOffsetsRequest, max_version=1) by_topic = collections.defaultdict(list) for tp, timestamp in six.iteritems(timestamps): if version >= 1: @@ -579,7 +579,7 @@ def _send_offset_request(self, node_id, timestamps): data = (tp.partition, timestamp, 1) by_topic[tp.topic].append(data) - request = OffsetRequest[version](-1, list(six.iteritems(by_topic))) + request = ListOffsetsRequest[version](-1, list(six.iteritems(by_topic))) # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it @@ -596,7 +596,7 @@ def _handle_offset_response(self, future, response): Arguments: future (Future): the future to update based on response - response (OffsetResponse): response from the server + response (ListOffsetsResponse): response from the server Raises: AssertionError: if response does not match partition @@ -610,18 +610,18 @@ def _handle_offset_response(self, future, response): if error_type is Errors.NoError: if response.API_VERSION == 0: offsets = partition_info[2] - assert len(offsets) <= 1, 'Expected OffsetResponse with one offset' + assert len(offsets) <= 1, 'Expected ListOffsetsResponse with one offset' if not offsets: offset = UNKNOWN_OFFSET else: offset = offsets[0] - log.debug("Handling v0 ListOffsetResponse response for %s. " + log.debug("Handling v0 ListOffsetsResponse response for %s. " "Fetched offset %s", partition, offset) if offset != UNKNOWN_OFFSET: timestamp_offset_map[partition] = (offset, None) else: timestamp, offset = partition_info[2:] - log.debug("Handling ListOffsetResponse response for %s. " + log.debug("Handling ListOffsetsResponse response for %s. " "Fetched offset %s, timestamp %s", partition, offset, timestamp) if offset != UNKNOWN_OFFSET: @@ -638,7 +638,7 @@ def _handle_offset_response(self, future, response): future.failure(error_type(partition)) return elif error_type is Errors.UnknownTopicOrPartitionError: - log.warning("Received unknown topic or partition error in ListOffset " + log.warning("Received unknown topic or partition error in ListOffsets " "request for partition %s. The topic/partition " + "may not exist or the user may not have Describe access " "to it.", partition) diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 1ed382b0d..9c5ad5edf 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -12,7 +12,7 @@ class OffsetResetStrategy(object): NONE = 0 -class OffsetResponse_v0(Response): +class ListOffsetsResponse_v0(Response): API_KEY = 2 API_VERSION = 0 SCHEMA = Schema( @@ -24,7 +24,7 @@ class OffsetResponse_v0(Response): ('offsets', Array(Int64)))))) ) -class OffsetResponse_v1(Response): +class ListOffsetsResponse_v1(Response): API_KEY = 2 API_VERSION = 1 SCHEMA = Schema( @@ -38,7 +38,7 @@ class OffsetResponse_v1(Response): ) -class OffsetResponse_v2(Response): +class ListOffsetsResponse_v2(Response): API_KEY = 2 API_VERSION = 2 SCHEMA = Schema( @@ -53,16 +53,16 @@ class OffsetResponse_v2(Response): ) -class OffsetResponse_v3(Response): +class ListOffsetsResponse_v3(Response): """ on quota violation, brokers send out responses before throttling """ API_KEY = 2 API_VERSION = 3 - SCHEMA = OffsetResponse_v2.SCHEMA + SCHEMA = ListOffsetsResponse_v2.SCHEMA -class OffsetResponse_v4(Response): +class ListOffsetsResponse_v4(Response): """ Add leader_epoch to response """ @@ -81,19 +81,19 @@ class OffsetResponse_v4(Response): ) -class OffsetResponse_v5(Response): +class ListOffsetsResponse_v5(Response): """ adds a new error code, OFFSET_NOT_AVAILABLE """ API_KEY = 2 API_VERSION = 5 - SCHEMA = OffsetResponse_v4.SCHEMA + SCHEMA = ListOffsetsResponse_v4.SCHEMA -class OffsetRequest_v0(Request): +class ListOffsetsRequest_v0(Request): API_KEY = 2 API_VERSION = 0 - RESPONSE_TYPE = OffsetResponse_v0 + RESPONSE_TYPE = ListOffsetsResponse_v0 SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( @@ -107,10 +107,10 @@ class OffsetRequest_v0(Request): 'replica_id': -1 } -class OffsetRequest_v1(Request): +class ListOffsetsRequest_v1(Request): API_KEY = 2 API_VERSION = 1 - RESPONSE_TYPE = OffsetResponse_v1 + RESPONSE_TYPE = ListOffsetsResponse_v1 SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( @@ -124,10 +124,10 @@ class OffsetRequest_v1(Request): } -class OffsetRequest_v2(Request): +class ListOffsetsRequest_v2(Request): API_KEY = 2 API_VERSION = 2 - RESPONSE_TYPE = OffsetResponse_v2 + RESPONSE_TYPE = ListOffsetsResponse_v2 SCHEMA = Schema( ('replica_id', Int32), ('isolation_level', Int8), # <- added isolation_level @@ -142,23 +142,23 @@ class OffsetRequest_v2(Request): } -class OffsetRequest_v3(Request): +class ListOffsetsRequest_v3(Request): API_KEY = 2 API_VERSION = 3 - RESPONSE_TYPE = OffsetResponse_v3 - SCHEMA = OffsetRequest_v2.SCHEMA + RESPONSE_TYPE = ListOffsetsResponse_v3 + SCHEMA = ListOffsetsRequest_v2.SCHEMA DEFAULTS = { 'replica_id': -1 } -class OffsetRequest_v4(Request): +class ListOffsetsRequest_v4(Request): """ Add current_leader_epoch to request """ API_KEY = 2 API_VERSION = 4 - RESPONSE_TYPE = OffsetResponse_v4 + RESPONSE_TYPE = ListOffsetsResponse_v4 SCHEMA = Schema( ('replica_id', Int32), ('isolation_level', Int8), # <- added isolation_level @@ -174,21 +174,21 @@ class OffsetRequest_v4(Request): } -class OffsetRequest_v5(Request): +class ListOffsetsRequest_v5(Request): API_KEY = 2 API_VERSION = 5 - RESPONSE_TYPE = OffsetResponse_v5 - SCHEMA = OffsetRequest_v4.SCHEMA + RESPONSE_TYPE = ListOffsetsResponse_v5 + SCHEMA = ListOffsetsRequest_v4.SCHEMA DEFAULTS = { 'replica_id': -1 } -OffsetRequest = [ - OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2, - OffsetRequest_v3, OffsetRequest_v4, OffsetRequest_v5, +ListOffsetsRequest = [ + ListOffsetsRequest_v0, ListOffsetsRequest_v1, ListOffsetsRequest_v2, + ListOffsetsRequest_v3, ListOffsetsRequest_v4, ListOffsetsRequest_v5, ] -OffsetResponse = [ - OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2, - OffsetResponse_v3, OffsetResponse_v4, OffsetResponse_v5, +ListOffsetsResponse = [ + ListOffsetsResponse_v0, ListOffsetsResponse_v1, ListOffsetsResponse_v2, + ListOffsetsResponse_v3, ListOffsetsResponse_v4, ListOffsetsResponse_v5, ] diff --git a/test/test_fetcher.py b/test/test_fetcher.py index c9b424d54..69774ccd0 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -18,7 +18,7 @@ from kafka.metrics import Metrics from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.fetch import FetchRequest, FetchResponse -from kafka.protocol.offset import OffsetResponse +from kafka.protocol.offset import ListOffsetsResponse from kafka.errors import ( StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError @@ -264,7 +264,7 @@ def send_side_effect(node_id, timestamps): def test__handle_offset_response(fetcher, mocker): # Broker returns UnsupportedForMessageFormatError, will omit partition fut = Future() - res = OffsetResponse[1]([ + res = ListOffsetsResponse[1]([ ("topic", [(0, 43, -1, -1)]), ("topic", [(1, 0, 1000, 9999)]) ]) @@ -274,7 +274,7 @@ def test__handle_offset_response(fetcher, mocker): # Broker returns NotLeaderForPartitionError fut = Future() - res = OffsetResponse[1]([ + res = ListOffsetsResponse[1]([ ("topic", [(0, 6, -1, -1)]), ]) fetcher._handle_offset_response(fut, res) @@ -283,7 +283,7 @@ def test__handle_offset_response(fetcher, mocker): # Broker returns UnknownTopicOrPartitionError fut = Future() - res = OffsetResponse[1]([ + res = ListOffsetsResponse[1]([ ("topic", [(0, 3, -1, -1)]), ]) fetcher._handle_offset_response(fut, res) @@ -293,7 +293,7 @@ def test__handle_offset_response(fetcher, mocker): # Broker returns many errors and 1 result # Will fail on 1st error and return fut = Future() - res = OffsetResponse[1]([ + res = ListOffsetsResponse[1]([ ("topic", [(0, 43, -1, -1)]), ("topic", [(1, 6, -1, -1)]), ("topic", [(2, 3, -1, -1)]), From 1b1b3222b6ed209942b89a0c6091fec729f45e42 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Feb 2025 15:56:50 -0800 Subject: [PATCH 2/4] rename kafka.protocol.offset -> kafka.protocol.list_offsets --- kafka/conn.py | 2 +- kafka/consumer/fetcher.py | 2 +- kafka/consumer/group.py | 2 +- kafka/consumer/subscription_state.py | 2 +- kafka/protocol/{offset.py => list_offsets.py} | 0 test/test_fetcher.py | 2 +- 6 files changed, 5 insertions(+), 5 deletions(-) rename kafka/protocol/{offset.py => list_offsets.py} (100%) diff --git a/kafka/conn.py b/kafka/conn.py index c92ea46f7..1672e4396 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -26,7 +26,7 @@ from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest, DescribeClientQuotasRequest from kafka.protocol.commit import OffsetFetchRequest -from kafka.protocol.offset import ListOffsetsRequest +from kafka.protocol.list_offsets import ListOffsetsRequest from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.fetch import FetchRequest diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d1f756f2a..b32268bda 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -13,7 +13,7 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest -from kafka.protocol.offset import ( +from kafka.protocol.list_offsets import ( ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET ) from kafka.record import MemoryRecords diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 06e10b886..38d758578 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -16,7 +16,7 @@ from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.metrics import MetricConfig, Metrics -from kafka.protocol.offset import OffsetResetStrategy +from kafka.protocol.list_offsets import OffsetResetStrategy from kafka.structs import TopicPartition from kafka.version import __version__ diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 5ca7c7346..a329ad3e9 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -7,7 +7,7 @@ from kafka.vendor import six from kafka.errors import IllegalStateError -from kafka.protocol.offset import OffsetResetStrategy +from kafka.protocol.list_offsets import OffsetResetStrategy from kafka.structs import OffsetAndMetadata log = logging.getLogger(__name__) diff --git a/kafka/protocol/offset.py b/kafka/protocol/list_offsets.py similarity index 100% rename from kafka/protocol/offset.py rename to kafka/protocol/list_offsets.py diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 69774ccd0..23159d6aa 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -18,7 +18,7 @@ from kafka.metrics import Metrics from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.fetch import FetchRequest, FetchResponse -from kafka.protocol.offset import ListOffsetsResponse +from kafka.protocol.list_offsets import ListOffsetsResponse from kafka.errors import ( StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError From e1df1b2cfe16c68433fb2d2d8c83070199b3bfb2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Feb 2025 16:03:46 -0800 Subject: [PATCH 3/4] Support ListOffsets v3 in consumer --- kafka/consumer/fetcher.py | 15 +++++++++++++-- test/test_fetcher.py | 24 +++++++++++++++++++++++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index b32268bda..e3d875564 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -570,7 +570,7 @@ def on_fail(err): return list_offsets_future def _send_offset_request(self, node_id, timestamps): - version = self._client.api_version(ListOffsetsRequest, max_version=1) + version = self._client.api_version(ListOffsetsRequest, max_version=3) by_topic = collections.defaultdict(list) for tp, timestamp in six.iteritems(timestamps): if version >= 1: @@ -579,7 +579,16 @@ def _send_offset_request(self, node_id, timestamps): data = (tp.partition, timestamp, 1) by_topic[tp.topic].append(data) - request = ListOffsetsRequest[version](-1, list(six.iteritems(by_topic))) + if version <= 1: + request = ListOffsetsRequest[version]( + -1, + list(six.iteritems(by_topic))) + else: + request = ListOffsetsRequest[version]( + -1, + self._isolation_level, + list(six.iteritems(by_topic))) + # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it @@ -601,6 +610,8 @@ def _handle_offset_response(self, future, response): Raises: AssertionError: if response does not match partition """ + if response.API_VERSION >= 2 and response.throttle_time_ms > 0: + log.warning("ListOffsetsRequest throttled by broker (%d ms)", response.throttle_time_ms) timestamp_offset_map = {} for topic, part_data in response.topics: for partition_info in part_data: diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 23159d6aa..c2066e5f9 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -261,7 +261,7 @@ def send_side_effect(node_id, timestamps): assert isinstance(fut.exception, UnknownTopicOrPartitionError) -def test__handle_offset_response(fetcher, mocker): +def test__handle_offset_response_v1(fetcher, mocker): # Broker returns UnsupportedForMessageFormatError, will omit partition fut = Future() res = ListOffsetsResponse[1]([ @@ -304,6 +304,28 @@ def test__handle_offset_response(fetcher, mocker): assert isinstance(fut.exception, NotLeaderForPartitionError) +def test__handle_offset_response_v2_v3(fetcher, mocker): + # including a throttle_time shouldnt cause issues + fut = Future() + res = ListOffsetsResponse[2]( + 123, # throttle_time_ms + [("topic", [(0, 0, 1000, 9999)]) + ]) + fetcher._handle_offset_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 0): (9999, 1000)} + + # v3 response is the same format + fut = Future() + res = ListOffsetsResponse[3]( + 123, # throttle_time_ms + [("topic", [(0, 0, 1000, 9999)]) + ]) + fetcher._handle_offset_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 0): (9999, 1000)} + + def test_fetched_records(fetcher, topic, mocker): fetcher.config['check_crcs'] = False tp = TopicPartition(topic, 0) From d272455b0cf75797cea75161f18dadc9f46c701f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 27 Feb 2025 08:57:29 -0800 Subject: [PATCH 4/4] rename methods -> list_offsets --- kafka/consumer/fetcher.py | 14 ++++++------ test/test_fetcher.py | 48 +++++++++++++++++++-------------------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e3d875564..c6886c490 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -272,7 +272,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): if not timestamps: return {} - future = self._send_offset_requests(timestamps) + future = self._send_list_offsets_requests(timestamps) self._client.poll(future=future, timeout_ms=remaining_ms) if future.succeeded(): @@ -519,7 +519,7 @@ def _deserialize(self, f, topic, bytes_): return f.deserialize(topic, bytes_) return f(bytes_) - def _send_offset_requests(self, timestamps): + def _send_list_offsets_requests(self, timestamps): """Fetch offsets for each partition in timestamps dict. This may send request to multiple nodes, based on who is Leader for partition. @@ -564,12 +564,12 @@ def on_fail(err): list_offsets_future.failure(err) for node_id, timestamps in six.iteritems(timestamps_by_node): - _f = self._send_offset_request(node_id, timestamps) + _f = self._send_list_offsets_request(node_id, timestamps) _f.add_callback(on_success) _f.add_errback(on_fail) return list_offsets_future - def _send_offset_request(self, node_id, timestamps): + def _send_list_offsets_request(self, node_id, timestamps): version = self._client.api_version(ListOffsetsRequest, max_version=3) by_topic = collections.defaultdict(list) for tp, timestamp in six.iteritems(timestamps): @@ -596,12 +596,12 @@ def _send_offset_request(self, node_id, timestamps): future = Future() _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_response, future) + _f.add_callback(self._handle_list_offsets_response, future) _f.add_errback(lambda e: future.failure(e)) return future - def _handle_offset_response(self, future, response): - """Callback for the response of the list offset call above. + def _handle_list_offsets_response(self, future, response): + """Callback for the response of the ListOffsets api call Arguments: future (Future): the future to update based on response diff --git a/test/test_fetcher.py b/test/test_fetcher.py index c2066e5f9..e74369289 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -149,9 +149,9 @@ def test__reset_offset(fetcher, mocker): assert fetcher._subscriptions.assignment[tp].position == 1001 -def test__send_offset_requests(fetcher, mocker): - tp = TopicPartition("topic_send_offset", 1) - mocked_send = mocker.patch.object(fetcher, "_send_offset_request") +def test__send_list_offsets_requests(fetcher, mocker): + tp = TopicPartition("topic_send_list_offsets", 1) + mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request") send_futures = [] def send_side_effect(*args, **kw): @@ -168,19 +168,19 @@ def send_side_effect(*args, **kw): [None, -1], itertools.cycle([0])) # Leader == None - fut = fetcher._send_offset_requests({tp: 0}) + fut = fetcher._send_list_offsets_requests({tp: 0}) assert fut.failed() assert isinstance(fut.exception, StaleMetadata) assert not mocked_send.called # Leader == -1 - fut = fetcher._send_offset_requests({tp: 0}) + fut = fetcher._send_list_offsets_requests({tp: 0}) assert fut.failed() assert isinstance(fut.exception, LeaderNotAvailableError) assert not mocked_send.called # Leader == 0, send failed - fut = fetcher._send_offset_requests({tp: 0}) + fut = fetcher._send_list_offsets_requests({tp: 0}) assert not fut.is_done assert mocked_send.called # Check that we bound the futures correctly to chain failure @@ -189,7 +189,7 @@ def send_side_effect(*args, **kw): assert isinstance(fut.exception, NotLeaderForPartitionError) # Leader == 0, send success - fut = fetcher._send_offset_requests({tp: 0}) + fut = fetcher._send_list_offsets_requests({tp: 0}) assert not fut.is_done assert mocked_send.called # Check that we bound the futures correctly to chain success @@ -198,12 +198,12 @@ def send_side_effect(*args, **kw): assert fut.value == {tp: (10, 10000)} -def test__send_offset_requests_multiple_nodes(fetcher, mocker): - tp1 = TopicPartition("topic_send_offset", 1) - tp2 = TopicPartition("topic_send_offset", 2) - tp3 = TopicPartition("topic_send_offset", 3) - tp4 = TopicPartition("topic_send_offset", 4) - mocked_send = mocker.patch.object(fetcher, "_send_offset_request") +def test__send_list_offsets_requests_multiple_nodes(fetcher, mocker): + tp1 = TopicPartition("topic_send_list_offsets", 1) + tp2 = TopicPartition("topic_send_list_offsets", 2) + tp3 = TopicPartition("topic_send_list_offsets", 3) + tp4 = TopicPartition("topic_send_list_offsets", 4) + mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request") send_futures = [] def send_side_effect(node_id, timestamps): @@ -218,7 +218,7 @@ def send_side_effect(node_id, timestamps): # -- All node succeeded case tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)]) - fut = fetcher._send_offset_requests(tss) + fut = fetcher._send_list_offsets_requests(tss) assert not fut.is_done assert mocked_send.call_count == 2 @@ -244,7 +244,7 @@ def send_side_effect(node_id, timestamps): # -- First succeeded second not del send_futures[:] - fut = fetcher._send_offset_requests(tss) + fut = fetcher._send_list_offsets_requests(tss) assert len(send_futures) == 2 send_futures[0][2].success({tp1: (11, 1001)}) send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1)) @@ -253,7 +253,7 @@ def send_side_effect(node_id, timestamps): # -- First fails second succeeded del send_futures[:] - fut = fetcher._send_offset_requests(tss) + fut = fetcher._send_list_offsets_requests(tss) assert len(send_futures) == 2 send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1)) send_futures[1][2].success({tp1: (11, 1001)}) @@ -261,14 +261,14 @@ def send_side_effect(node_id, timestamps): assert isinstance(fut.exception, UnknownTopicOrPartitionError) -def test__handle_offset_response_v1(fetcher, mocker): +def test__handle_list_offsets_response_v1(fetcher, mocker): # Broker returns UnsupportedForMessageFormatError, will omit partition fut = Future() res = ListOffsetsResponse[1]([ ("topic", [(0, 43, -1, -1)]), ("topic", [(1, 0, 1000, 9999)]) ]) - fetcher._handle_offset_response(fut, res) + fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() assert fut.value == {TopicPartition("topic", 1): (9999, 1000)} @@ -277,7 +277,7 @@ def test__handle_offset_response_v1(fetcher, mocker): res = ListOffsetsResponse[1]([ ("topic", [(0, 6, -1, -1)]), ]) - fetcher._handle_offset_response(fut, res) + fetcher._handle_list_offsets_response(fut, res) assert fut.failed() assert isinstance(fut.exception, NotLeaderForPartitionError) @@ -286,7 +286,7 @@ def test__handle_offset_response_v1(fetcher, mocker): res = ListOffsetsResponse[1]([ ("topic", [(0, 3, -1, -1)]), ]) - fetcher._handle_offset_response(fut, res) + fetcher._handle_list_offsets_response(fut, res) assert fut.failed() assert isinstance(fut.exception, UnknownTopicOrPartitionError) @@ -299,19 +299,19 @@ def test__handle_offset_response_v1(fetcher, mocker): ("topic", [(2, 3, -1, -1)]), ("topic", [(3, 0, 1000, 9999)]) ]) - fetcher._handle_offset_response(fut, res) + fetcher._handle_list_offsets_response(fut, res) assert fut.failed() assert isinstance(fut.exception, NotLeaderForPartitionError) -def test__handle_offset_response_v2_v3(fetcher, mocker): +def test__handle_list_offsets_response_v2_v3(fetcher, mocker): # including a throttle_time shouldnt cause issues fut = Future() res = ListOffsetsResponse[2]( 123, # throttle_time_ms [("topic", [(0, 0, 1000, 9999)]) ]) - fetcher._handle_offset_response(fut, res) + fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() assert fut.value == {TopicPartition("topic", 0): (9999, 1000)} @@ -321,7 +321,7 @@ def test__handle_offset_response_v2_v3(fetcher, mocker): 123, # throttle_time_ms [("topic", [(0, 0, 1000, 9999)]) ]) - fetcher._handle_offset_response(fut, res) + fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() assert fut.value == {TopicPartition("topic", 0): (9999, 1000)}