Skip to content

Commit ffc7cae

Browse files
authored
Fix Fetcher.PartitionRecords to handle fetch_offset in the middle of compressed messageset (#1239)
1 parent cec1bdc commit ffc7cae

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

kafka/consumer/fetcher.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -923,12 +923,17 @@ def _handle_fetch_response(self, request, send_time, response):
923923
self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
924924
self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
925925

926-
class PartitionRecords(six.Iterator):
926+
class PartitionRecords(object):
927927
def __init__(self, fetch_offset, tp, messages):
928928
self.fetch_offset = fetch_offset
929929
self.topic_partition = tp
930930
self.messages = messages
931-
self.message_idx = 0
931+
# When fetching an offset that is in the middle of a
932+
# compressed batch, we will get all messages in the batch.
933+
# But we want to start 'take' at the fetch_offset
934+
for i, msg in enumerate(messages):
935+
if msg.offset == fetch_offset:
936+
self.message_idx = i
932937

933938
def discard(self):
934939
self.messages = None

test/test_fetcher.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from collections import OrderedDict
88

99
from kafka.client_async import KafkaClient
10-
from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError
10+
from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError
1111
from kafka.consumer.subscription_state import SubscriptionState
1212
from kafka.metrics import Metrics
1313
from kafka.protocol.fetch import FetchRequest
@@ -282,3 +282,26 @@ def test__handle_offset_response(fetcher, mocker):
282282
fetcher._handle_offset_response(fut, res)
283283
assert fut.failed()
284284
assert isinstance(fut.exception, NotLeaderForPartitionError)
285+
286+
287+
def test_partition_records_offset():
288+
"""Test that compressed messagesets are handle correctly
289+
when fetch offset is in the middle of the message list
290+
"""
291+
batch_start = 120
292+
batch_end = 130
293+
fetch_offset = 123
294+
tp = TopicPartition('foo', 0)
295+
messages = [ConsumerRecord(tp.topic, tp.partition, i,
296+
None, None, 'key', 'value', 'checksum', 0, 0)
297+
for i in range(batch_start, batch_end)]
298+
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
299+
assert records.has_more()
300+
msgs = records.take(1)
301+
assert msgs[0].offset == 123
302+
assert records.fetch_offset == 124
303+
msgs = records.take(2)
304+
assert len(msgs) == 2
305+
assert records.has_more()
306+
records.discard()
307+
assert not records.has_more()

0 commit comments

Comments
 (0)