Skip to content

Commit 62d3715

Browse files
dpkp authored and 88manpreet committed
Avoid multiple connection attempts when refreshing metadata (dpkp#1067)
1 parent 48836e8 commit 62d3715

File tree

2 files changed

+78
-63
lines changed

2 files changed

+78
-63
lines changed

kafka/client_async.py

Lines changed: 53 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -183,7 +183,6 @@ def __init__(self, **configs):
183183
self.cluster = ClusterMetadata(**self.config)
184184
self._topics = set() # empty set will fetch all topic metadata
185185
self._metadata_refresh_in_progress = False
186-
self._last_no_node_available_ms = 0
187186
self._selector = self.config['selector']()
188187
self._conns = {}
189188
self._connecting = set()
@@ -700,57 +699,62 @@ def _maybe_refresh_metadata(self):
700699
int: milliseconds until next refresh
701700
"""
702701
ttl = self.cluster.ttl()
703-
next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
704-
next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
705-
wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
706-
timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
707-
708-
if timeout == 0:
709-
node_id = self.least_loaded_node()
710-
if node_id is None:
711-
log.debug("Give up sending metadata request since no node is available")
712-
# mark the timestamp for no node available to connect
713-
self._last_no_node_available_ms = time.time() * 1000
714-
return timeout
715-
716-
topics = list(self._topics)
717-
if self.cluster.need_all_topic_metadata:
718-
if self.config['api_version'] < (0, 10):
719-
topics = []
720-
else:
721-
topics = None
702+
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
703+
metadata_timeout = max(ttl, wait_for_in_progress_ms)
722704

723-
if self._can_send_request(node_id):
724-
if self.config['api_version'] < (0, 10):
725-
api_version = 0
726-
else:
727-
api_version = 1
728-
request = MetadataRequest[api_version](topics)
729-
log.debug("Sending metadata request %s to node %s", request, node_id)
730-
future = self.send(node_id, request)
731-
future.add_callback(self.cluster.update_metadata)
732-
future.add_errback(self.cluster.failed_update)
733-
734-
self._metadata_refresh_in_progress = True
735-
def refresh_done(val_or_error):
736-
self._metadata_refresh_in_progress = False
737-
future.add_callback(refresh_done)
738-
future.add_errback(refresh_done)
739-
740-
elif self._can_connect(node_id):
741-
log.debug("Initializing connection to node %s for metadata request", node_id)
742-
self._maybe_connect(node_id)
743-
# If _maybe_connect failed immediately, this node will be put into blackout and we
744-
# should allow immediately retrying in case there is another candidate node. If it
745-
# is still connecting, the worst case is that we end up setting a longer timeout
746-
# on the next round and then wait for the response.
705+
if metadata_timeout > 0:
706+
return metadata_timeout
707+
708+
# Beware that the behavior of this method and the computation of
709+
# timeouts for poll() are highly dependent on the behavior of
710+
# least_loaded_node()
711+
node_id = self.least_loaded_node()
712+
if node_id is None:
713+
log.debug("Give up sending metadata request since no node is available");
714+
return self.config['reconnect_backoff_ms']
715+
716+
topics = list(self._topics)
717+
if self.cluster.need_all_topic_metadata:
718+
if self.config['api_version'] < (0, 10):
719+
topics = []
747720
else:
748-
# connected, but can't send more OR connecting
749-
# In either case, we just need to wait for a network event to let us know the selected
750-
# connection might be usable again.
751-
self._last_no_node_available_ms = time.time() * 1000
721+
topics = None
752722

753-
return timeout
723+
if self._can_send_request(node_id):
724+
if self.config['api_version'] < (0, 10):
725+
api_version = 0
726+
else:
727+
api_version = 1
728+
request = MetadataRequest[api_version](topics)
729+
log.debug("Sending metadata request %s to node %s", request, node_id)
730+
future = self.send(node_id, request)
731+
future.add_callback(self.cluster.update_metadata)
732+
future.add_errback(self.cluster.failed_update)
733+
734+
self._metadata_refresh_in_progress = True
735+
def refresh_done(val_or_error):
736+
self._metadata_refresh_in_progress = False
737+
future.add_callback(refresh_done)
738+
future.add_errback(refresh_done)
739+
return self.config['request_timeout_ms']
740+
741+
# If there's any connection establishment underway, wait until it completes. This prevents
742+
# the client from unnecessarily connecting to additional nodes while a previous connection
743+
# attempt has not been completed.
744+
if self._connecting:
745+
# Strictly the timeout we should return here is "connect timeout", but as we don't
746+
# have such application level configuration, using request timeout instead.
747+
return self.config['request_timeout_ms']
748+
749+
if self._can_connect(node_id):
750+
log.debug("Initializing connection to node %s for metadata request", node_id)
751+
self._maybe_connect(node_id)
752+
return self.config['reconnect_backoff_ms']
753+
754+
# connected but can't send more, OR connecting
755+
# In either case we just need to wait for a network event
756+
# to let us know the selected connection might be usable again.
757+
return float('inf')
754758

755759
def schedule(self, task, at):
756760
"""Schedule a new task to be executed at the given time.

test/test_client_async.py

Lines changed: 25 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -295,7 +295,7 @@ def client(mocker):
295295
mocker.patch.object(KafkaClient, '_bootstrap')
296296
_poll = mocker.patch.object(KafkaClient, '_poll')
297297

298-
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9))
298+
cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
299299

300300
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
301301
tasks.return_value = 9999999
@@ -308,49 +308,60 @@ def client(mocker):
308308
def test_maybe_refresh_metadata_ttl(mocker, client):
309309
client.cluster.ttl.return_value = 1234
310310

311-
client.poll(timeout_ms=9999999, sleep=True)
311+
client.poll(timeout_ms=12345678, sleep=True)
312312
client._poll.assert_called_with(1.234, sleep=True)
313313

314314

315315
def test_maybe_refresh_metadata_backoff(mocker, client):
316316
now = time.time()
317317
t = mocker.patch('time.time')
318318
t.return_value = now
319-
client._last_no_node_available_ms = now * 1000
320319

321-
client.poll(timeout_ms=9999999, sleep=True)
322-
client._poll.assert_called_with(2.222, sleep=True)
320+
client.poll(timeout_ms=12345678, sleep=True)
321+
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
323322

324323

325324
def test_maybe_refresh_metadata_in_progress(mocker, client):
326325
client._metadata_refresh_in_progress = True
327326

328-
client.poll(timeout_ms=9999999, sleep=True)
329-
client._poll.assert_called_with(9999.999, sleep=True)
327+
client.poll(timeout_ms=12345678, sleep=True)
328+
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
330329

331330

332331
def test_maybe_refresh_metadata_update(mocker, client):
333332
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
334333
mocker.patch.object(client, '_can_send_request', return_value=True)
335334
send = mocker.patch.object(client, 'send')
336335

337-
client.poll(timeout_ms=9999999, sleep=True)
338-
client._poll.assert_called_with(0, sleep=True)
336+
client.poll(timeout_ms=12345678, sleep=True)
337+
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
339338
assert client._metadata_refresh_in_progress
340339
request = MetadataRequest[0]([])
341-
send.assert_called_with('foobar', request)
340+
send.assert_called_once_with('foobar', request)
342341

343342

344-
def test_maybe_refresh_metadata_failure(mocker, client):
343+
def test_maybe_refresh_metadata_cant_send(mocker, client):
345344
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
345+
mocker.patch.object(client, '_can_connect', return_value=True)
346+
mocker.patch.object(client, '_maybe_connect', return_value=True)
346347

347348
now = time.time()
348349
t = mocker.patch('time.time')
349350
t.return_value = now
350351

351-
client.poll(timeout_ms=9999999, sleep=True)
352-
client._poll.assert_called_with(0, sleep=True)
353-
assert client._last_no_node_available_ms == now * 1000
352+
# first poll attempts connection
353+
client.poll(timeout_ms=12345678, sleep=True)
354+
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
355+
client._can_connect.assert_called_once_with('foobar')
356+
client._maybe_connect.assert_called_once_with('foobar')
357+
358+
# poll while connecting should not attempt a new connection
359+
client._connecting.add('foobar')
360+
client._can_connect.reset_mock()
361+
client.poll(timeout_ms=12345678, sleep=True)
362+
client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
363+
assert not client._can_connect.called
364+
354365
assert not client._metadata_refresh_in_progress
355366

356367

0 commit comments

Comments
 (0)