Skip to content

Commit 8fbbbb3

Browse files
committed
test-this-too
1 parent 461d16b commit 8fbbbb3

File tree

2 files changed

+11
-32
lines changed

2 files changed

+11
-32
lines changed

test/test_consumer_integration.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,19 @@
2525

2626
from test.conftest import version
2727
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
28-
from test.testutil import (
29-
KafkaIntegrationTestCase, kafka_versions, Timer,
30-
send_messages
31-
)
28+
from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer
3229

3330

3431
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
35-
def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
36-
"""Test KafkaConsumer
37-
"""
32+
def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
33+
"""Test KafkaConsumer"""
3834
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
3935

40-
send_messages(simple_client, topic, 0, range(0, 100))
41-
send_messages(simple_client, topic, 1, range(100, 200))
36+
for i in range(0, 100):
37+
kafka_producer.send(topic, partition=0, value=str(i).encode())
38+
for i in range(100, 200):
39+
kafka_producer.send(topic, partition=2, value=str(i).encode())
40+
kafka_producer.flush()
4241

4342
cnt = 0
4443
messages = {0: set(), 1: set()}

test/testutil.py

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,19 @@
33
import functools
44
import operator
55
import os
6-
import socket
76
import time
87
import uuid
98

109
import pytest
1110
from . import unittest
1211

13-
from kafka import SimpleClient, create_message
12+
from kafka import SimpleClient
1413
from kafka.errors import (
1514
LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
1615
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
1716
FailedPayloadsError
1817
)
19-
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
18+
from kafka.structs import OffsetRequestPayload
2019
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
2120

2221

@@ -67,26 +66,6 @@ def wrapper(func, *args, **kwargs):
6766
return real_kafka_versions
6867

6968

70-
_MESSAGES = {}
71-
def msg(message):
72-
"""Format, encode and deduplicate a message
73-
"""
74-
global _MESSAGES #pylint: disable=global-statement
75-
if message not in _MESSAGES:
76-
_MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))
77-
78-
return _MESSAGES[message].encode('utf-8')
79-
80-
def send_messages(client, topic, partition, messages):
81-
"""Send messages to a topic's partition
82-
"""
83-
messages = [create_message(msg(str(m))) for m in messages]
84-
produce = ProduceRequestPayload(topic, partition, messages=messages)
85-
resp, = client.send_produce_request([produce])
86-
assert resp.error == 0
87-
88-
return [x.value for x in messages]
89-
9069
def current_offset(client, topic, partition, kafka_broker=None):
9170
"""Get the current offset of a topic's partition
9271
"""
@@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None):
10180
else:
10281
return offsets.offsets[0]
10382

83+
10484
class KafkaIntegrationTestCase(unittest.TestCase):
10585
create_client = True
10686
topic = None

0 commit comments

Comments
 (0)