From 56c5e513eb264429eb0dca91c41ec6d314e615cf Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 22 Mar 2018 14:42:49 -0700 Subject: [PATCH 1/2] Change levels for some heartbeat thread logging --- kafka/coordinator/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index bff628669..9d1872a39 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -952,20 +952,20 @@ def _run_once(self): # the session timeout has expired without seeing a # successful heartbeat, so we should probably make sure # the coordinator is still healthy. - log.debug('Heartbeat session expired, marking coordinator dead') + log.warning('Heartbeat session expired, marking coordinator dead') self.coordinator.coordinator_dead('Heartbeat session expired') elif self.coordinator.heartbeat.poll_timeout_expired(): # the poll timeout has expired, which means that the # foreground thread has stalled in between calls to # poll(), so we explicitly leave the group. - log.debug('Heartbeat poll expired, leaving group') + log.warning('Heartbeat poll expired, leaving group') self.coordinator.maybe_leave_group() elif not self.coordinator.heartbeat.should_heartbeat(): # poll again after waiting for the retry backoff in case # the heartbeat failed or the coordinator disconnected - log.debug('Not ready to heartbeat, waiting') + log.log(0, 'Not ready to heartbeat, waiting') self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) else: From 68757d2a536dbcb38b0016b3cf6f6e494956eb40 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 22 Mar 2018 15:10:40 -0700 Subject: [PATCH 2/2] Heartbeat thread start / close --- kafka/coordinator/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 9d1872a39..da37f92fd 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -910,11 +910,10 @@ def close(self): def run(self): try: + log.debug('Heartbeat thread started') while not self.closed: self._run_once() - log.debug('Heartbeat thread closed') - except ReferenceError: log.debug('Heartbeat thread closed due to coordinator gc') @@ -923,6 +922,9 @@ def run(self): self.coordinator.group_id, e) self.failed = e + finally: + log.debug('Heartbeat thread closed') + def _run_once(self): with self.coordinator._lock: if not self.enabled: