
Commit 937388f

Change coordinator lock acquisition order
* Coordinator lock acquired first, client lock acquired second
* Release client lock to process futures
1 parent f6a8a38 commit 937388f
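
The reordering addresses the classic lock-ordering hazard: if one thread takes the client lock and then the coordinator lock while another thread takes them in the opposite order, each can end up holding one lock while waiting forever for the other. A toy sketch of the hazard and of the consistent order described in the commit message, using made-up coordinator_lock / client_lock names rather than kafka-python's actual attributes:

import threading

coordinator_lock = threading.RLock()   # stand-in for the coordinator's lock
client_lock = threading.RLock()        # stand-in for the network client's lock

def thread_a_deadlock_prone():
    with client_lock:                  # client lock first ...
        with coordinator_lock:         # ... coordinator lock second
            pass

def thread_b_deadlock_prone():
    with coordinator_lock:             # opposite order: if A holds client_lock and
        with client_lock:              # B holds coordinator_lock, neither thread can
            pass                       # take its second lock -- a deadlock

def consistent_order():
    # the convention adopted here: coordinator lock first, client lock
    # second, on every path that needs both
    with coordinator_lock:
        with client_lock:
            pass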

File tree

2 files changed: +39 -43 lines

kafka/client_async.py

Lines changed: 3 additions & 1 deletion
@@ -592,7 +592,9 @@ def poll(self, timeout_ms=None, future=None):
 
             self._poll(timeout)
 
-            responses.extend(self._fire_pending_completed_requests())
+            # called without the lock to avoid deadlock potential
+            # if handlers need to acquire locks
+            responses.extend(self._fire_pending_completed_requests())
 
             # If all we had was a timeout (future is None) - only do one poll
             # If we do have a future, we keep looping until it is done
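
The comment added to poll() describes a common pattern: do the bookkeeping under the lock, but run response callbacks only after it has been released, so a handler that itself needs a lock cannot deadlock against the poller. A minimal standalone sketch of that pattern (not kafka-python's actual internals):

import threading
from collections import deque

class Poller(object):
    def __init__(self):
        self._lock = threading.RLock()
        self._pending_completion = deque()   # (callback, response) pairs

    def poll(self):
        with self._lock:
            # network I/O would happen here; completed responses are only
            # queued while the lock is held, never fired
            completed = list(self._pending_completion)
            self._pending_completion.clear()
        # callbacks run with the lock released, so a handler that acquires
        # another lock (e.g. a coordinator lock) cannot deadlock against poll()
        for callback, response in completed:
            callback(response)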

kafka/coordinator/base.py

Lines changed: 36 additions & 42 deletions
@@ -243,7 +243,7 @@ def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
-        with self._client._lock, self._lock:
+        with self._lock:
             while self.coordinator_unknown():
 
                 # Prior to 0.8.2 there was no group coordinator
@@ -273,7 +273,7 @@ def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
 
     def lookup_coordinator(self):
-        with self._client._lock, self._lock:
+        with self._lock:
             if self._find_coordinator_future is not None:
                 return self._find_coordinator_future
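
lookup_coordinator() memoizes the in-flight find-coordinator request under the coordinator lock: if a future is already pending it is returned, otherwise a new request is started and the slot is cleared once it resolves (via _reset_find_coordinator_future above). A generic single-flight sketch of the same idea, assuming a hypothetical start_request callable that returns a concurrent.futures-style future:

import threading

class SingleFlight(object):
    """Share one in-flight request among callers instead of issuing duplicates."""

    def __init__(self, start_request):
        self._lock = threading.RLock()
        self._future = None
        self._start_request = start_request   # returns a future-like object

    def lookup(self):
        with self._lock:
            if self._future is not None:      # a request is already pending
                return self._future
            future = self._start_request()
            # clear the slot once the request resolves so the next caller
            # starts a fresh request
            future.add_done_callback(lambda _: self._reset())
            self._future = future
            return future

    def _reset(self):
        with self._lock:
            self._future = None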

@@ -342,7 +342,7 @@ def _handle_join_failure(self, _):
 
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
-        with self._client._lock, self._lock:
+        with self._lock:
             if self._heartbeat_thread is None:
                 self._start_heartbeat_thread()
 
@@ -500,7 +500,7 @@ def _handle_join_group_response(self, future, send_time, response):
         log.debug("Received successful JoinGroup response for group %s: %s",
                   self.group_id, response)
         self.sensors.join_latency.record((time.time() - send_time) * 1000)
-        with self._client._lock, self._lock:
+        with self._lock:
             if self.state is not MemberState.REBALANCING:
                 # if the consumer was woken up before a rebalance completes,
                 # we may have already left the group. In this case, we do
@@ -675,7 +675,7 @@ def _handle_group_coordinator_response(self, future, response):
 
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            with self._client._lock, self._lock:
+            with self._lock:
                 ok = self._client.cluster.add_group_coordinator(self.group_id, response)
                 if not ok:
                     # This could happen if coordinator metadata is different
@@ -757,7 +757,7 @@ def close(self):
 
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
-        with self._client._lock, self._lock:
+        with self._lock:
             if (not self.coordinator_unknown()
                     and self.state is not MemberState.UNJOINED
                     and self._generation is not Generation.NO_GENERATION):
@@ -955,46 +955,40 @@ def _run_once(self):
                 self.disable()
                 return
 
-            # TODO: When consumer.wakeup() is implemented, we need to
-            # disable here to prevent propagating an exception to this
-            # heartbeat thread
-            #
-            # Release coordinator lock during client poll to avoid deadlocks
-            # if/when connection errback needs coordinator lock
-            self.coordinator._client.poll(timeout_ms=0)
-
-            if self.coordinator.coordinator_unknown():
-                future = self.coordinator.lookup_coordinator()
-                if not future.is_done or future.failed():
-                    # the immediate future check ensures that we backoff
-                    # properly in the case that no brokers are available
-                    # to connect to (and the future is automatically failed).
-                    with self.coordinator._lock:
+            # TODO: When consumer.wakeup() is implemented, we need to
+            # disable here to prevent propagating an exception to this
+            # heartbeat thread
+            self.coordinator._client.poll(timeout_ms=0)
+
+            if self.coordinator.coordinator_unknown():
+                future = self.coordinator.lookup_coordinator()
+                if not future.is_done or future.failed():
+                    # the immediate future check ensures that we backoff
+                    # properly in the case that no brokers are available
+                    # to connect to (and the future is automatically failed).
                         self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            elif self.coordinator.heartbeat.session_timeout_expired():
-                # the session timeout has expired without seeing a
-                # successful heartbeat, so we should probably make sure
-                # the coordinator is still healthy.
-                log.warning('Heartbeat session expired, marking coordinator dead')
-                self.coordinator.coordinator_dead('Heartbeat session expired')
-
-            elif self.coordinator.heartbeat.poll_timeout_expired():
-                # the poll timeout has expired, which means that the
-                # foreground thread has stalled in between calls to
-                # poll(), so we explicitly leave the group.
-                log.warning('Heartbeat poll expired, leaving group')
-                self.coordinator.maybe_leave_group()
-
-            elif not self.coordinator.heartbeat.should_heartbeat():
-                # poll again after waiting for the retry backoff in case
-                # the heartbeat failed or the coordinator disconnected
-                log.log(0, 'Not ready to heartbeat, waiting')
-                with self.coordinator._lock:
+            elif self.coordinator.heartbeat.session_timeout_expired():
+                # the session timeout has expired without seeing a
+                # successful heartbeat, so we should probably make sure
+                # the coordinator is still healthy.
+                log.warning('Heartbeat session expired, marking coordinator dead')
+                self.coordinator.coordinator_dead('Heartbeat session expired')
+
+            elif self.coordinator.heartbeat.poll_timeout_expired():
+                # the poll timeout has expired, which means that the
+                # foreground thread has stalled in between calls to
+                # poll(), so we explicitly leave the group.
+                log.warning('Heartbeat poll expired, leaving group')
+                self.coordinator.maybe_leave_group()
+
+            elif not self.coordinator.heartbeat.should_heartbeat():
+                # poll again after waiting for the retry backoff in case
+                # the heartbeat failed or the coordinator disconnected
+                log.log(0, 'Not ready to heartbeat, waiting')
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            else:
-                with self.coordinator._client._lock, self.coordinator._lock:
+            else:
                     self.coordinator.heartbeat.sent_heartbeat()
                     future = self.coordinator._send_heartbeat_request()
                     future.add_callback(self._handle_heartbeat_success)
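
The self.coordinator._lock.wait(...) calls retained above work because the coordinator lock is a condition variable: threading.Condition.wait() releases the underlying lock for the duration of the backoff and re-acquires it before returning, so the consumer's main thread can still take the coordinator lock while the heartbeat thread sleeps. A minimal sketch of that behaviour with standalone names:

import threading

cond = threading.Condition()   # wraps an RLock by default
RETRY_BACKOFF_MS = 100

def backoff():
    with cond:                 # wait() must be called with the lock held
        # releases the lock while sleeping and re-acquires it before
        # returning, whether the timeout elapsed or notify() was called
        cond.wait(RETRY_BACKOFF_MS / 1000)

def wake_waiters():
    with cond:
        cond.notify_all()      # e.g. when the coordinator becomes known again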
