Skip to content

Commit b1effa2

Browse files
authored
Dont wakeup during maybe_refresh_metadata -- it is only called by poll() (#1769)
1 parent de6e9d3 commit b1effa2

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

kafka/client_async.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ def send(self, node_id, request, wakeup=True):
517517
Future: resolves to Response struct or Error
518518
"""
519519
if not self._can_send_request(node_id):
520-
self.maybe_connect(node_id)
520+
self.maybe_connect(node_id, wakeup=wakeup)
521521
return Future().failure(Errors.NodeNotReadyError(node_id))
522522

523523
# conn.send will queue the request internally
@@ -761,7 +761,7 @@ def add_topic(self, topic):
761761
return self.cluster.request_update()
762762

763763
# This method should be locked when running multi-threaded
764-
def _maybe_refresh_metadata(self):
764+
def _maybe_refresh_metadata(self, wakeup=False):
765765
"""Send a metadata request if needed.
766766
767767
Returns:
@@ -792,7 +792,7 @@ def _maybe_refresh_metadata(self):
792792
api_version = 0 if self.config['api_version'] < (0, 10) else 1
793793
request = MetadataRequest[api_version](topics)
794794
log.debug("Sending metadata request %s to node %s", request, node_id)
795-
future = self.send(node_id, request)
795+
future = self.send(node_id, request, wakeup=wakeup)
796796
future.add_callback(self.cluster.update_metadata)
797797
future.add_errback(self.cluster.failed_update)
798798

@@ -809,7 +809,7 @@ def refresh_done(val_or_error):
809809
if self._connecting:
810810
return self.config['reconnect_backoff_ms']
811811

812-
if self.maybe_connect(node_id):
812+
if self.maybe_connect(node_id, wakeup=wakeup):
813813
log.debug("Initializing connection to node %s for metadata request", node_id)
814814
return self.config['reconnect_backoff_ms']
815815

test/test_client_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ def test_maybe_refresh_metadata_update(mocker, client):
332332
client._poll.assert_called_with(9999.999) # request_timeout_ms
333333
assert client._metadata_refresh_in_progress
334334
request = MetadataRequest[0]([])
335-
send.assert_called_once_with('foobar', request)
335+
send.assert_called_once_with('foobar', request, wakeup=False)
336336

337337

338338
def test_maybe_refresh_metadata_cant_send(mocker, client):
@@ -348,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
348348
# first poll attempts connection
349349
client.poll(timeout_ms=12345678)
350350
client._poll.assert_called_with(2.222) # reconnect backoff
351-
client.maybe_connect.assert_called_once_with('foobar')
351+
client.maybe_connect.assert_called_once_with('foobar', wakeup=False)
352352

353353
# poll while connecting should not attempt a new connection
354354
client._connecting.add('foobar')

0 commit comments

Comments
 (0)