Skip to content

Commit a7d3063

Browse files
andyxningdpkp
authored andcommitted
add support for smaller topic metadata fetch during bootstrap (#1541)
1 parent 9ac3cb1 commit a7d3063

File tree

3 files changed

+14
-6
lines changed

3 files changed

+14
-6
lines changed

kafka/client_async.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ class KafkaClient(object):
149149

150150
DEFAULT_CONFIG = {
151151
'bootstrap_servers': 'localhost',
152+
'bootstrap_topics_filter': set(),
152153
'client_id': 'kafka-python-' + __version__,
153154
'request_timeout_ms': 30000,
154155
'connections_max_idle_ms': 9 * 60 * 1000,
@@ -236,9 +237,15 @@ def _bootstrap(self, hosts):
236237
self._last_bootstrap = time.time()
237238

238239
if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
239-
metadata_request = MetadataRequest[0]([])
240+
if self.config['bootstrap_topics_filter']:
241+
metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter']))
242+
else:
243+
metadata_request = MetadataRequest[0]([])
240244
else:
241-
metadata_request = MetadataRequest[1](None)
245+
if self.config['bootstrap_topics_filter']:
246+
metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter']))
247+
else:
248+
metadata_request = MetadataRequest[1](None)
242249

243250
for host, port, afi in hosts:
244251
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
@@ -830,7 +837,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
830837
self._refresh_on_disconnects = False
831838
try:
832839
remaining = end - time.time()
833-
version = conn.check_version(timeout=remaining, strict=strict)
840+
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
834841
return version
835842
except Errors.NodeNotReadyError:
836843
# Only raise to user if this is a node-specific request

kafka/conn.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
892892
# so if all else fails, choose that
893893
return (0, 10, 0)
894894

895-
def check_version(self, timeout=2, strict=False):
895+
def check_version(self, timeout=2, strict=False, topics=[]):
896896
"""Attempt to guess the broker version.
897897
898898
Note: This is a blocking call.
@@ -925,7 +925,7 @@ def check_version(self, timeout=2, strict=False):
925925
((0, 9), ListGroupsRequest[0]()),
926926
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
927927
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
928-
((0, 8, 0), MetadataRequest[0]([])),
928+
((0, 8, 0), MetadataRequest[0](topics)),
929929
]
930930

931931
for version, request in test_cases:
@@ -941,7 +941,7 @@ def check_version(self, timeout=2, strict=False):
941941
# the attempt to write to a disconnected socket should
942942
# immediately fail and allow us to infer that the prior
943943
# request was unrecognized
944-
mr = self.send(MetadataRequest[0]([]))
944+
mr = self.send(MetadataRequest[0](topics))
945945

946946
selector = self.config['selector']()
947947
selector.register(self._sock, selectors.EVENT_READ)

kafka/producer/kafka.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ class KafkaProducer(object):
281281
'key_serializer': None,
282282
'value_serializer': None,
283283
'acks': 1,
284+
'bootstrap_topics_filter': set(),
284285
'compression_type': None,
285286
'retries': 0,
286287
'batch_size': 16384,

0 commit comments

Comments
 (0)