Skip to content

Commit 6c87155

Browse files
authored
KafkaConsumer: Exit poll if consumer is closed (#2152)
1 parent 12325c0 commit 6c87155

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

kafka/consumer/group.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
651651
# Poll for new data until the timeout expires
652652
start = time.time()
653653
remaining = timeout_ms
654-
while True:
654+
while not self._closed:
655655
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
656656
if records:
657657
return records
@@ -660,7 +660,9 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
660660
remaining = timeout_ms - elapsed_ms
661661

662662
if remaining <= 0:
663-
return {}
663+
break
664+
665+
return {}
664666

665667
def _poll_once(self, timeout_ms, max_records, update_offsets=True):
666668
"""Do one round of polling. In addition to checking for new data, this does

0 commit comments

Comments
 (0)