Skip to content

Commit 51313d7

Browse files
authored
Dont treat popped conn.close() as failure in state change callback (#1773)
1 parent c02df4b commit 51313d7

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

kafka/client_async.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,12 @@ def _conn_state_change(self, node_id, conn):
314314
idle_disconnect = True
315315
self._idle_expiry_manager.remove(node_id)
316316

317-
if self.cluster.is_bootstrap(node_id):
317+
# If the connection has already by popped from self._conns,
318+
# we can assume the disconnect was intentional and not a failure
319+
if node_id not in self._conns:
320+
pass
321+
322+
elif self.cluster.is_bootstrap(node_id):
318323
self._bootstrap_fails += 1
319324

320325
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -419,10 +424,12 @@ def close(self, node_id=None):
419424
with self._lock:
420425
if node_id is None:
421426
self._close()
422-
for conn in self._conns.values():
427+
conns = list(self._conns.values())
428+
self._conns.clear()
429+
for conn in conns:
423430
conn.close()
424431
elif node_id in self._conns:
425-
self._conns[node_id].close()
432+
self._conns.pop(node_id).close()
426433
else:
427434
log.warning("Node %s not found in current connection list; skipping", node_id)
428435
return

test/test_client_async.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def test_conn_state_change(mocker, cli, conn):
9393
sel = mocker.patch.object(cli, '_selector')
9494

9595
node_id = 0
96+
cli._conns[node_id] = conn
9697
conn.state = ConnectionStates.CONNECTING
9798
cli._conn_state_change(node_id, conn)
9899
assert node_id in cli._connecting
@@ -180,8 +181,8 @@ def test_close(mocker, cli, conn):
180181
# All node close
181182
cli._maybe_connect(1)
182183
cli.close()
183-
# +3 close: node 0, node 1, node bootstrap
184-
call_count += 3
184+
# +2 close: node 1, node bootstrap (node 0 already closed)
185+
call_count += 2
185186
assert conn.close.call_count == call_count
186187

187188

0 commit comments

Comments
 (0)