@@ -44,6 +44,7 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
44
44
45
45
assert len (messages [0 ]) == 100
46
46
assert len (messages [1 ]) == 100
47
+ kafka_consumer .close ()
47
48
48
49
49
50
class TestConsumerIntegration (KafkaIntegrationTestCase ):
@@ -558,6 +559,7 @@ def test_kafka_consumer__blocking(self):
558
559
messages .add ((msg .partition , msg .offset ))
559
560
self .assertEqual (len (messages ), 5 )
560
561
self .assertGreaterEqual (t .interval , TIMEOUT_MS / 1000.0 )
562
+ consumer .close ()
561
563
562
564
@kafka_versions ('>=0.8.1' )
563
565
def test_kafka_consumer__offset_commit_resume (self ):
@@ -597,6 +599,7 @@ def test_kafka_consumer__offset_commit_resume(self):
597
599
output_msgs2 .append (m )
598
600
self .assert_message_count (output_msgs2 , 20 )
599
601
self .assertEqual (len (set (output_msgs1 ) | set (output_msgs2 )), 200 )
602
+ consumer2 .close ()
600
603
601
604
@kafka_versions ('>=0.10.1' )
602
605
def test_kafka_consumer_max_bytes_simple (self ):
@@ -617,6 +620,7 @@ def test_kafka_consumer_max_bytes_simple(self):
617
620
self .assertEqual (
618
621
seen_partitions , set ([
619
622
TopicPartition (self .topic , 0 ), TopicPartition (self .topic , 1 )]))
623
+ consumer .close ()
620
624
621
625
@kafka_versions ('>=0.10.1' )
622
626
def test_kafka_consumer_max_bytes_one_msg (self ):
@@ -642,6 +646,7 @@ def test_kafka_consumer_max_bytes_one_msg(self):
642
646
643
647
fetched_msgs = [next (consumer ) for i in range (10 )]
644
648
self .assertEqual (len (fetched_msgs ), 10 )
649
+ consumer .close ()
645
650
646
651
@kafka_versions ('>=0.10.1' )
647
652
def test_kafka_consumer_offsets_for_time (self ):
@@ -695,6 +700,7 @@ def test_kafka_consumer_offsets_for_time(self):
695
700
self .assertEqual (offsets , {
696
701
tp : late_msg .offset + 1
697
702
})
703
+ consumer .close ()
698
704
699
705
@kafka_versions ('>=0.10.1' )
700
706
def test_kafka_consumer_offsets_search_many_partitions (self ):
@@ -733,6 +739,7 @@ def test_kafka_consumer_offsets_search_many_partitions(self):
733
739
tp0 : p0msg .offset + 1 ,
734
740
tp1 : p1msg .offset + 1
735
741
})
742
+ consumer .close ()
736
743
737
744
@kafka_versions ('<0.10.1' )
738
745
def test_kafka_consumer_offsets_for_time_old (self ):
0 commit comments