Skip to content

Commit bc573e3

Browse files
committed
More tests
1 parent a7d8ae5 commit bc573e3

File tree

2 files changed

+200
-15
lines changed

2 files changed

+200
-15
lines changed

kafka/consumer/fetcher.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,11 @@ def _message_generator(self):
400400

401401
tp = self._next_partition_records.topic_partition
402402

403+
# We can ignore any prior signal to drop pending message sets
404+
# because we are starting from a fresh one where fetch_offset == position
405+
# i.e., the user seek()'d to this position
406+
self._subscriptions.assignment[tp].drop_pending_message_set = False
407+
403408
for msg in self._next_partition_records.take():
404409

405410
# Because we are in a generator, it is possible for

test/test_fetcher.py

Lines changed: 195 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import time
99

1010
from kafka.client_async import KafkaClient
11+
from kafka.codec import gzip_encode
1112
from kafka.consumer.fetcher import (
1213
CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
1314
)
@@ -16,11 +17,12 @@
1617
from kafka.protocol.fetch import FetchRequest, FetchResponse
1718
from kafka.protocol.message import Message
1819
from kafka.protocol.offset import OffsetResponse
20+
from kafka.protocol.types import Int64, Int32
1921
from kafka.structs import TopicPartition
2022
from kafka.future import Future
2123
from kafka.errors import (
2224
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
23-
UnknownTopicOrPartitionError
25+
UnknownTopicOrPartitionError, OffsetOutOfRangeError
2426
)
2527

2628

@@ -294,7 +296,7 @@ def test__handle_offset_response(fetcher, mocker):
294296

295297

296298
def test_partition_records_offset():
297-
"""Test that compressed messagesets are handle correctly
299+
"""Test that compressed messagesets are handled correctly
298300
when fetch offset is in the middle of the message list
299301
"""
300302
batch_start = 120
@@ -317,6 +319,7 @@ def test_partition_records_offset():
317319

318320

319321
def test_fetched_records(fetcher, topic, mocker):
322+
fetcher.config['check_crcs'] = False
320323
tp = TopicPartition(topic, 0)
321324
msgs = []
322325
for i in range(10):
@@ -327,7 +330,6 @@ def test_fetched_records(fetcher, topic, mocker):
327330
mocker.MagicMock()
328331
)
329332
fetcher._completed_fetches.append(completed_fetch)
330-
fetcher.config['check_crcs'] = False
331333
records, partial = fetcher.fetched_records()
332334
assert tp in records
333335
assert len(records[tp]) == len(msgs)
@@ -341,7 +343,7 @@ def test_fetched_records(fetcher, topic, mocker):
341343
-1, 100, 100,
342344
[('foo', [(0, 0, 1000),])]),
343345
FetchResponse[0](
344-
[("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),])]),]),
346+
[("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
345347
1,
346348
),
347349
(
@@ -351,8 +353,8 @@ def test_fetched_records(fetcher, topic, mocker):
351353
FetchResponse[1](
352354
0,
353355
[("foo", [
354-
(0, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
355-
(1, 0, 1000, [(0, Message(b'abc', magic=0)._encode_self()),]),
356+
(0, 0, 1000, [(0, b'xxx'),]),
357+
(1, 0, 1000, [(0, b'xxx'),]),
356358
]),]),
357359
2,
358360
),
@@ -361,7 +363,7 @@ def test_fetched_records(fetcher, topic, mocker):
361363
-1, 100, 100,
362364
[('foo', [(0, 0, 1000),])]),
363365
FetchResponse[2](
364-
0, [("foo", [(0, 0, 1000, [(0, Message(b'abc', magic=1)._encode_self()),])]),]),
366+
0, [("foo", [(0, 0, 1000, [(0, b'xxx'),])]),]),
365367
1,
366368
),
367369
(
@@ -395,17 +397,195 @@ def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_part
395397
assert len(fetcher._completed_fetches) == num_partitions
396398

397399

398-
def test__unpack_message_set():
399-
pass
400+
def test__unpack_message_set(fetcher):
401+
fetcher.config['check_crcs'] = False
402+
tp = TopicPartition('foo', 0)
403+
messages = [
404+
(0, None, Message(b'a')),
405+
(1, None, Message(b'b')),
406+
(2, None, Message(b'c'))
407+
]
408+
records = list(fetcher._unpack_message_set(tp, messages))
409+
assert len(records) == 3
410+
assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
411+
assert records[0].value == b'a'
412+
assert records[1].value == b'b'
413+
assert records[2].value == b'c'
414+
assert records[0].offset == 0
415+
assert records[1].offset == 1
416+
assert records[2].offset == 2
417+
418+
419+
def test__unpack_message_set_compressed_v0(fetcher):
420+
fetcher.config['check_crcs'] = False
421+
tp = TopicPartition('foo', 0)
422+
messages = [
423+
(0, None, Message(b'a')),
424+
(1, None, Message(b'b')),
425+
(2, None, Message(b'c')),
426+
]
427+
message_bytes = []
428+
for offset, _, m in messages:
429+
encoded = m.encode()
430+
message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
431+
compressed_bytes = gzip_encode(b''.join(message_bytes))
432+
compressed_base_offset = 0
433+
compressed_msgs = [
434+
(compressed_base_offset, None,
435+
Message(compressed_bytes,
436+
magic=0,
437+
attributes=Message.CODEC_GZIP))
438+
]
439+
records = list(fetcher._unpack_message_set(tp, compressed_msgs))
440+
assert len(records) == 3
441+
assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
442+
assert records[0].value == b'a'
443+
assert records[1].value == b'b'
444+
assert records[2].value == b'c'
445+
assert records[0].offset == 0
446+
assert records[1].offset == 1
447+
assert records[2].offset == 2
448+
449+
450+
def test__unpack_message_set_compressed_v1(fetcher):
451+
fetcher.config['check_crcs'] = False
452+
tp = TopicPartition('foo', 0)
453+
messages = [
454+
(0, None, Message(b'a')),
455+
(1, None, Message(b'b')),
456+
(2, None, Message(b'c')),
457+
]
458+
message_bytes = []
459+
for offset, _, m in messages:
460+
encoded = m.encode()
461+
message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
462+
compressed_bytes = gzip_encode(b''.join(message_bytes))
463+
compressed_base_offset = 10
464+
compressed_msgs = [
465+
(compressed_base_offset, None,
466+
Message(compressed_bytes,
467+
magic=1,
468+
attributes=Message.CODEC_GZIP))
469+
]
470+
records = list(fetcher._unpack_message_set(tp, compressed_msgs))
471+
assert len(records) == 3
472+
assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
473+
assert records[0].value == b'a'
474+
assert records[1].value == b'b'
475+
assert records[2].value == b'c'
476+
assert records[0].offset == 8
477+
assert records[1].offset == 9
478+
assert records[2].offset == 10
479+
480+
481+
def test__parse_record(fetcher):
482+
tp = TopicPartition('foo', 0)
483+
record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
484+
assert record.topic == 'foo'
485+
assert record.partition == 0
486+
assert record.offset == 123
487+
assert record.timestamp == 456
488+
assert record.value == b'abc'
489+
assert record.key is None
400490

401491

402-
def test__parse_record():
403-
pass
492+
def test__message_generator(fetcher, topic, mocker):
493+
fetcher.config['check_crcs'] = False
494+
tp = TopicPartition(topic, 0)
495+
msgs = []
496+
for i in range(10):
497+
msg = Message(b'foo')
498+
msgs.append((i, -1, msg))
499+
completed_fetch = CompletedFetch(
500+
tp, 0, 0, [0, 100, msgs],
501+
mocker.MagicMock()
502+
)
503+
fetcher._completed_fetches.append(completed_fetch)
504+
for i in range(10):
505+
msg = next(fetcher)
506+
assert isinstance(msg, ConsumerRecord)
507+
assert msg.offset == i
508+
assert msg.value == b'foo'
404509

405510

406-
def test_message_generator():
407-
pass
511+
def test__parse_fetched_data(fetcher, topic, mocker):
512+
fetcher.config['check_crcs'] = False
513+
tp = TopicPartition(topic, 0)
514+
msgs = []
515+
for i in range(10):
516+
msg = Message(b'foo')
517+
msgs.append((i, -1, msg))
518+
completed_fetch = CompletedFetch(
519+
tp, 0, 0, [0, 100, msgs],
520+
mocker.MagicMock()
521+
)
522+
partition_record = fetcher._parse_fetched_data(completed_fetch)
523+
assert isinstance(partition_record, fetcher.PartitionRecords)
524+
assert len(partition_record) == 10
408525

409526

410-
def test__parse_fetched_data():
411-
pass
527+
def test__parse_fetched_data__paused(fetcher, topic, mocker):
528+
fetcher.config['check_crcs'] = False
529+
tp = TopicPartition(topic, 0)
530+
msgs = []
531+
for i in range(10):
532+
msg = Message(b'foo')
533+
msgs.append((i, -1, msg))
534+
completed_fetch = CompletedFetch(
535+
tp, 0, 0, [0, 100, msgs],
536+
mocker.MagicMock()
537+
)
538+
fetcher._subscriptions.pause(tp)
539+
partition_record = fetcher._parse_fetched_data(completed_fetch)
540+
assert partition_record is None
541+
542+
543+
def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
544+
fetcher.config['check_crcs'] = False
545+
tp = TopicPartition(topic, 0)
546+
msgs = []
547+
for i in range(10):
548+
msg = Message(b'foo')
549+
msgs.append((i, -1, msg))
550+
completed_fetch = CompletedFetch(
551+
tp, 10, 0, [0, 100, msgs],
552+
mocker.MagicMock()
553+
)
554+
partition_record = fetcher._parse_fetched_data(completed_fetch)
555+
assert partition_record is None
556+
557+
558+
def test__parse_fetched_data__not_leader(fetcher, topic, mocker):
559+
fetcher.config['check_crcs'] = False
560+
tp = TopicPartition(topic, 0)
561+
completed_fetch = CompletedFetch(
562+
tp, 0, 0, [NotLeaderForPartitionError.errno, -1, None],
563+
mocker.MagicMock()
564+
)
565+
partition_record = fetcher._parse_fetched_data(completed_fetch)
566+
assert partition_record is None
567+
fetcher._client.cluster.request_update.assert_called_with()
568+
569+
570+
def test__parse_fetched_data__unknown_tp(fetcher, topic, mocker):
571+
fetcher.config['check_crcs'] = False
572+
tp = TopicPartition(topic, 0)
573+
completed_fetch = CompletedFetch(
574+
tp, 0, 0, [UnknownTopicOrPartitionError.errno, -1, None],
575+
mocker.MagicMock()
576+
)
577+
partition_record = fetcher._parse_fetched_data(completed_fetch)
578+
assert partition_record is None
579+
fetcher._client.cluster.request_update.assert_called_with()
580+
581+
582+
def test__parse_fetched_data__out_of_range(fetcher, topic, mocker):
583+
fetcher.config['check_crcs'] = False
584+
tp = TopicPartition(topic, 0)
585+
completed_fetch = CompletedFetch(
586+
tp, 0, 0, [OffsetOutOfRangeError.errno, -1, None],
587+
mocker.MagicMock()
588+
)
589+
partition_record = fetcher._parse_fetched_data(completed_fetch)
590+
assert partition_record is None
591+
assert fetcher._subscriptions.assignment[tp].awaiting_reset is True

0 commit comments

Comments
 (0)