From c7db084887ab2886af47bcace815f1a2eea068bf Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 25 Feb 2025 12:33:21 -0800 Subject: [PATCH 1/9] Add KafkaClient.api_version(operation) for best available from api_versions --- kafka/admin/client.py | 68 +++++++++++-------------------------------- kafka/client_async.py | 35 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 51 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index c9e51e5c9..310227855 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -215,11 +215,7 @@ def __init__(self, **configs): ) # Get auto-discovered version from client if necessary - if self.config['api_version'] is None: - self.config['api_version'] = self._client.config['api_version'] - else: - # need to run check_version for get_api_versions() - self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) + self.config['api_version'] = self._client.config['api_version'] self._closed = False self._refresh_controller_id() @@ -236,35 +232,6 @@ def close(self): self._closed = True log.debug("KafkaAdminClient is now closed.") - def _matching_api_version(self, operation): - """Find the latest version of the protocol operation supported by both - this library and the broker. - - This resolves to the lesser of either the latest api version this - library supports, or the max version supported by the broker. - - Arguments: - operation: A list of protocol operation versions from kafka.protocol. - - Returns: - int: The max matching version number between client and broker. - """ - broker_api_versions = self._client.get_api_versions() - api_key = operation[0].API_KEY - if broker_api_versions is None or api_key not in broker_api_versions: - raise IncompatibleBrokerVersion( - "Kafka broker does not support the '{}' Kafka protocol." - .format(operation[0].__name__)) - min_version, max_version = broker_api_versions[api_key] - version = min(len(operation) - 1, max_version) - if version < min_version: - # max library version is less than min broker version. Currently, - # no Kafka versions specify a min msg version. Maybe in the future? - raise IncompatibleBrokerVersion( - "No version of the '{}' Kafka protocol is supported by both the client and broker." - .format(operation[0].__name__)) - return version - def _validate_timeout(self, timeout_ms): """Validate the timeout is set or use the configuration default. @@ -278,7 +245,7 @@ def _validate_timeout(self, timeout_ms): def _refresh_controller_id(self, timeout_ms=30000): """Determine the Kafka cluster controller.""" - version = self._matching_api_version(MetadataRequest) + version = self._client.api_version(MetadataRequest, max_version=6) if 1 <= version <= 6: timeout_at = time.time() + timeout_ms / 1000 while time.time() < timeout_at: @@ -323,8 +290,7 @@ def _find_coordinator_id_send_request(self, group_id): # When I experimented with this, the coordinator value returned in # GroupCoordinatorResponse_v1 didn't match the value returned by # GroupCoordinatorResponse_v0 and I couldn't figure out why. - version = 0 - # version = self._matching_api_version(GroupCoordinatorRequest) + version = self._client.api_version(GroupCoordinatorRequest, max_version=0) if version <= 0: request = GroupCoordinatorRequest[version](group_id) else: @@ -493,7 +459,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False): Returns: Appropriate version of CreateTopicResponse class. 
""" - version = self._matching_api_version(CreateTopicsRequest) + version = self._client.api_version(CreateTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) if version == 0: if validate_only: @@ -531,7 +497,7 @@ def delete_topics(self, topics, timeout_ms=None): Returns: Appropriate version of DeleteTopicsResponse class. """ - version = self._matching_api_version(DeleteTopicsRequest) + version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) if version <= 3: request = DeleteTopicsRequest[version]( @@ -550,7 +516,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): """ topics == None means "get all topics" """ - version = self._matching_api_version(MetadataRequest) + version = self._client.api_version(MetadataRequest, max_version=5) if version <= 3: if auto_topic_creation: raise IncompatibleBrokerVersion( @@ -667,7 +633,7 @@ def describe_acls(self, acl_filter): tuple of a list of matching ACL objects and a KafkaError (NoError if successful) """ - version = self._matching_api_version(DescribeAclsRequest) + version = self._client.api_version(DescribeAclsRequest, max_version=1) if version == 0: request = DescribeAclsRequest[version]( resource_type=acl_filter.resource_pattern.resource_type, @@ -801,7 +767,7 @@ def create_acls(self, acls): if not isinstance(acl, ACL): raise IllegalArgumentError("acls must contain ACL objects") - version = self._matching_api_version(CreateAclsRequest) + version = self._client.api_version(CreateAclsRequest, max_version=1) if version == 0: request = CreateAclsRequest[version]( creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls] @@ -923,7 +889,7 @@ def delete_acls(self, acl_filters): if not isinstance(acl, ACLFilter): raise IllegalArgumentError("acl_filters must contain ACLFilter type objects") - version = self._matching_api_version(DeleteAclsRequest) + version = self._client.api_version(DeleteAclsRequest, max_version=1) if version == 0: request = DeleteAclsRequest[version]( @@ -992,7 +958,7 @@ def describe_configs(self, config_resources, include_synonyms=False): topic_resources.append(self._convert_describe_config_resource_request(config_resource)) futures = [] - version = self._matching_api_version(DescribeConfigsRequest) + version = self._client.api_version(DescribeConfigsRequest, max_version=2) if version == 0: if include_synonyms: raise IncompatibleBrokerVersion( @@ -1077,7 +1043,7 @@ def alter_configs(self, config_resources): Returns: Appropriate version of AlterConfigsResponse class. """ - version = self._matching_api_version(AlterConfigsRequest) + version = self._client.api_version(AlterConfigsRequest, max_version=1) if version <= 1: request = AlterConfigsRequest[version]( resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] @@ -1138,7 +1104,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal Returns: Appropriate version of CreatePartitionsResponse class. """ - version = self._matching_api_version(CreatePartitionsRequest) + version = self._client.api_version(CreatePartitionsRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) if version <= 1: request = CreatePartitionsRequest[version]( @@ -1177,7 +1143,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, Returns: A message future. 
""" - version = self._matching_api_version(DescribeGroupsRequest) + version = self._client.api_version(DescribeGroupsRequest, max_version=3) if version <= 2: if include_authorized_operations: raise IncompatibleBrokerVersion( @@ -1311,7 +1277,7 @@ def _list_consumer_groups_send_request(self, broker_id): Returns: A message future """ - version = self._matching_api_version(ListGroupsRequest) + version = self._client.api_version(ListGroupsRequest, max_version=2) if version <= 2: request = ListGroupsRequest[version]() else: @@ -1394,7 +1360,7 @@ def _list_consumer_group_offsets_send_request(self, group_id, Returns: A message future """ - version = self._matching_api_version(OffsetFetchRequest) + version = self._client.api_version(OffsetFetchRequest, max_version=3) if version <= 3: if partitions is None: if version <= 1: @@ -1564,7 +1530,7 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): Returns: A future representing the in-flight DeleteGroupsRequest. """ - version = self._matching_api_version(DeleteGroupsRequest) + version = self._client.api_version(DeleteGroupsRequest, max_version=1) if version <= 1: request = DeleteGroupsRequest[version](group_ids) else: @@ -1595,7 +1561,7 @@ def describe_log_dirs(self): Returns: A message future """ - version = self._matching_api_version(DescribeLogDirsRequest) + version = self._client.api_version(DescribeLogDirsRequest, max_version=0) if version <= 0: request = DescribeLogDirsRequest[version]() future = self._send_request_to_node(self._client.least_loaded_node(), request) diff --git a/kafka/client_async.py b/kafka/client_async.py index be19cf80b..4a9a9d060 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -962,6 +962,41 @@ def check_version(self, node_id=None, timeout=2, strict=False): self._lock.release() raise Errors.NoBrokersAvailable() + def api_version(self, operation, max_version=None): + """Find the latest version of the protocol operation supported by both + this library and the broker. + + This resolves to the lesser of either the latest api version this + library supports, or the max version supported by the broker. + + Arguments: + operation: A list of protocol operation versions from kafka.protocol. + + Keyword Arguments: + max_version (int, optional): Provide an alternate maximum api version + to reflect limitations in user code. + + Returns: + int: The highest api version number compatible between client and broker. + """ + # Cap max_version at the largest available version in operation list + max_version = min(len(operation) - 1, max_version if max_version is not None else float('inf')) + broker_api_versions = self._api_versions + api_key = operation[0].API_KEY + if broker_api_versions is None or api_key not in broker_api_versions: + raise IncompatibleBrokerVersion( + "Kafka broker does not support the '{}' Kafka protocol." + .format(operation[0].__name__)) + broker_min_version, broker_max_version = broker_api_versions[api_key] + version = min(max_version, broker_max_version) + if version < broker_min_version: + # max library version is less than min broker version. Currently, + # no Kafka versions specify a min msg version. Maybe in the future? + raise IncompatibleBrokerVersion( + "No version of the '{}' Kafka protocol is supported by both the client and broker." 
+ .format(operation[0].__name__)) + return version + def wakeup(self): if self._waking or self._wake_w is None: return From 1487df2728260261a292a28bb17234718378e5c4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 25 Feb 2025 12:34:22 -0800 Subject: [PATCH 2/9] Fixup DescribeAclsRequest array --- kafka/conn.py | 4 ++-- kafka/protocol/admin.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 4d1c36b95..2a4f1df17 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -24,7 +24,7 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2, DescribeClientQuotasRequest +from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest, DescribeClientQuotasRequest from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.offset import OffsetRequest from kafka.protocol.produce import ProduceRequest @@ -1179,7 +1179,7 @@ def _infer_broker_version_from_api_versions(self, api_versions): # format (, ) # Make sure to update consumer_integration test check when adding newer versions. ((2, 6), DescribeClientQuotasRequest[0]), - ((2, 5), DescribeAclsRequest_v2), + ((2, 5), DescribeAclsRequest[2]), ((2, 4), ProduceRequest[8]), ((2, 3), FetchRequest[11]), ((2, 2), OffsetRequest[5]), diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 3da5c5419..c237ef7e0 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -463,8 +463,8 @@ class DescribeAclsRequest_v2(Request): SCHEMA = DescribeAclsRequest_v1.SCHEMA -DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1] -DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1] +DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1, DescribeAclsRequest_v2] +DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1, DescribeAclsResponse_v2] class CreateAclsResponse_v0(Response): API_KEY = 30 From 8a2c91e3f19a33c8f8b070185a9f44b654b2716a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 25 Feb 2025 12:52:23 -0800 Subject: [PATCH 3/9] Use api_version() to select MetadataRequest from client --- kafka/client_async.py | 4 ++-- kafka/protocol/metadata.py | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 4a9a9d060..48923cca3 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -868,9 +868,9 @@ def _maybe_refresh_metadata(self, wakeup=False): if not topics and self.cluster.is_bootstrap(node_id): topics = list(self.config['bootstrap_topics_filter']) + api_version = self.api_version(MetadataRequest, max_version=1) if self.cluster.need_all_topic_metadata or not topics: - topics = [] if self.config['api_version'] < (0, 10, 0) else None - api_version = 0 if self.config['api_version'] < (0, 10, 0) else 1 + topics = MetadataRequest[api_version].ALL_TOPICS request = MetadataRequest[api_version](topics) log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request, wakeup=wakeup) diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 414e5b84a..bb4305001 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -135,7 +135,7 @@ class MetadataRequest_v0(Request): SCHEMA = Schema( ('topics', Array(String('utf-8'))) ) - ALL_TOPICS = None # Empty Array (len 0) for 
topics returns all topics + ALL_TOPICS = [] # Empty Array (len 0) for topics returns all topics class MetadataRequest_v1(Request): @@ -143,8 +143,8 @@ class MetadataRequest_v1(Request): API_VERSION = 1 RESPONSE_TYPE = MetadataResponse_v1 SCHEMA = MetadataRequest_v0.SCHEMA - ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics - NO_TOPICS = None # Empty array (len 0) for topics returns no topics + ALL_TOPICS = None # Null Array (len -1) for topics returns all topics + NO_TOPICS = [] # Empty array (len 0) for topics returns no topics class MetadataRequest_v2(Request): @@ -152,8 +152,8 @@ class MetadataRequest_v2(Request): API_VERSION = 2 RESPONSE_TYPE = MetadataResponse_v2 SCHEMA = MetadataRequest_v1.SCHEMA - ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics - NO_TOPICS = None # Empty array (len 0) for topics returns no topics + ALL_TOPICS = None + NO_TOPICS = [] class MetadataRequest_v3(Request): @@ -161,8 +161,8 @@ class MetadataRequest_v3(Request): API_VERSION = 3 RESPONSE_TYPE = MetadataResponse_v3 SCHEMA = MetadataRequest_v1.SCHEMA - ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics - NO_TOPICS = None # Empty array (len 0) for topics returns no topics + ALL_TOPICS = None + NO_TOPICS = [] class MetadataRequest_v4(Request): @@ -173,8 +173,8 @@ class MetadataRequest_v4(Request): ('topics', Array(String('utf-8'))), ('allow_auto_topic_creation', Boolean) ) - ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics - NO_TOPICS = None # Empty array (len 0) for topics returns no topics + ALL_TOPICS = None + NO_TOPICS = [] class MetadataRequest_v5(Request): @@ -186,8 +186,8 @@ class MetadataRequest_v5(Request): API_VERSION = 5 RESPONSE_TYPE = MetadataResponse_v5 SCHEMA = MetadataRequest_v4.SCHEMA - ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics - NO_TOPICS = None # Empty array (len 0) for topics returns no topics + ALL_TOPICS = None + NO_TOPICS = [] MetadataRequest = [ From 30f767c1f7863390e65aa57b86c1e57fbdc9ee0a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 25 Feb 2025 15:41:13 -0800 Subject: [PATCH 4/9] use api_version() to select all api versions in consumer/coordinator/producer --- kafka/consumer/fetcher.py | 17 +++-------------- kafka/coordinator/base.py | 27 ++++++++++----------------- kafka/coordinator/consumer.py | 30 +++++++++++++----------------- kafka/producer/sender.py | 21 ++------------------- test/test_coordinator.py | 19 +++++++++++++------ test/test_fetcher.py | 17 +++++++++++------ test/test_sender.py | 11 +++++------ 7 files changed, 57 insertions(+), 85 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 333c97758..442a6c3a1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -569,10 +569,8 @@ def _send_offset_request(self, node_id, timestamps): data = (tp.partition, timestamp, 1) by_topic[tp.topic].append(data) - if self.config['api_version'] >= (0, 10, 1): - request = OffsetRequest[1](-1, list(six.iteritems(by_topic))) - else: - request = OffsetRequest[0](-1, list(six.iteritems(by_topic))) + version = self._client.api_version(OffsetRequest, max_version=1) + request = OffsetRequest[version](-1, list(six.iteritems(by_topic))) # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it @@ -702,16 +700,7 @@ def _create_fetch_requests(self): log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s", partition, node_id) - if 
self.config['api_version'] >= (0, 11): - version = 4 - elif self.config['api_version'] >= (0, 10, 1): - version = 3 - elif self.config['api_version'] >= (0, 10, 0): - version = 2 - elif self.config['api_version'] == (0, 9): - version = 1 - else: - version = 0 + version = self._client.api_version(FetchRequest, max_version=4) requests = {} for node_id, partition_data in six.iteritems(fetchable): if version < 3: diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 75d9c903d..c52548512 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -452,25 +452,18 @@ def _send_join_group_request(self): (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) for protocol, metadata in self.group_protocols() ] - if self.config['api_version'] < (0, 9): + version = self._client.api_version(JoinGroupRequest, max_version=2) + if not version: raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers') - elif (0, 9) <= self.config['api_version'] < (0, 10, 1): - request = JoinGroupRequest[0]( + elif version == 0: + request = JoinGroupRequest[version]( self.group_id, self.config['session_timeout_ms'], self._generation.member_id, self.protocol_type(), member_metadata) - elif (0, 10, 1) <= self.config['api_version'] < (0, 11): - request = JoinGroupRequest[1]( - self.group_id, - self.config['session_timeout_ms'], - self.config['max_poll_interval_ms'], - self._generation.member_id, - self.protocol_type(), - member_metadata) else: - request = JoinGroupRequest[2]( + request = JoinGroupRequest[version]( self.group_id, self.config['session_timeout_ms'], self.config['max_poll_interval_ms'], @@ -562,7 +555,7 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = 0 if self.config['api_version'] < (0, 11) else 1 + version = self._client.api_version(SyncGroupRequest, max_version=1) request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, @@ -590,7 +583,7 @@ def _on_join_leader(self, response): except Exception as e: return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11) else 1 + version = self._client.api_version(SyncGroupRequest, max_version=1) request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, @@ -744,7 +737,7 @@ def _start_heartbeat_thread(self): self._heartbeat_thread.start() def _close_heartbeat_thread(self): - if self._heartbeat_thread is not None: + if hasattr(self, '_heartbeat_thread') and self._heartbeat_thread is not None: log.info('Stopping heartbeat thread') try: self._heartbeat_thread.close() @@ -771,7 +764,7 @@ def maybe_leave_group(self): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. 
log.info('Leaving consumer group (%s).', self.group_id) - version = 0 if self.config['api_version'] < (0, 11) else 1 + version = self._client.api_version(LeaveGroupRequest, max_version=1) request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) @@ -799,7 +792,7 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11) else 1 + version = self._client.api_version(HeartbeatRequest, max_version=1) request = HeartbeatRequest[version](self.group_id, self._generation.generation_id, self._generation.member_id) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 971f5e802..026fac833 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -582,12 +582,13 @@ def _send_offset_commit_request(self, offsets): if self.config['api_version'] >= (0, 9) and generation is None: return Future().failure(Errors.CommitFailedError()) - if self.config['api_version'] >= (0, 9): - request = OffsetCommitRequest[2]( + version = self._client.api_version(OffsetCommitRequest, max_version=2) + if version == 2: + request = OffsetCommitRequest[version]( self.group_id, generation.generation_id, generation.member_id, - OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, + OffsetCommitRequest[version].DEFAULT_RETENTION_TIME, [( topic, [( partition, @@ -596,8 +597,8 @@ def _send_offset_commit_request(self, offsets): ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] ) - elif self.config['api_version'] >= (0, 8, 2): - request = OffsetCommitRequest[1]( + elif version == 1: + request = OffsetCommitRequest[version]( self.group_id, -1, '', [( topic, [( @@ -608,8 +609,8 @@ def _send_offset_commit_request(self, offsets): ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] ) - elif self.config['api_version'] >= (0, 8, 1): - request = OffsetCommitRequest[0]( + elif version == 0: + request = OffsetCommitRequest[version]( self.group_id, [( topic, [( @@ -731,16 +732,11 @@ def _send_offset_fetch_request(self, partitions): for tp in partitions: topic_partitions[tp.topic].add(tp.partition) - if self.config['api_version'] >= (0, 8, 2): - request = OffsetFetchRequest[1]( - self.group_id, - list(topic_partitions.items()) - ) - else: - request = OffsetFetchRequest[0]( - self.group_id, - list(topic_partitions.items()) - ) + version = self._client.api_version(OffsetFetchRequest, max_version=1) + request = OffsetFetchRequest[version]( + self.group_id, + list(topic_partitions.items()) + ) # send the request with a callback future = Future() diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index ac9c5a96f..56805958e 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -301,31 +301,14 @@ def _produce_request(self, node_id, acks, timeout, batches): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - kwargs = {} - if self.config['api_version'] >= (2, 1): - version = 7 - elif self.config['api_version'] >= (2, 0): - version = 6 - elif self.config['api_version'] >= (1, 1): - version = 5 - elif self.config['api_version'] >= (1, 0): - version = 4 - elif self.config['api_version'] >= (0, 11): - version = 3 - kwargs = dict(transactional_id=None) - elif self.config['api_version'] >= (0, 10, 0): 
- version = 2 - elif self.config['api_version'] == (0, 9): - version = 1 - else: - version = 0 + version = self._client.api_version(ProduceRequest, max_version=7) + # TODO: support transactional_id return ProduceRequest[version]( required_acks=acks, timeout=timeout, topics=[(topic, list(partition_info.items())) for topic, partition_info in six.iteritems(produce_records_by_partition)], - **kwargs ) def wakeup(self): diff --git a/test/test_coordinator.py b/test/test_coordinator.py index a35cdd1a0..0c4ee6d33 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -17,6 +17,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics import Metrics +from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.commit import ( OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse) @@ -41,8 +42,9 @@ def test_init(client, coordinator): @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_autocommit_enable_api_version(client, api_version): - coordinator = ConsumerCoordinator(client, SubscriptionState(), +def test_autocommit_enable_api_version(conn, api_version): + coordinator = ConsumerCoordinator(KafkaClient(api_version=api_version), + SubscriptionState(), Metrics(), enable_auto_commit=True, session_timeout_ms=30000, # session_timeout_ms and max_poll_interval_ms @@ -86,8 +88,13 @@ def test_group_protocols(coordinator): @pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_pattern_subscription(coordinator, api_version): - coordinator.config['api_version'] = api_version +def test_pattern_subscription(conn, api_version): + coordinator = ConsumerCoordinator(KafkaClient(api_version=api_version), + SubscriptionState(), + Metrics(), + api_version=api_version, + session_timeout_ms=10000, + max_poll_interval_ms=10000) coordinator._subscription.subscribe(pattern='foo') assert coordinator._subscription.subscription == set([]) assert coordinator._metadata_snapshot == coordinator._build_metadata_snapshot(coordinator._subscription, {}) @@ -436,7 +443,7 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): def test_send_offset_commit_request_versions(patched_coord, offsets, api_version, req_type): expect_node = 0 - patched_coord.config['api_version'] = api_version + patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] patched_coord._send_offset_commit_request(offsets) (node, request), _ = patched_coord._client.send.call_args @@ -532,7 +539,7 @@ def test_send_offset_fetch_request_versions(patched_coord, partitions, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 expect_node = 0 - patched_coord.config['api_version'] = api_version + patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args diff --git a/test/test_fetcher.py b/test/test_fetcher.py index f8311ac79..bbc5b0c85 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -16,6 +16,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics import Metrics +from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.fetch import FetchRequest, FetchResponse from kafka.protocol.offset import OffsetResponse from kafka.errors import ( @@ -27,8 +28,8 @@ @pytest.fixture -def client(mocker): - return 
mocker.Mock(spec=KafkaClient(bootstrap_servers=(), api_version=(0, 9))) +def client(): + return KafkaClient(bootstrap_servers=(), api_version=(0, 9)) @pytest.fixture @@ -81,6 +82,8 @@ def test_send_fetches(fetcher, topic, mocker): mocker.patch.object(fetcher, '_create_fetch_requests', return_value=dict(enumerate(fetch_requests))) + mocker.patch.object(fetcher._client, 'ready', return_value=True) + mocker.patch.object(fetcher._client, 'send') ret = fetcher.send_fetches() for node, request in enumerate(fetch_requests): fetcher._client.send.assert_any_call(node, request, wakeup=False) @@ -91,14 +94,14 @@ def test_send_fetches(fetcher, topic, mocker): ((0, 10, 1), 3), ((0, 10, 0), 2), ((0, 9), 1), - ((0, 8), 0) + ((0, 8, 2), 0) ]) def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): - fetcher._client.in_flight_request_count.return_value = 0 - fetcher.config['api_version'] = api_version + fetcher._client._api_versions = BROKER_API_VERSIONS[api_version] + mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) by_node = fetcher._create_fetch_requests() requests = by_node.values() - assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests]) + assert set([r.API_VERSION for r in requests]) == set([fetch_version]) def test_update_fetch_positions(fetcher, topic, mocker): @@ -485,6 +488,7 @@ def test__parse_fetched_data__not_leader(fetcher, topic, mocker): tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None], mocker.MagicMock() ) + mocker.patch.object(fetcher._client.cluster, 'request_update') partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None fetcher._client.cluster.request_update.assert_called_with() @@ -497,6 +501,7 @@ def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker): tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None], mocker.MagicMock() ) + mocker.patch.object(fetcher._client.cluster, 'request_update') partition_record = fetcher._parse_fetched_data(completed_fetch) assert partition_record is None fetcher._client.cluster.request_update.assert_called_with() diff --git a/test/test_sender.py b/test/test_sender.py index f3bbf4275..83a26cd39 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -7,6 +7,7 @@ from kafka.client_async import KafkaClient from kafka.cluster import ClusterMetadata from kafka.metrics import Metrics +from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.produce import ProduceRequest from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch from kafka.producer.sender import Sender @@ -15,10 +16,8 @@ @pytest.fixture -def client(mocker): - _cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=(), api_version=(0, 9))) - _cli.cluster = mocker.Mock(spec=ClusterMetadata()) - return _cli +def client(): + return KafkaClient(bootstrap_servers=(), api_version=(0, 9)) @pytest.fixture @@ -32,7 +31,7 @@ def metrics(): @pytest.fixture -def sender(client, accumulator, metrics): +def sender(client, accumulator, metrics, mocker): return Sender(client, client.cluster, accumulator, metrics) @@ -42,7 +41,7 @@ def sender(client, accumulator, metrics): ((0, 8, 0), 0) ]) def test_produce_request(sender, mocker, api_version, produce_version): - sender.config['api_version'] = api_version + sender._client._api_versions = BROKER_API_VERSIONS[api_version] tp = TopicPartition('foo', 0) buffer = io.BytesIO() records = MemoryRecordsBuilder( From 10aa4ba4d2454b936eb390abf0279e00c2b54236 Mon Sep 17 00:00:00 2001 
From: Dana Powers Date: Tue, 25 Feb 2025 16:28:07 -0800 Subject: [PATCH 5/9] add raises to docstring for client.api_version() --- kafka/client_async.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index 48923cca3..8525f80a2 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -978,6 +978,8 @@ def api_version(self, operation, max_version=None): Returns: int: The highest api version number compatible between client and broker. + + Raises: IncompatibleBrokerVersion if no matching version is found """ # Cap max_version at the largest available version in operation list max_version = min(len(operation) - 1, max_version if max_version is not None else float('inf')) From 6a361a871159ad9825db1d5f488ba0c7d20b6b1b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 25 Feb 2025 16:28:33 -0800 Subject: [PATCH 6/9] drop version check from coordinator - api_version() raises --- kafka/coordinator/base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c52548512..d18de0743 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -453,9 +453,7 @@ def _send_join_group_request(self): for protocol, metadata in self.group_protocols() ] version = self._client.api_version(JoinGroupRequest, max_version=2) - if not version: - raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers') - elif version == 0: + if version == 0: request = JoinGroupRequest[version]( self.group_id, self.config['session_timeout_ms'], From 31e8f5f524180e79ae3337659d00a04f8f92c347 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Feb 2025 08:40:22 -0800 Subject: [PATCH 7/9] Set api_version to client version in consumer/producer init --- kafka/consumer/group.py | 5 ++--- kafka/producer/kafka.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 2d7571d1b..ed8ddd071 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -357,9 +357,8 @@ def __init__(self, *topics, **configs): self._client = self.config['kafka_client'](metrics=self._metrics, **self.config) - # Get auto-discovered version from client if necessary - if self.config['api_version'] is None: - self.config['api_version'] = self._client.config['api_version'] + # Get auto-discovered / normalized version from client + self.config['api_version'] = self._client.config['api_version'] # Coordinator configurations are different for older brokers # max_poll_interval_ms is not supported directly -- it must the be diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 5c44a8a81..9a62ac36b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -385,9 +385,8 @@ def __init__(self, **configs): wakeup_timeout_ms=self.config['max_block_ms'], **self.config) - # Get auto-discovered version from client if necessary - if self.config['api_version'] is None: - self.config['api_version'] = client.config['api_version'] + # Get auto-discovered / normalized version from client + self.config['api_version'] = client.config['api_version'] if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' From b53e5de33817b78d20b6486700b2921f3b4a09b1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Feb 2025 08:40:56 -0800 Subject: [PATCH 8/9] Remove api_version config from Fetcher and Sender; update docstrings --- kafka/consumer/fetcher.py | 7 
+++---- kafka/producer/sender.py | 5 ++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 442a6c3a1..b544e4b0e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -57,7 +57,6 @@ class Fetcher(six.Iterator): 'check_crcs': True, 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', - 'api_version': (0, 8, 0), 'retry_backoff_ms': 100 } @@ -561,15 +560,15 @@ def on_fail(err): return list_offsets_future def _send_offset_request(self, node_id, timestamps): + version = self._client.api_version(OffsetRequest, max_version=1) by_topic = collections.defaultdict(list) for tp, timestamp in six.iteritems(timestamps): - if self.config['api_version'] >= (0, 10, 1): + if version >= 1: data = (tp.partition, timestamp) else: data = (tp.partition, timestamp, 1) by_topic[tp.topic].append(data) - version = self._client.api_version(OffsetRequest, max_version=1) request = OffsetRequest[version](-1, list(six.iteritems(by_topic))) # Client returns a future that only fails on network issues @@ -660,7 +659,7 @@ def _create_fetch_requests(self): FetchRequests skipped if no leader, or node has requests in flight Returns: - dict: {node_id: FetchRequest, ...} (version depends on api_version) + dict: {node_id: FetchRequest, ...} (version depends on client api_versions) """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 56805958e..63b65d5a4 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -31,7 +31,6 @@ class Sender(threading.Thread): 'request_timeout_ms': 30000, 'guarantee_message_order': False, 'client_id': 'kafka-python-' + __version__, - 'api_version': (0, 8, 0), } def __init__(self, client, metadata, accumulator, metrics, **configs): @@ -278,7 +277,7 @@ def _create_produce_requests(self, collated): collated: {node_id: [RecordBatch]} Returns: - dict: {node_id: ProduceRequest} (version depends on api_version) + dict: {node_id: ProduceRequest} (version depends on client api_versions) """ requests = {} for node_id, batches in six.iteritems(collated): @@ -291,7 +290,7 @@ def _produce_request(self, node_id, acks, timeout, batches): """Create a produce request from the given record batches. Returns: - ProduceRequest (version depends on api_version) + ProduceRequest (version depends on client api_versions) """ produce_records_by_partition = collections.defaultdict(dict) for batch in batches: From 6c03ecadf4188bbad5e39503219854965dcc7ec2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Feb 2025 09:25:49 -0800 Subject: [PATCH 9/9] update api_version docstrings for consumer/producer/client --- kafka/client_async.py | 19 +++++++++++++++---- kafka/consumer/group.py | 11 +++++++++-- kafka/producer/kafka.py | 16 ++++++++++++++-- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 8525f80a2..27f6ab830 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -130,12 +130,23 @@ class KafkaClient(object): format. If no cipher can be selected (because compile-time options or other configuration forbids use of all the specified ciphers), an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers - api_version (tuple): Specify which Kafka API version to use. 
If set
-            to None, KafkaClient will attempt to infer the broker version by
-            probing various APIs. Example: (0, 10, 2). Default: None
+        api_version (tuple): Specify which Kafka API version to use. If set to
+            None, the client will attempt to determine the broker version via
+            ApiVersionsRequest API or, for brokers earlier than 0.10, by probing
+            various known APIs. Dynamic version checking is performed eagerly
+            during __init__ and can raise NoBrokersAvailableError if no connection
+            was made before timeout (see api_version_auto_timeout_ms below).
+            Different versions enable different functionality.
+
+            Examples:
+                (3, 9) most recent broker release, enables all supported features
+                (0, 10, 0) enables sasl authentication
+                (0, 8, 0) enables basic functionality only
+
+            Default: None
         api_version_auto_timeout_ms (int): number of milliseconds to throw a
             timeout exception from the constructor when checking the broker
-            api version. Only applies if api_version is None
+            api version. Only applies if api_version set to None.
         selector (selectors.BaseSelector): Provide a specific selector
             implementation to use for I/O multiplexing.
             Default: selectors.DefaultSelector
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index ed8ddd071..3a4a85386 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -195,10 +195,17 @@ class KafkaConsumer(six.Iterator):
             or other configuration forbids use of all the specified ciphers),
             an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
         api_version (tuple): Specify which Kafka API version to use. If set to
-            None, the client will attempt to infer the broker version by probing
-            various APIs. Different versions enable different functionality.
+            None, the client will attempt to determine the broker version via
+            ApiVersionsRequest API or, for brokers earlier than 0.10, by probing
+            various known APIs. Dynamic version checking is performed eagerly
+            during __init__ and can raise NoBrokersAvailableError if no connection
+            was made before timeout (see api_version_auto_timeout_ms below).
+            Different versions enable different functionality.
 
             Examples:
+                (3, 9) most recent broker release, enables all supported features
+                (0, 11) enables message format v2 (internal)
+                (0, 10, 0) enables sasl authentication and message format v1
                 (0, 9) enables full group coordination features with automatic
                     partition assignment and rebalancing,
                 (0, 8, 2) enables kafka-storage offset commits with manual
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 9a62ac36b..e5d06bcf2 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -252,8 +252,20 @@ class KafkaProducer(object):
             or other configuration forbids use of all the specified ciphers),
             an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
         api_version (tuple): Specify which Kafka API version to use. If set to
-            None, the client will attempt to infer the broker version by probing
-            various APIs. Example: (0, 10, 2). Default: None
+            None, the client will attempt to determine the broker version via
+            ApiVersionsRequest API or, for brokers earlier than 0.10, by probing
+            various known APIs. Dynamic version checking is performed eagerly
+            during __init__ and can raise NoBrokersAvailableError if no connection
+            was made before timeout (see api_version_auto_timeout_ms below).
+            Different versions enable different functionality.
+
+            Examples:
+                (3, 9) most recent broker release, enables all supported features
+                (0, 11) enables message format v2 (internal)
+                (0, 10, 0) enables sasl authentication and message format v1
+                (0, 8, 0) enables basic functionality only
+
+            Default: None
         api_version_auto_timeout_ms (int): number of milliseconds to throw a
             timeout exception from the constructor when checking the broker
             api version. Only applies if api_version set to None.
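
Usage sketch (illustrative only, not part of any patch above): how the new
KafkaClient.api_version() helper is meant to be called once this series is
applied. It assumes a reachable broker -- the localhost:9092 address is a
placeholder -- so that the eager version probe in KafkaClient.__init__ can
populate the broker's advertised api_versions map.

    from kafka.client_async import KafkaClient
    from kafka.errors import IncompatibleBrokerVersion
    from kafka.protocol.offset import OffsetRequest

    # With api_version=None (the default), __init__ probes the broker eagerly
    # and may raise NoBrokersAvailableError if nothing answers in time.
    client = KafkaClient(bootstrap_servers='localhost:9092')

    try:
        # Highest OffsetRequest version supported by both sides, capped at v1
        # because this caller only implements the v0/v1 request formats.
        version = client.api_version(OffsetRequest, max_version=1)
    except IncompatibleBrokerVersion:
        # The client's and broker's version ranges for this API do not overlap.
        version = None

    if version is not None:
        # replica_id=-1 (regular client); empty topic list kept for brevity.
        request = OffsetRequest[version](-1, [])

The max_version cap mirrors the call sites changed throughout the series
(e.g. CreateTopicsRequest pinned at max_version=3 in the admin client,
OffsetRequest at max_version=1 in the fetcher): each caller pins the highest
version its encoding/handling code actually implements, the broker's
advertised range can only lower the selection, and IncompatibleBrokerVersion
is raised when the two ranges do not overlap.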