Skip to content

Commit d33f21a

Browse files
author
Gabriel Tincu
committed
Update producer test to use pytest.skip instead of return statements
Harden zstd decompression for missing frame size information (can happen when the sender is not under our control)
1 parent df1782f commit d33f21a

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

kafka/codec.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
1212
_XERIAL_V1_FORMAT = 'bccccccBii'
13+
ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
1314

1415
try:
1516
import snappy
@@ -319,4 +320,7 @@ def zstd_encode(payload):
319320
def zstd_decode(payload):
320321
if not zstd:
321322
raise NotImplementedError("Zstd codec is not available")
322-
return zstd.ZstdDecompressor().decompress(payload)
323+
try:
324+
return zstd.ZstdDecompressor().decompress(payload)
325+
except zstd.ZstdError:
326+
return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)

test/test_producer.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ def test_buffer_pool():
2626
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
2727
def test_end_to_end(kafka_broker, compression):
2828
if compression == 'lz4':
29-
# LZ4 requires 0.8.2
3029
if env_kafka_version() < (0, 8, 2):
31-
return
32-
# python-lz4 crashes on older versions of pypy
30+
pytest.skip('LZ4 requires 0.8.2')
3331
elif platform.python_implementation() == 'PyPy':
34-
return
32+
pytest.skip('python-lz4 crashes on older versions of pypy')
3533

3634
if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
37-
return
35+
pytest.skip('zstd requires kafka 2.1.0 or newer')
3836

3937
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
4038
producer = KafkaProducer(bootstrap_servers=connect_str,
@@ -87,7 +85,7 @@ def test_kafka_producer_gc_cleanup():
8785
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
8886
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
8987
if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
90-
return
88+
pytest.skip('zstd requires 2.1.0 or more')
9189
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
9290
producer = KafkaProducer(bootstrap_servers=connect_str,
9391
retries=5,
@@ -130,10 +128,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
130128
if headers:
131129
assert record.serialized_header_size == 22
132130

133-
# generated timestamp case is skipped for broker 0.9 and below
134131
if magic == 0:
135-
return
136-
132+
pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
137133
send_time = time.time() * 1000
138134
future = producer.send(
139135
topic,

0 commit comments

Comments
 (0)