Skip to content

Commit 6b97eae

Browse files
bts-webberdpkp
authored andcommitted
Fixed Issue 1033.Raise AssertionError when decompression unsupported. (#1159)
1 parent 3ff3d75 commit 6b97eae

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

kafka/consumer/fetcher.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ def send_fetches(self):
122122
if self._client.ready(node_id):
123123
log.debug("Sending FetchRequest to node %s", node_id)
124124
future = self._client.send(node_id, request)
125+
future.error_on_callbacks=True
125126
future.add_callback(self._handle_fetch_response, request, time.time())
126127
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
127128
futures.append(future)
@@ -550,6 +551,12 @@ def _unpack_message_set(self, tp, messages):
550551
log.exception('StopIteration raised unpacking messageset: %s', e)
551552
raise Exception('StopIteration raised unpacking messageset')
552553

554+
# If unpacking raises AssertionError, it means decompression unsupported
555+
# See Issue 1033
556+
except AssertionError as e:
557+
log.exception('AssertionError raised unpacking messageset: %s', e)
558+
raise
559+
553560
def __iter__(self): # pylint: disable=non-iterator-returned
554561
return self
555562

0 commit comments

Comments
 (0)