@@ -25,7 +25,6 @@ def test_buffer_pool():
25
25
@pytest .mark .skipif (not env_kafka_version (), reason = "No KAFKA_VERSION set" )
26
26
@pytest .mark .parametrize ("compression" , [None , 'gzip' , 'snappy' , 'lz4' , 'zstd' ])
27
27
def test_end_to_end (kafka_broker , compression ):
28
-
29
28
if compression == 'lz4' :
30
29
# LZ4 requires 0.8.2
31
30
if env_kafka_version () < (0 , 8 , 2 ):
@@ -36,13 +35,12 @@ def test_end_to_end(kafka_broker, compression):
36
35
37
36
if compression == 'zstd' and env_kafka_version () < (2 , 1 , 0 ):
38
37
return
39
- env_version = env_kafka_version ()
40
- api_version = env_version if env_version >= (2 , 1 , 0 ) else None
38
+
41
39
connect_str = ':' .join ([kafka_broker .host , str (kafka_broker .port )])
42
40
producer = KafkaProducer (bootstrap_servers = connect_str ,
43
41
retries = 5 ,
44
42
max_block_ms = 30000 ,
45
- api_version = api_version ,
43
+ api_version = env_kafka_version () ,
46
44
compression_type = compression ,
47
45
value_serializer = str .encode )
48
46
consumer = KafkaConsumer (bootstrap_servers = connect_str ,
@@ -91,11 +89,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
91
89
if compression == 'zstd' and env_kafka_version () < (2 , 1 , 0 ):
92
90
return
93
91
connect_str = ':' .join ([kafka_broker .host , str (kafka_broker .port )])
94
- env_version = env_kafka_version ()
95
- api_version = env_version if env_version >= (2 , 1 , 0 ) else None
96
92
producer = KafkaProducer (bootstrap_servers = connect_str ,
97
93
retries = 5 ,
98
- api_version = api_version ,
94
+ api_version = env_kafka_version () ,
99
95
max_block_ms = 30000 ,
100
96
compression_type = compression )
101
97
magic = producer ._max_usable_produce_magic ()
0 commit comments