
Commit 318c261

Change coordinator lock acquisition order
* Coordinator lock acquired first, client lock acquired second
* Release client lock to process futures
1 parent 89bf6a6 commit 318c261
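
The rationale is the standard two-lock ordering rule: if every thread that needs both the coordinator lock and the client lock takes them in the same order, no two threads can each hold one lock while waiting for the other. A minimal, hypothetical sketch of the ordering this commit enforces (names invented here, not taken from kafka-python):

    import threading

    # Hypothetical stand-ins for coordinator._lock and client._lock.
    coordinator_lock = threading.RLock()
    client_lock = threading.RLock()

    def heartbeat_thread():
        # Coordinator lock first, client lock second.
        with coordinator_lock:
            with client_lock:
                pass  # e.g. poll the client / send a heartbeat

    def main_thread():
        # Same order everywhere; if this thread took client_lock first and
        # coordinator_lock second, the two threads could deadlock, each
        # holding one lock and blocking on the other.
        with coordinator_lock:
            with client_lock:
                pass  # e.g. coordinator bookkeeping that needs the client

    t = threading.Thread(target=heartbeat_thread)
    t.start()
    main_thread()
    t.join()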

2 files changed: 39 additions and 43 deletions


kafka/client_async.py

Lines changed: 3 additions & 1 deletion
@@ -595,7 +595,9 @@ def poll(self, timeout_ms=None, future=None):
 
                 self._poll(timeout / 1000)
 
-                responses.extend(self._fire_pending_completed_requests())
+            # called without the lock to avoid deadlock potential
+            # if handlers need to acquire locks
+            responses.extend(self._fire_pending_completed_requests())
 
             # If all we had was a timeout (future is None) - only do one poll
             # If we do have a future, we keep looping until it is done
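
This hunk is the second bullet of the commit message: _fire_pending_completed_requests() now runs after the client lock is released, so a completion handler that itself needs a lock (for example the coordinator lock) cannot deadlock against poll(). A rough sketch of that pattern, using hypothetical names rather than the actual client internals:

    import threading

    class PollerSketch(object):
        # Hypothetical illustration only: gather completions while holding
        # the lock, but run the handlers after the lock is released.
        def __init__(self):
            self._lock = threading.Lock()
            self._completed_handlers = []

        def poll(self):
            with self._lock:
                # ... do I/O; finished requests append their handler ...
                handlers, self._completed_handlers = self._completed_handlers, []
            # Lock released: a handler may now take other locks safely.
            for handler in handlers:
                handler()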

kafka/coordinator/base.py

Lines changed: 36 additions & 42 deletions
@@ -243,7 +243,7 @@ def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
-        with self._client._lock, self._lock:
+        with self._lock:
             while self.coordinator_unknown():
 
                 # Prior to 0.8.2 there was no group coordinator
@@ -273,7 +273,7 @@ def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
 
     def lookup_coordinator(self):
-        with self._client._lock, self._lock:
+        with self._lock:
             if self._find_coordinator_future is not None:
                 return self._find_coordinator_future
 
@@ -346,7 +346,7 @@ def _handle_join_failure(self, _):
 
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
-        with self._client._lock, self._lock:
+        with self._lock:
             if self._heartbeat_thread is None:
                 self._start_heartbeat_thread()
 
@@ -504,7 +504,7 @@ def _handle_join_group_response(self, future, send_time, response):
             log.debug("Received successful JoinGroup response for group %s: %s",
                       self.group_id, response)
             self.sensors.join_latency.record((time.time() - send_time) * 1000)
-            with self._client._lock, self._lock:
+            with self._lock:
                 if self.state is not MemberState.REBALANCING:
                     # if the consumer was woken up before a rebalance completes,
                     # we may have already left the group. In this case, we do
@@ -679,7 +679,7 @@ def _handle_group_coordinator_response(self, future, response):
 
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            with self._client._lock, self._lock:
+            with self._lock:
                 coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response)
                 if not coordinator_id:
                     # This could happen if coordinator metadata is different
@@ -761,7 +761,7 @@ def close(self):
 
     def maybe_leave_group(self):
        """Leave the current group and reset local generation/memberId."""
-        with self._client._lock, self._lock:
+        with self._lock:
            if (not self.coordinator_unknown()
                and self.state is not MemberState.UNJOINED
                and self._generation is not Generation.NO_GENERATION):
@@ -959,46 +959,40 @@ def _run_once(self):
                     self.disable()
                     return
 
-        # TODO: When consumer.wakeup() is implemented, we need to
-        # disable here to prevent propagating an exception to this
-        # heartbeat thread
-        #
-        # Release coordinator lock during client poll to avoid deadlocks
-        # if/when connection errback needs coordinator lock
-        self.coordinator._client.poll(timeout_ms=0)
-
-        if self.coordinator.coordinator_unknown():
-            future = self.coordinator.lookup_coordinator()
-            if not future.is_done or future.failed():
-                # the immediate future check ensures that we backoff
-                # properly in the case that no brokers are available
-                # to connect to (and the future is automatically failed).
-                with self.coordinator._lock:
+            # TODO: When consumer.wakeup() is implemented, we need to
+            # disable here to prevent propagating an exception to this
+            # heartbeat thread
+            self.coordinator._client.poll(timeout_ms=0)
+
+            if self.coordinator.coordinator_unknown():
+                future = self.coordinator.lookup_coordinator()
+                if not future.is_done or future.failed():
+                    # the immediate future check ensures that we backoff
+                    # properly in the case that no brokers are available
+                    # to connect to (and the future is automatically failed).
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-        elif self.coordinator.heartbeat.session_timeout_expired():
-            # the session timeout has expired without seeing a
-            # successful heartbeat, so we should probably make sure
-            # the coordinator is still healthy.
-            log.warning('Heartbeat session expired, marking coordinator dead')
-            self.coordinator.coordinator_dead('Heartbeat session expired')
-
-        elif self.coordinator.heartbeat.poll_timeout_expired():
-            # the poll timeout has expired, which means that the
-            # foreground thread has stalled in between calls to
-            # poll(), so we explicitly leave the group.
-            log.warning('Heartbeat poll expired, leaving group')
-            self.coordinator.maybe_leave_group()
-
-        elif not self.coordinator.heartbeat.should_heartbeat():
-            # poll again after waiting for the retry backoff in case
-            # the heartbeat failed or the coordinator disconnected
-            log.log(0, 'Not ready to heartbeat, waiting')
-            with self.coordinator._lock:
+            elif self.coordinator.heartbeat.session_timeout_expired():
+                # the session timeout has expired without seeing a
+                # successful heartbeat, so we should probably make sure
+                # the coordinator is still healthy.
+                log.warning('Heartbeat session expired, marking coordinator dead')
+                self.coordinator.coordinator_dead('Heartbeat session expired')
+
+            elif self.coordinator.heartbeat.poll_timeout_expired():
+                # the poll timeout has expired, which means that the
+                # foreground thread has stalled in between calls to
+                # poll(), so we explicitly leave the group.
+                log.warning('Heartbeat poll expired, leaving group')
+                self.coordinator.maybe_leave_group()
+
+            elif not self.coordinator.heartbeat.should_heartbeat():
+                # poll again after waiting for the retry backoff in case
+                # the heartbeat failed or the coordinator disconnected
+                log.log(0, 'Not ready to heartbeat, waiting')
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-        else:
-            with self.coordinator._client._lock, self.coordinator._lock:
+            else:
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)
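
Putting the base.py hunks together: the heartbeat thread now keeps the coordinator lock for the whole iteration and lets client.poll() take the client lock internally, matching the coordinator-then-client order used elsewhere, while the inner lock acquisitions are removed. A condensed restatement of the post-change flow (simplified from the diff above, assuming a GroupCoordinator-like object; not a verbatim copy of _run_once):

    def run_once_sketch(coordinator):
        # Coordinator lock is held throughout; the client lock is taken
        # second, inside client.poll(), per the new acquisition order.
        with coordinator._lock:
            coordinator._client.poll(timeout_ms=0)
            if coordinator.coordinator_unknown():
                future = coordinator.lookup_coordinator()
                if not future.is_done or future.failed():
                    # back off when no brokers are available
                    coordinator._lock.wait(coordinator.config['retry_backoff_ms'] / 1000)
            elif coordinator.heartbeat.session_timeout_expired():
                coordinator.coordinator_dead('Heartbeat session expired')
            elif coordinator.heartbeat.poll_timeout_expired():
                coordinator.maybe_leave_group()
            elif not coordinator.heartbeat.should_heartbeat():
                coordinator._lock.wait(coordinator.config['retry_backoff_ms'] / 1000)
            else:
                coordinator.heartbeat.sent_heartbeat()
                coordinator._send_heartbeat_request()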
