Skip to content

Commit 16e05e7

Browse files
committed
Revert ffc7cae / PR #1239
The change caused a regression documented in issue #1290
1 parent 7bde919 commit 16e05e7

File tree

2 files changed

+2
-30
lines changed

2 files changed

+2
-30
lines changed

kafka/consumer/fetcher.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -838,17 +838,12 @@ def _parse_fetched_data(self, completed_fetch):
838838

839839
return parsed_records
840840

841-
class PartitionRecords(object):
841+
class PartitionRecords(six.Iterator):
842842
def __init__(self, fetch_offset, tp, messages):
843843
self.fetch_offset = fetch_offset
844844
self.topic_partition = tp
845845
self.messages = messages
846-
# When fetching an offset that is in the middle of a
847-
# compressed batch, we will get all messages in the batch.
848-
# But we want to start 'take' at the fetch_offset
849-
for i, msg in enumerate(messages):
850-
if msg.offset == fetch_offset:
851-
self.message_idx = i
846+
self.message_idx = 0
852847

853848
# For truthiness evaluation we need to define __len__ or __nonzero__
854849
def __len__(self):

test/test_fetcher.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -303,29 +303,6 @@ def test__handle_offset_response(fetcher, mocker):
303303
assert isinstance(fut.exception, NotLeaderForPartitionError)
304304

305305

306-
def test_partition_records_offset():
307-
"""Test that compressed messagesets are handled correctly
308-
when fetch offset is in the middle of the message list
309-
"""
310-
batch_start = 120
311-
batch_end = 130
312-
fetch_offset = 123
313-
tp = TopicPartition('foo', 0)
314-
messages = [ConsumerRecord(tp.topic, tp.partition, i,
315-
None, None, 'key', 'value', 'checksum', 0, 0)
316-
for i in range(batch_start, batch_end)]
317-
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
318-
assert len(records) > 0
319-
msgs = records.take(1)
320-
assert msgs[0].offset == 123
321-
assert records.fetch_offset == 124
322-
msgs = records.take(2)
323-
assert len(msgs) == 2
324-
assert len(records) > 0
325-
records.discard()
326-
assert len(records) == 0
327-
328-
329306
def test_fetched_records(fetcher, topic, mocker):
330307
fetcher.config['check_crcs'] = False
331308
tp = TopicPartition(topic, 0)

0 commit comments

Comments
 (0)