Skip to content

Commit 203b64b

Browse files
bts-webber88manpreet
authored andcommitted
Fixed Issue 1033.Raise AssertionError when decompression unsupported. (dpkp#1159)
1 parent 3ebdcba commit 203b64b

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

kafka/consumer/fetcher.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ def send_fetches(self):
121121
if self._client.ready(node_id):
122122
log.debug("Sending FetchRequest to node %s", node_id)
123123
future = self._client.send(node_id, request)
124+
future.error_on_callbacks=True
124125
future.add_callback(self._handle_fetch_response, request, time.time())
125126
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
126127
futures.append(future)
@@ -542,6 +543,12 @@ def _unpack_message_set(self, tp, messages):
542543
log.exception('StopIteration raised unpacking messageset: %s', e)
543544
raise Exception('StopIteration raised unpacking messageset')
544545

546+
# If unpacking raises AssertionError, it means decompression unsupported
547+
# See Issue 1033
548+
except AssertionError as e:
549+
log.exception('AssertionError raised unpacking messageset: %s', e)
550+
raise
551+
545552
def __iter__(self): # pylint: disable=non-iterator-returned
546553
return self
547554

0 commit comments

Comments
 (0)