From 9638d1cfaa42d0902e39db1f6c3db103cd7a0cc9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 31 Mar 2019 10:11:21 -0700 Subject: [PATCH] Dont treat popped conn.close() as failure in state change callback --- kafka/client_async.py | 13 ++++++++++--- test/test_client_async.py | 5 +++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index b6adb775b..5c8a5595e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -314,7 +314,12 @@ def _conn_state_change(self, node_id, conn): idle_disconnect = True self._idle_expiry_manager.remove(node_id) - if self.cluster.is_bootstrap(node_id): + # If the connection has already by popped from self._conns, + # we can assume the disconnect was intentional and not a failure + if node_id not in self._conns: + pass + + elif self.cluster.is_bootstrap(node_id): self._bootstrap_fails += 1 elif self._refresh_on_disconnects and not self._closed and not idle_disconnect: @@ -419,10 +424,12 @@ def close(self, node_id=None): with self._lock: if node_id is None: self._close() - for conn in self._conns.values(): + conns = list(self._conns.values()) + self._conns.clear() + for conn in conns: conn.close() elif node_id in self._conns: - self._conns[node_id].close() + self._conns.pop(node_id).close() else: log.warning("Node %s not found in current connection list; skipping", node_id) return diff --git a/test/test_client_async.py b/test/test_client_async.py index 246e36c06..0951cb414 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -93,6 +93,7 @@ def test_conn_state_change(mocker, cli, conn): sel = mocker.patch.object(cli, '_selector') node_id = 0 + cli._conns[node_id] = conn conn.state = ConnectionStates.CONNECTING cli._conn_state_change(node_id, conn) assert node_id in cli._connecting @@ -180,8 +181,8 @@ def test_close(mocker, cli, conn): # All node close cli._maybe_connect(1) cli.close() - # +3 close: node 0, node 1, node bootstrap - call_count += 3 + # +2 close: node 1, node bootstrap (node 0 already closed) + call_count += 2 assert conn.close.call_count == call_count