From 3c873794f04d968895b0452a26ac061862686abf Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 6 Mar 2019 06:21:02 -0800 Subject: [PATCH 1/6] Add BrokerConnection.send_pending_requests to support async network sends --- kafka/conn.py | 49 ++++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 7dfc8bd77..6b5aff9f8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -733,11 +733,8 @@ def close(self, error=None): future.failure(error) self.config['state_change_callback'](self) - def send(self, request): - """send request, return Future() - - Can block on network if request is larger than send_buffer_bytes - """ + def send(self, request, blocking=True): + """Queue request for async network send, return Future()""" future = Future() if self.connecting(): return future.failure(Errors.NodeNotReadyError(str(self))) @@ -745,35 +742,49 @@ def send(self, request): return future.failure(Errors.KafkaConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) - return self._send(request) + return self._send(request, blocking=blocking) - def _send(self, request): + def _send(self, request, blocking=True): assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) future = Future() correlation_id = self._protocol.send_request(request) + + # Attempt to replicate behavior from prior to introduction of + # send_pending_requests() / async sends + if blocking: + error = self.send_pending_requests() + if isinstance(error, Exception): + future.failure(error) + return future + + log.debug('%s Request %d: %s', self, correlation_id, request) + if request.expect_response(): + sent_time = time.time() + ifr = (correlation_id, future, sent_time) + self.in_flight_requests.append(ifr) + else: + future.success(None) + return future + + def send_pending_requests(self): + """Can block on network if request is larger 
than send_buffer_bytes""" + if self.state not in (ConnectionStates.AUTHENTICATING, + ConnectionStates.CONNECTED): + return Errors.NodeNotReadyError(str(self)) data = self._protocol.send_bytes() try: # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block # sending each request payload - sent_time = time.time() total_bytes = self._send_bytes_blocking(data) if self._sensors: self._sensors.bytes_sent.record(total_bytes) + return total_bytes except ConnectionError as e: - log.exception("Error sending %s to %s", request, self) + log.exception("Error sending request data to %s", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) - return future.failure(error) - log.debug('%s Request %d: %s', self, correlation_id, request) - - if request.expect_response(): - ifr = (correlation_id, future, sent_time) - self.in_flight_requests.append(ifr) - else: - future.success(None) - - return future + return error def can_send_more(self): """Return True unless there are max_in_flight_requests_per_connection.""" From 492b7d27f5cc4ecfde7af1e380162c5fd9e4206a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 6 Mar 2019 06:24:59 -0800 Subject: [PATCH 2/6] Send network requests during KafkaClient.poll() rather than in KafkaClient.send() --- kafka/client_async.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index e2bdda904..e9d5919fc 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -522,7 +522,17 @@ def send(self, node_id, request): if not self._maybe_connect(node_id): return Future().failure(Errors.NodeNotReadyError(node_id)) - return self._conns[node_id].send(request) + # conn.send will queue the request internally + # we will need to call send_pending_requests() + # to trigger network I/O + future = self._conns[node_id].send(request, blocking=False) + + # Wakeup signal is useful in case another 
thread is + # blocked waiting for incoming network traffic while holding + # the client lock in poll(). + self.wakeup() + + return future def poll(self, timeout_ms=None, future=None): """Try to read and write to sockets. @@ -640,6 +650,8 @@ def _poll(self, timeout): conn.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % conn.config['request_timeout_ms'])) + else: + conn.send_pending_requests() if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) From 23132863d0e00bd8aabc0e19c7e1822dabfb05b9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 6 Mar 2019 06:41:13 -0800 Subject: [PATCH 3/6] Don't acquire lock during KafkaClient.send if node is connected / ready --- kafka/client_async.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index e9d5919fc..c1bdd824a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -499,14 +499,15 @@ def is_ready(self, node_id, metadata_priority=True): return True def _can_send_request(self, node_id): - with self._lock: - if node_id not in self._conns: - return False - conn = self._conns[node_id] - return conn.connected() and conn.can_send_more() + conn = self._conns.get(node_id) + if not conn: + return False + return conn.connected() and conn.can_send_more() def send(self, node_id, request): - """Send a request to a specific node. + """Send a request to a specific node. Bytes are placed on an + internal per-connection send-queue. 
Actual network I/O will be + triggered in a subsequent call to .poll() Arguments: node_id (int): destination node @@ -518,7 +519,7 @@ def send(self, node_id, request): Returns: Future: resolves to Response struct or Error """ - with self._lock: + if not self._can_send_request(node_id): if not self._maybe_connect(node_id): return Future().failure(Errors.NodeNotReadyError(node_id)) From 957c62d6ded7a3652e7897db20a23e070a6ad852 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 6 Mar 2019 06:44:25 -0800 Subject: [PATCH 4/6] Move all network connection IO into KafkaClient.poll() --- kafka/client_async.py | 30 ++++++++++++++++++++++-------- kafka/coordinator/base.py | 4 ++-- test/fixtures.py | 7 ++++--- test/test_client_async.py | 9 ++++----- 4 files changed, 32 insertions(+), 18 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index c1bdd824a..d608e6a5e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -304,7 +304,10 @@ def _conn_state_change(self, node_id, conn): # SSL connections can enter this state 2x (second during Handshake) if node_id not in self._connecting: self._connecting.add(node_id) + try: self._selector.register(conn._sock, selectors.EVENT_WRITE) + except KeyError: + self._selector.modify(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) @@ -312,10 +315,10 @@ def _conn_state_change(self, node_id, conn): self._connecting.remove(node_id) try: - self._selector.unregister(conn._sock) + self._selector.modify(conn._sock, selectors.EVENT_READ, conn) except KeyError: - pass - self._selector.register(conn._sock, selectors.EVENT_READ, conn) + self._selector.register(conn._sock, selectors.EVENT_READ, conn) + if self._sensors: self._sensors.connection_created.record() @@ -336,6 +339,7 @@ def _conn_state_change(self, node_id, conn): self._selector.unregister(conn._sock) except KeyError: pass + if self._sensors: self._sensors.connection_closed.record() @@ -348,6 +352,17 @@ def 
_conn_state_change(self, node_id, conn): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() + def maybe_connect(self, node_id): + """Queues a node for asynchronous connection during the next .poll()""" + if self._can_connect(node_id): + self._connecting.add(node_id) + # Wakeup signal is useful in case another thread is + # blocked waiting for incoming network traffic while holding + # the client lock in poll(). + self.wakeup() + return True + return False + def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" with self._lock: @@ -397,7 +412,7 @@ def ready(self, node_id, metadata_priority=True): Returns: bool: True if we are ready to send to the given node """ - self._maybe_connect(node_id) + self.maybe_connect(node_id) return self.is_ready(node_id, metadata_priority=metadata_priority) def connected(self, node_id): @@ -520,8 +535,8 @@ def send(self, node_id, request): Future: resolves to Response struct or Error """ if not self._can_send_request(node_id): - if not self._maybe_connect(node_id): - return Future().failure(Errors.NodeNotReadyError(node_id)) + self.maybe_connect(node_id) + return Future().failure(Errors.NodeNotReadyError(node_id)) # conn.send will queue the request internally # we will need to call send_pending_requests() @@ -814,9 +829,8 @@ def refresh_done(val_or_error): # have such application level configuration, using request timeout instead. 
return self.config['request_timeout_ms'] - if self._can_connect(node_id): + if self.maybe_connect(node_id): log.debug("Initializing connection to node %s for metadata request", node_id) - self._maybe_connect(node_id) return self.config['reconnect_backoff_ms'] # connected but can't send more, OR connecting diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 14351839d..664e8d262 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -252,7 +252,7 @@ def ensure_coordinator_ready(self): if self.config['api_version'] < (0, 8, 2): self.coordinator_id = self._client.least_loaded_node() if self.coordinator_id is not None: - self._client.ready(self.coordinator_id) + self._client.maybe_connect(self.coordinator_id) continue future = self.lookup_coordinator() @@ -686,7 +686,7 @@ def _handle_group_coordinator_response(self, future, response): self.coordinator_id = response.coordinator_id log.info("Discovered coordinator %s for group %s", self.coordinator_id, self.group_id) - self._client.ready(self.coordinator_id) + self._client.maybe_connect(self.coordinator_id) self.heartbeat.reset_timeouts() future.success(self.coordinator_id) diff --git a/test/fixtures.py b/test/fixtures.py index 34373e623..8b156e693 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -405,10 +405,11 @@ def _failure(error): retries = 10 while True: node_id = self._client.least_loaded_node() - for ready_retry in range(40): - if self._client.ready(node_id, False): + for connect_retry in range(40): + self._client.maybe_connect(node_id) + if self._client.connected(node_id): break - time.sleep(.1) + self._client.poll(timeout_ms=100) else: raise RuntimeError('Could not connect to broker with node id %d' % (node_id,)) diff --git a/test/test_client_async.py b/test/test_client_async.py index 09781ac2c..1c8a50f1c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -125,8 +125,7 @@ def test_conn_state_change(mocker, cli, conn): conn.state = 
ConnectionStates.CONNECTED cli._conn_state_change(node_id, conn) assert node_id not in cli._connecting - sel.unregister.assert_called_with(conn._sock) - sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn) + sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update assert cli.cluster._need_update is False @@ -145,7 +144,7 @@ def test_conn_state_change(mocker, cli, conn): def test_ready(mocker, cli, conn): - maybe_connect = mocker.patch.object(cli, '_maybe_connect') + maybe_connect = mocker.patch.object(cli, 'maybe_connect') node_id = 1 cli.ready(node_id) maybe_connect.assert_called_with(node_id) @@ -362,6 +361,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) + mocker.patch.object(client, 'maybe_connect', return_value=True) now = time.time() t = mocker.patch('time.time') @@ -370,8 +370,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): # first poll attempts connection client.poll(timeout_ms=12345678) client._poll.assert_called_with(2.222) # reconnect backoff - client._can_connect.assert_called_once_with('foobar') - client._maybe_connect.assert_called_once_with('foobar') + client.maybe_connect.assert_called_once_with('foobar') # poll while connecting should not attempt a new connection client._connecting.add('foobar') From 7ce69765ab71350aa739f388217a180cad8ce17a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 6 Mar 2019 08:02:37 -0800 Subject: [PATCH 5/6] Fetcher should call client.poll() regardless of in-flight-request count --- kafka/consumer/group.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 8d2c65e80..5f06e7b5f 100644 --- a/kafka/consumer/group.py +++ 
b/kafka/consumer/group.py @@ -1090,8 +1090,7 @@ def _message_generator(self): if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break - if self._client.in_flight_request_count(): - self._client.poll(timeout_ms=0) + self._client.poll(timeout_ms=0) # An else block on a for loop only executes if there was no break # so this should only be called on a StopIteration from the fetcher From 0a8914ff0e9e5896b761127b82b5aac613d3e4c0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 6 Mar 2019 08:03:36 -0800 Subject: [PATCH 6/6] Remove sleep check when no partitions assigned -- no longer needed --- kafka/consumer/group.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 5f06e7b5f..531c1072a 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1070,16 +1070,6 @@ def _message_generator(self): # like heartbeats, auto-commits, and metadata refreshes timeout_at = self._next_timeout() - # Because the consumer client poll does not sleep unless blocking on - # network IO, we need to explicitly sleep when we know we are idle - # because we haven't been assigned any partitions to fetch / consume - if self._use_consumer_group() and not self.assignment(): - sleep_time = max(timeout_at - time.time(), 0) - if sleep_time > 0 and not self._client.in_flight_request_count(): - log.debug('No partitions assigned; sleeping for %s', sleep_time) - time.sleep(sleep_time) - continue - # Short-circuit the fetch iterator if we are already timed out # to avoid any unintentional interaction with fetcher setup if time.time() > timeout_at: