From dfcc18cb73670a835f30f0a1dcbee41d57c4ccd0 Mon Sep 17 00:00:00 2001 From: Rauli Ikonen Date: Thu, 29 Oct 2020 13:29:51 +0200 Subject: [PATCH] consumer: Exit poll if consumer is closed Caller may invoke poll with long timeout and then end up closing the consumer from another thread e.g. to stop the application. Previously poll forcibly waited for the timeout before existing even though it could do nothing but spin in busy loop for the remainder of the time. --- kafka/consumer/group.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 26408c3a5..4fd57ae9c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -651,7 +651,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True): # Poll for new data until the timeout expires start = time.time() remaining = timeout_ms - while True: + while not self._closed: records = self._poll_once(remaining, max_records, update_offsets=update_offsets) if records: return records @@ -660,7 +660,9 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True): remaining = timeout_ms - elapsed_ms if remaining <= 0: - return {} + break + + return {} def _poll_once(self, timeout_ms, max_records, update_offsets=True): """Do one round of polling. In addition to checking for new data, this does