Skip to content

Commit 75fb69b

Browse files
committed
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the migration of unittest.TestCases to pytest test cases. The handling of temporary dir creation was also changed so that we can use the pytest tmpdir fixture after the migration.
1 parent e166f50 commit 75fb69b

File tree

5 files changed

+244
-101
lines changed

5 files changed

+244
-101
lines changed

pylint.rc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[TYPECHECK]
22
ignored-classes=SyncManager,_socketobject
3+
generated-members=py.*
34

45
[MESSAGES CONTROL]
56
disable=E1129

test/conftest.py

Lines changed: 117 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,138 @@
11
from __future__ import absolute_import
22

3-
import os
3+
import inspect
44

55
import pytest
6+
from decorator import decorate
67

78
from test.fixtures import KafkaFixture, ZookeeperFixture
8-
9+
from test.testutil import kafka_version, random_string
910

1011
@pytest.fixture(scope="module")
1112
def version():
12-
if 'KAFKA_VERSION' not in os.environ:
13-
return ()
14-
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
15-
13+
return kafka_version()
1614

1715
@pytest.fixture(scope="module")
18-
def zookeeper(version, request):
19-
assert version
16+
def zookeeper():
2017
zk = ZookeeperFixture.instance()
2118
yield zk
2219
zk.close()
2320

21+
_params = {}
22+
def _set_params(func, *args, **kw):
23+
"""Private helper method for set_params
24+
"""
25+
ret = func(*args, **kw)
26+
return ret
27+
28+
def set_params(**params):
29+
"""Allows specific parameters to be injected into the instatiation of a fixture.
30+
31+
The parameters are specified as shown in the example below:
32+
33+
@set_params(simple_consumer={'auto_offset_reset':'earliest'})
34+
def test_consumer_with_earliest(simple_consumer):
35+
...
36+
37+
@set_params(simple_consumer={'auto_offset_reset':'latest'})
38+
def test_consumer_with_latest(simple_consumer):
39+
...
40+
41+
The parameters set this way are exposed to the fixture method as an injected parameter:
42+
43+
@pytest.fixture
44+
def simple_consumer(...some_params..., simple_consumer_params):
45+
...
46+
47+
This avoids having to explicitly handle instantiation and destruction on each test method.
48+
This works only for fixtures with "function" scope.
49+
50+
Note that this is different from the behaviour of @pytest.mark.parametrize, since
51+
@pytest.mark.parametrize will force multiple executions of the same test for the different
52+
parameter values, which is different from the intended use above.
53+
"""
54+
global _params
55+
56+
def real_set_params(func, *args, **kwargs):
57+
_params[func.__name__] = params
58+
ret = decorate(func, _set_params)
59+
return ret
60+
61+
return real_set_params
62+
63+
# this must be called from the module where the broker is used
64+
def set_broker_params(**params):
65+
global _params
66+
caller = inspect.stack()[1]
67+
if isinstance(caller, tuple):
68+
filename = caller[1]
69+
else:
70+
filename = inspect.stack()[1].filename
71+
_params[filename] = params
72+
73+
@pytest.fixture(scope='module')
74+
def broker_params(request):
75+
global _params
76+
return _params[request.node.fspath] if request.node.fspath in _params else None
77+
78+
@pytest.fixture
79+
def kafka_consumer_params(request):
80+
return _get_params(request)
81+
82+
@pytest.fixture
83+
def kafka_producer_params(request):
84+
return _get_params(request)
85+
86+
def _get_params(request):
87+
global _params
88+
if request._pyfuncitem.name in _params and \
89+
request.fixturename.replace('_params', '') in _params[request._pyfuncitem.name]:
90+
return _params[request._pyfuncitem.name][request.fixturename.replace('_params', '')]
91+
return None
92+
93+
@pytest.fixture(scope="module")
94+
def kafka_broker(kafka_brokers):
95+
return kafka_brokers[0]
2496

2597
@pytest.fixture(scope="module")
26-
def kafka_broker(version, zookeeper, request):
27-
assert version
28-
k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
29-
partitions=4)
30-
yield k
31-
k.close()
98+
def kafka_brokers(version, zookeeper, broker_params):
99+
assert version, 'KAFKA_VERSION must be specified to run integration tests'
100+
params = {} if broker_params is None else broker_params.copy()
101+
params.setdefault('partitions', 4)
102+
num_brokers = params.pop('num_brokers', 1)
103+
brokers = tuple(KafkaFixture.instance(x, zookeeper.host, zookeeper.port, **params)
104+
for x in range(num_brokers))
105+
yield brokers
106+
for broker in brokers:
107+
broker.close()
32108

109+
@pytest.fixture
110+
def kafka_client(kafka_broker, request):
111+
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
112+
yield client
113+
client.close()
114+
115+
@pytest.fixture
116+
def kafka_consumer(kafka_broker, topic, request, kafka_consumer_params):
117+
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
118+
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
119+
(consumer,) = kafka_broker.get_consumers(cnt=1, topics=[topic], **params)
120+
yield consumer
121+
consumer.close()
122+
123+
@pytest.fixture
124+
def kafka_producer(kafka_broker, request, kafka_producer_params):
125+
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
126+
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
127+
(producer,) = kafka_broker.get_producers(cnt=1, **params)
128+
yield producer
129+
producer.close()
130+
131+
@pytest.fixture
132+
def topic(kafka_broker, request):
133+
topic_name = '%s_%s' % (request.node.name, random_string(10))
134+
kafka_broker.create_topics([topic_name])
135+
return topic_name
33136

34137
@pytest.fixture
35138
def conn(mocker):

0 commit comments

Comments
 (0)