Skip to content

Commit 4cd3520

Browse files
committed
Improve error handling in client._maybe_connect (#2504)
1 parent e4c8213 commit 4cd3520

File tree

2 files changed

+26
-18
lines changed

2 files changed

+26
-18
lines changed

kafka/client_async.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -364,13 +364,23 @@ def _should_recycle_connection(self, conn):
364364
return False
365365

366366
def _maybe_connect(self, node_id):
367-
"""Idempotent non-blocking connection attempt to the given node id."""
367+
"""Idempotent non-blocking connection attempt to the given node id.
368+
369+
Returns True if connection object exists and is connected / connecting
370+
"""
368371
with self._lock:
369372
conn = self._conns.get(node_id)
370373

374+
# Check if existing connection should be recreated because host/port changed
375+
if conn is not None and self._should_recycle_connection(conn):
376+
self._conns.pop(node_id).close()
377+
conn = None
378+
371379
if conn is None:
372380
broker = self.cluster.broker_metadata(node_id)
373-
assert broker, 'Broker id %s not in current metadata' % (node_id,)
381+
if broker is None:
382+
log.debug('Broker id %s not in current metadata', node_id)
383+
return False
374384

375385
log.debug("Initiating connection to node %s at %s:%s",
376386
node_id, broker.host, broker.port)
@@ -382,16 +392,11 @@ def _maybe_connect(self, node_id):
382392
**self.config)
383393
self._conns[node_id] = conn
384394

385-
# Check if existing connection should be recreated because host/port changed
386-
elif self._should_recycle_connection(conn):
387-
self._conns.pop(node_id)
388-
return False
389-
390395
elif conn.connected():
391396
return True
392397

393398
conn.connect()
394-
return conn.connected()
399+
return not conn.disconnected()
395400

396401
def ready(self, node_id, metadata_priority=True):
397402
"""Check whether a node is connected and ok to send more requests.
@@ -580,7 +585,10 @@ def poll(self, timeout_ms=None, future=None):
580585

581586
# Attempt to complete pending connections
582587
for node_id in list(self._connecting):
583-
self._maybe_connect(node_id)
588+
# False return means no more connection progress is possible
589+
# Connected nodes will update _connecting via state_change callback
590+
if not self._maybe_connect(node_id):
591+
self._connecting.remove(node_id)
584592

585593
# If we got a future that is already done, don't block in _poll
586594
if future is not None and future.is_done:
@@ -919,7 +927,12 @@ def check_version(self, node_id=None, timeout=None, strict=False):
919927
if try_node is None:
920928
self._lock.release()
921929
raise Errors.NoBrokersAvailable()
922-
self._maybe_connect(try_node)
930+
if not self._maybe_connect(try_node):
931+
if try_node == node_id:
932+
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
933+
else:
934+
continue
935+
923936
conn = self._conns[try_node]
924937

925938
# We will intentionally cause socket failures

test/test_client_async.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,14 @@ def test_can_connect(cli, conn):
7171

7272

7373
def test_maybe_connect(cli, conn):
74-
try:
75-
# Node not in metadata, raises AssertionError
76-
cli._maybe_connect(2)
77-
except AssertionError:
78-
pass
79-
else:
80-
assert False, 'Exception not raised'
74+
# Node not in metadata, return False
75+
assert not cli._maybe_connect(2)
8176

8277
# New node_id creates a conn object
8378
assert 0 not in cli._conns
8479
conn.state = ConnectionStates.DISCONNECTED
8580
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
86-
assert cli._maybe_connect(0) is False
81+
assert cli._maybe_connect(0) is True
8782
assert cli._conns[0] is conn
8883

8984

0 commit comments

Comments
 (0)