diff --git a/kafka/codec.py b/kafka/codec.py index c740a181c..b73df060d 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -193,8 +193,15 @@ def _detect_xerial_stream(payload): """ if len(payload) > 16: - header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) - return header == _XERIAL_V1_HEADER + magic = struct.unpack('!' + _XERIAL_V1_FORMAT[:8], bytes(payload)[:8]) + version, compat = struct.unpack('!' + _XERIAL_V1_FORMAT[8:], bytes(payload)[8:16]) + # Until there is more than one way to do xerial blocking, the version + compat + # fields can be ignored. Also some producers (i.e., redpanda) are known to + # incorrectly encode these as little-endian, and that causes us to fail decoding + # when we otherwise would have succeeded. + # See https://github.com/dpkp/kafka-python/issues/2414 + if magic == _XERIAL_V1_HEADER[:8]: + return True return False diff --git a/test/test_codec.py b/test/test_codec.py index e05707451..24159c253 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -39,12 +39,14 @@ def test_snappy_detect_xerial(): _detect_xerial_stream = kafka1.codec._detect_xerial_stream header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' + redpanda_header = b'\x82SNAPPY\x00\x01\x00\x00\x00\x01\x00\x00\x00Some extra bytes' false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' default_snappy = snappy_encode(b'foobar' * 50) random_snappy = snappy_encode(b'SNAPPY' * 50, xerial_compatible=False) short_data = b'\x01\x02\x03\x04' assert _detect_xerial_stream(header) is True + assert _detect_xerial_stream(redpanda_header) is True assert _detect_xerial_stream(b'') is False assert _detect_xerial_stream(b'\x00') is False assert _detect_xerial_stream(false_header) is False