diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index bff628669..da37f92fd 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -910,11 +910,10 @@ def close(self): def run(self): try: + log.debug('Heartbeat thread started') while not self.closed: self._run_once() - log.debug('Heartbeat thread closed') - except ReferenceError: log.debug('Heartbeat thread closed due to coordinator gc') @@ -923,6 +922,9 @@ def run(self): self.coordinator.group_id, e) self.failed = e + finally: + log.debug('Heartbeat thread closed') + def _run_once(self): with self.coordinator._lock: if not self.enabled: @@ -952,20 +954,20 @@ def _run_once(self): # the session timeout has expired without seeing a # successful heartbeat, so we should probably make sure # the coordinator is still healthy. - log.debug('Heartbeat session expired, marking coordinator dead') + log.warning('Heartbeat session expired, marking coordinator dead') self.coordinator.coordinator_dead('Heartbeat session expired') elif self.coordinator.heartbeat.poll_timeout_expired(): # the poll timeout has expired, which means that the # foreground thread has stalled in between calls to # poll(), so we explicitly leave the group. - log.debug('Heartbeat poll expired, leaving group') + log.warning('Heartbeat poll expired, leaving group') self.coordinator.maybe_leave_group() elif not self.coordinator.heartbeat.should_heartbeat(): # poll again after waiting for the retry backoff in case # the heartbeat failed or the coordinator disconnected - log.debug('Not ready to heartbeat, waiting') + log.log(0, 'Not ready to heartbeat, waiting') self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) else: