Skip to content

Migrate from Unittest to pytest #1620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import

import inspect

import pytest

from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version
Expand Down
16 changes: 9 additions & 7 deletions test/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from mock import MagicMock, patch
from . import unittest
import pytest

from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.errors import (
Expand All @@ -11,17 +12,13 @@
FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)


class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])

class TestKafkaConsumer:
def test_session_timeout_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)

def test_fetch_max_wait_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)

def test_subscription_copy(self):
Expand All @@ -43,7 +40,12 @@ def test_partition_list(self):
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member


class TestSimpleConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])

def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
Expand Down
20 changes: 2 additions & 18 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pytest
from kafka.vendor import six

from kafka import SimpleClient
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState, Generation
Expand All @@ -20,25 +19,10 @@ def get_connect_str(kafka_broker):
return kafka_broker.host + ':' + str(kafka_broker.port)


@pytest.fixture
def simple_client(kafka_broker):
return SimpleClient(get_connect_str(kafka_broker))


@pytest.fixture
def topic(simple_client):
topic = random_string(5)
simple_client.ensure_topic_exists(topic)
return topic


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_consumer(kafka_broker, version):

def test_consumer(kafka_broker, topic, version):
# The `topic` fixture is included because
# 0.8.2 brokers need a topic to function well
if version >= (0, 8, 2) and version < (0, 9):
topic(simple_client(kafka_broker))

consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
consumer.poll(500)
assert len(consumer._client._conns) > 0
Expand Down
19 changes: 10 additions & 9 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@

from test.conftest import version
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
from test.testutil import (
KafkaIntegrationTestCase, kafka_versions, Timer,
send_messages
)
from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
"""Test KafkaConsumer
"""
def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
"""Test KafkaConsumer"""
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')

send_messages(simple_client, topic, 0, range(0, 100))
send_messages(simple_client, topic, 1, range(100, 200))
# TODO replace this with a `send_messages()` pytest fixture
# as we will likely need this elsewhere
for i in range(0, 100):
kafka_producer.send(topic, partition=0, value=str(i).encode())
for i in range(100, 200):
kafka_producer.send(topic, partition=1, value=str(i).encode())
kafka_producer.flush()

cnt = 0
messages = {0: set(), 1: set()}
Expand Down
23 changes: 10 additions & 13 deletions test/test_package.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
from . import unittest


class TestPackage(unittest.TestCase):
class TestPackage:
def test_top_level_namespace(self):
import kafka as kafka1
self.assertEqual(kafka1.KafkaConsumer.__name__, "KafkaConsumer")
self.assertEqual(kafka1.consumer.__name__, "kafka.consumer")
self.assertEqual(kafka1.codec.__name__, "kafka.codec")
assert kafka1.KafkaConsumer.__name__ == "KafkaConsumer"
assert kafka1.consumer.__name__ == "kafka.consumer"
assert kafka1.codec.__name__ == "kafka.codec"

def test_submodule_namespace(self):
import kafka.client as client1
self.assertEqual(client1.__name__, "kafka.client")
assert client1.__name__ == "kafka.client"

from kafka import client as client2
self.assertEqual(client2.__name__, "kafka.client")
assert client2.__name__ == "kafka.client"

from kafka.client import SimpleClient as SimpleClient1
self.assertEqual(SimpleClient1.__name__, "SimpleClient")
assert SimpleClient1.__name__ == "SimpleClient"

from kafka.codec import gzip_encode as gzip_encode1
self.assertEqual(gzip_encode1.__name__, "gzip_encode")
assert gzip_encode1.__name__ == "gzip_encode"

from kafka import SimpleClient as SimpleClient2
self.assertEqual(SimpleClient2.__name__, "SimpleClient")
assert SimpleClient2.__name__ == "SimpleClient"

from kafka.codec import snappy_encode
self.assertEqual(snappy_encode.__name__, "snappy_encode")
assert snappy_encode.__name__ == "snappy_encode"
26 changes: 3 additions & 23 deletions test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
import functools
import operator
import os
import socket
import time
import uuid

import pytest
from . import unittest

from kafka import SimpleClient, create_message
from kafka import SimpleClient
from kafka.errors import (
LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
FailedPayloadsError
)
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
from kafka.structs import OffsetRequestPayload
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order


Expand Down Expand Up @@ -67,26 +66,6 @@ def wrapper(func, *args, **kwargs):
return real_kafka_versions


_MESSAGES = {}
def msg(message):
"""Format, encode and deduplicate a message
"""
global _MESSAGES #pylint: disable=global-statement
if message not in _MESSAGES:
_MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))

return _MESSAGES[message].encode('utf-8')

def send_messages(client, topic, partition, messages):
"""Send messages to a topic's partition
"""
messages = [create_message(msg(str(m))) for m in messages]
produce = ProduceRequestPayload(topic, partition, messages=messages)
resp, = client.send_produce_request([produce])
assert resp.error == 0

return [x.value for x in messages]

def current_offset(client, topic, partition, kafka_broker=None):
"""Get the current offset of a topic's partition
"""
Expand All @@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None):
else:
return offsets.offsets[0]


class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
Expand Down
1 change: 0 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ deps =
xxhash
crc32c
py26: unittest2
decorator
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =
Expand Down