
Commit d957986

Gabriel Tincu committed: Add zstd relevant code to a separate PR
1 parent: a356394

File tree

7 files changed: +61 −8 lines changed

- kafka/codec.py
- kafka/producer/kafka.py
- kafka/record/default_records.py
- kafka/record/memory_records.py
- test/test_codec.py
- test/test_producer.py
- tox.ini

kafka/codec.py

Lines changed: 21 additions & 0 deletions
@@ -16,6 +16,11 @@
 except ImportError:
     snappy = None

+try:
+    import zstandard as zstd
+except ImportError:
+    zstd = None
+
 try:
     import lz4.frame as lz4

@@ -58,6 +63,10 @@ def has_snappy():
     return snappy is not None


+def has_zstd():
+    return zstd is not None
+
+
 def has_lz4():
     if lz4 is not None:
         return True
@@ -299,3 +308,15 @@ def lz4_decode_old_kafka(payload):
         payload[header_size:]
     ])
     return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdDecompressor().decompress(payload)
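
As a sanity check, the new helpers round-trip bytes through the zstandard library. A minimal sketch, assuming the optional zstandard package is installed:

    from kafka.codec import has_zstd, zstd_encode, zstd_decode

    if has_zstd():
        payload = b"a batch of message bytes"
        compressed = zstd_encode(payload)           # zstd.ZstdCompressor().compress(...)
        assert zstd_decode(compressed) == payload   # lossless round trip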

kafka/producer/kafka.py

Lines changed: 6 additions & 2 deletions
@@ -12,7 +12,7 @@

 import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
-from kafka.codec import has_gzip, has_snappy, has_lz4
+from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
@@ -119,7 +119,7 @@ class KafkaProducer(object):
             available guarantee.
             If unset, defaults to acks=1.
         compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+            the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None.
             Compression is of full batches of data, so the efficacy of batching
             will also impact the compression ratio (more batching means better
             compression). Default: None.
@@ -339,6 +339,7 @@ class KafkaProducer(object):
         'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
         'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
         'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
         None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
     }

@@ -388,6 +389,9 @@ def __init__(self, **configs):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

+        if self.config['compression_type'] == 'zstd':
+            assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
+
         # Check compression_type for library support
         ct = self.config['compression_type']
         if ct not in self._COMPRESSORS:
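
Taken together, these changes let callers opt into zstd like any other codec, provided the broker side is new enough. A minimal usage sketch (bootstrap address and topic name are placeholders):

    from kafka import KafkaProducer

    # api_version must be >= (2, 1, 0), otherwise the new assert above fires
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             api_version=(2, 1, 0),
                             compression_type='zstd')
    producer.send('my-topic', b'zstd-compressed payload')
    producer.flush()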

kafka/record/default_records.py

Lines changed: 9 additions & 2 deletions
@@ -62,8 +62,8 @@
 )
 from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode,
-    gzip_decode, snappy_decode, lz4_decode
+    gzip_encode, snappy_encode, lz4_encode, zstd_encode,
+    gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
 import kafka.codec as codecs

@@ -97,6 +97,7 @@ class DefaultRecordBase(object):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     TRANSACTIONAL_MASK = 0x10
     CONTROL_MASK = 0x20
@@ -111,6 +112,8 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        elif compression_type == self.CODEC_ZSTD:
+            checker, name = codecs.has_zstd, "zstd"
         if not checker():
             raise UnsupportedCodecError(
                 "Libraries for {} compression codec not found".format(name))
@@ -185,6 +188,8 @@ def _maybe_uncompress(self):
                     uncompressed = snappy_decode(data.tobytes())
                 if compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
+                if compression_type == self.CODEC_ZSTD:
+                    uncompressed = zstd_decode(data)
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
@@ -517,6 +522,8 @@ def _maybe_compress(self):
                 compressed = snappy_encode(data)
             elif self._compression_type == self.CODEC_LZ4:
                 compressed = lz4_encode(data)
+            elif self._compression_type == self.CODEC_ZSTD:
+                compressed = zstd_encode(data)
             compressed_size = len(compressed)
             if len(data) <= compressed_size:
                 # We did not get any benefit from compression, lets send
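
CODEC_ZSTD = 0x04 slots into the low bits of the v2 record-batch attributes field, below the TIMESTAMP_TYPE/TRANSACTIONAL/CONTROL flags shown above. A sketch of how a codec id could be read out of an attributes byte (the 0x07 codec mask is Kafka's usual convention and is assumed here, since it sits outside this diff):

    CODEC_MASK = 0x07  # low 3 bits hold the compression codec (assumed convention)
    CODEC_ZSTD = 0x04

    def codec_of(attributes):
        # e.g. attributes 0x0c = CODEC_ZSTD (0x04) plus the 0x08 timestamp-type bit
        return attributes & CODEC_MASK

    assert codec_of(0x0c) == CODEC_ZSTD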

kafka/record/memory_records.py

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object):

     def __init__(self, magic, compression_type, batch_size):
         assert magic in [0, 1, 2], "Not supported magic"
-        assert compression_type in [0, 1, 2, 3], "Not valid compression type"
+        assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
         if magic >= 2:
             self._builder = DefaultRecordBatchBuilder(
                 magic=magic, compression_type=compression_type,
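
The widened assert simply admits the new codec id; the integers map onto the CODEC_* constants from default_records.py (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd). A sketch of constructing a v2 builder with zstd (batch_size here is an arbitrary illustrative value):

    from kafka.record.memory_records import MemoryRecordsBuilder

    # magic=2 routes to DefaultRecordBatchBuilder; compression_type=4 is CODEC_ZSTD
    builder = MemoryRecordsBuilder(magic=2, compression_type=4, batch_size=16384)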

test/test_codec.py

Lines changed: 10 additions & 1 deletion
@@ -7,11 +7,12 @@
 from kafka.vendor.six.moves import range

 from kafka.codec import (
-    has_snappy, has_lz4,
+    has_snappy, has_lz4, has_zstd,
     gzip_encode, gzip_decode,
     snappy_encode, snappy_decode,
     lz4_encode, lz4_decode,
     lz4_encode_old_kafka, lz4_decode_old_kafka,
+    zstd_encode, zstd_decode,
 )

 from test.testutil import random_string
@@ -113,3 +114,11 @@ def test_lz4_incremental():
     b2 = lz4_decode(lz4_encode(b1))
     assert len(b1) == len(b2)
     assert b1 == b2
+
+
+@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
+def test_zstd():
+    for _ in range(1000):
+        b1 = random_string(100).encode('utf-8')
+        b2 = zstd_decode(zstd_encode(b1))
+        assert b1 == b2

test/test_producer.py

Lines changed: 13 additions & 2 deletions
@@ -23,7 +23,7 @@ def test_buffer_pool():


 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):

     if compression == 'lz4':
@@ -34,10 +34,15 @@ def test_end_to_end(kafka_broker, compression):
     elif platform.python_implementation() == 'PyPy':
         return

+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        return
+    env_version = env_kafka_version()
+    api_version = env_version if env_version >= (2, 1, 0) else None
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
+                             api_version=api_version,
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
@@ -81,16 +86,22 @@ def test_kafka_producer_gc_cleanup():


 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        return
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
+    env_version = env_kafka_version()
+    api_version = env_version if env_version >= (2, 1, 0) else None
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
+                             api_version=api_version,
                              max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()

     # record headers are supported in 0.11.0
+
     if env_kafka_version() < (0, 11, 0):
         headers = None
     else:

tox.ini

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ deps =
     pytest-mock
     mock
     python-snappy
+    zstandard
     lz4
     xxhash
     crc32c
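
Outside of tox, the same optional dependency can be pulled in directly; the package name matches the `import zstandard as zstd` in kafka/codec.py:

    pip install zstandard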
