Skip to content

Commit 55ded55

Browse files
committed
Added unit tests for fetcher's _reset_offset and related functions.
1 parent 1f69f8f commit 55ded55

File tree

3 files changed

+199
-7
lines changed

3 files changed

+199
-7
lines changed

kafka/consumer/fetcher.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ def _reset_offset(self, partition):
228228
log.debug("Resetting offset for partition %s to %s offset.",
229229
partition, strategy)
230230
offsets = self._retrieve_offsets({partition: timestamp})
231-
assert partition in offsets
231+
if partition not in offsets:
232+
raise NoOffsetForPartitionError(partition)
232233
offset = offsets[partition][0]
233234

234235
# we might lose the assignment while fetching the offset,
@@ -667,10 +668,14 @@ def on_success(value):
667668
offsets.update(r)
668669
list_offsets_future.success(offsets)
669670

671+
def on_fail(err):
672+
if not list_offsets_future.is_done:
673+
list_offsets_future.failure(err)
674+
670675
for node_id, timestamps in six.iteritems(timestamps_by_node):
671676
_f = self._send_offset_request(node_id, timestamps)
672677
_f.add_callback(on_success)
673-
_f.add_errback(lambda e: list_offsets_future.failure(e))
678+
_f.add_errback(on_fail)
674679
return list_offsets_future
675680

676681
def _send_offset_request(self, node_id, timestamps):
@@ -717,10 +722,13 @@ def _handle_offset_response(self, future, response):
717722
if response.API_VERSION == 0:
718723
offsets = partition_info[2]
719724
assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
720-
if offsets:
725+
if not offsets:
726+
offset = UNKNOWN_OFFSET
727+
else:
721728
offset = offsets[0]
722-
log.debug("Handling v0 ListOffsetResponse response for %s. "
723-
"Fetched offset %s", partition, offset)
729+
log.debug("Handling v0 ListOffsetResponse response for %s. "
730+
"Fetched offset %s", partition, offset)
731+
if offset != UNKNOWN_OFFSET:
724732
timestamp_offset_map[partition] = (offset, None)
725733
else:
726734
timestamp, offset = partition_info[2:]
@@ -739,16 +747,19 @@ def _handle_offset_response(self, future, response):
739747
" to obsolete leadership information, retrying.",
740748
partition)
741749
future.failure(error_type(partition))
750+
return
742751
elif error_type is Errors.UnknownTopicOrPartitionError:
743752
log.warn("Received unknown topic or partition error in ListOffset "
744753
"request for partition %s. The topic/partition " +
745754
"may not exist or the user may not have Describe access "
746755
"to it.", partition)
747756
future.failure(error_type(partition))
757+
return
748758
else:
749759
log.warning("Attempt to fetch offsets for partition %s failed due to:"
750760
" %s", partition, error_type)
751761
future.failure(error_type(partition))
762+
return
752763
if not future.is_done:
753764
future.success(timestamp_offset_map)
754765

test/test_consumer_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,7 +741,7 @@ def test_kafka_consumer_offsets_for_time_old(self):
741741
with self.assertRaises(UnsupportedVersionError):
742742
consumer.end_offsets([tp])
743743

744-
@kafka_versions('<0.10.1')
744+
@kafka_versions('>=0.10.1')
745745
def test_kafka_consumer_offsets_for_times_errors(self):
746746
consumer = self.kafka_consumer()
747747
tp = TopicPartition(self.topic, 0)

test/test_fetcher.py

Lines changed: 182 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,21 @@
33

44
import pytest
55

6+
import itertools
7+
from collections import OrderedDict
8+
69
from kafka.client_async import KafkaClient
7-
from kafka.consumer.fetcher import Fetcher
10+
from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError
811
from kafka.consumer.subscription_state import SubscriptionState
912
from kafka.metrics import Metrics
1013
from kafka.protocol.fetch import FetchRequest
14+
from kafka.protocol.offset import OffsetResponse
1115
from kafka.structs import TopicPartition
16+
from kafka.future import Future
17+
from kafka.errors import (
18+
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
19+
UnknownTopicOrPartitionError
20+
)
1221

1322

1423
@pytest.fixture
@@ -101,3 +110,175 @@ def test_update_fetch_positions(fetcher, mocker):
101110
fetcher.update_fetch_positions([partition])
102111
assert fetcher._reset_offset.call_count == 0
103112
fetcher._subscriptions.seek.assert_called_with(partition, 123)
113+
114+
115+
def test__reset_offset(fetcher, mocker):
116+
tp = TopicPartition("topic", 0)
117+
fetcher._subscriptions.subscribe(topics="topic")
118+
fetcher._subscriptions.assign_from_subscribed([tp])
119+
fetcher._subscriptions.need_offset_reset(tp)
120+
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
121+
122+
mocked.return_value = {}
123+
with pytest.raises(NoOffsetForPartitionError):
124+
fetcher._reset_offset(tp)
125+
126+
mocked.return_value = {tp: (1001, None)}
127+
fetcher._reset_offset(tp)
128+
assert not fetcher._subscriptions.assignment[tp].awaiting_reset
129+
assert fetcher._subscriptions.assignment[tp].position == 1001
130+
131+
132+
def test__send_offset_requests(fetcher, mocker):
133+
tp = TopicPartition("topic_send_offset", 1)
134+
mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
135+
send_futures = []
136+
137+
def send_side_effect(*args, **kw):
138+
f = Future()
139+
send_futures.append(f)
140+
return f
141+
mocked_send.side_effect = send_side_effect
142+
143+
mocked_leader = mocker.patch.object(
144+
fetcher._client.cluster, "leader_for_partition")
145+
# First we report unavailable leader 2 times different ways and later
146+
# always as available
147+
mocked_leader.side_effect = itertools.chain(
148+
[None, -1], itertools.cycle([0]))
149+
150+
# Leader == None
151+
fut = fetcher._send_offset_requests({tp: 0})
152+
assert fut.failed()
153+
assert isinstance(fut.exception, StaleMetadata)
154+
assert not mocked_send.called
155+
156+
# Leader == -1
157+
fut = fetcher._send_offset_requests({tp: 0})
158+
assert fut.failed()
159+
assert isinstance(fut.exception, LeaderNotAvailableError)
160+
assert not mocked_send.called
161+
162+
# Leader == 0, send failed
163+
fut = fetcher._send_offset_requests({tp: 0})
164+
assert not fut.is_done
165+
assert mocked_send.called
166+
# Check that we bound the futures correctly to chain failure
167+
send_futures.pop().failure(NotLeaderForPartitionError(tp))
168+
assert fut.failed()
169+
assert isinstance(fut.exception, NotLeaderForPartitionError)
170+
171+
# Leader == 0, send success
172+
fut = fetcher._send_offset_requests({tp: 0})
173+
assert not fut.is_done
174+
assert mocked_send.called
175+
# Check that we bound the futures correctly to chain success
176+
send_futures.pop().success({tp: (10, 10000)})
177+
assert fut.succeeded()
178+
assert fut.value == {tp: (10, 10000)}
179+
180+
181+
def test__send_offset_requests_multiple_nodes(fetcher, mocker):
182+
tp1 = TopicPartition("topic_send_offset", 1)
183+
tp2 = TopicPartition("topic_send_offset", 2)
184+
tp3 = TopicPartition("topic_send_offset", 3)
185+
tp4 = TopicPartition("topic_send_offset", 4)
186+
mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
187+
send_futures = []
188+
189+
def send_side_effect(node_id, timestamps):
190+
f = Future()
191+
send_futures.append((node_id, timestamps, f))
192+
return f
193+
mocked_send.side_effect = send_side_effect
194+
195+
mocked_leader = mocker.patch.object(
196+
fetcher._client.cluster, "leader_for_partition")
197+
mocked_leader.side_effect = itertools.cycle([0, 1])
198+
199+
# -- All node succeeded case
200+
tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
201+
fut = fetcher._send_offset_requests(tss)
202+
assert not fut.is_done
203+
assert mocked_send.call_count == 2
204+
205+
req_by_node = {}
206+
second_future = None
207+
for node, timestamps, f in send_futures:
208+
req_by_node[node] = timestamps
209+
if node == 0:
210+
# Say tp3 does not have any messages so it's missing
211+
f.success({tp1: (11, 1001)})
212+
else:
213+
second_future = f
214+
assert req_by_node == {
215+
0: {tp1: 0, tp3: 0},
216+
1: {tp2: 0, tp4: 0}
217+
}
218+
219+
# We only resolved 1 future so far, so result future is not yet ready
220+
assert not fut.is_done
221+
second_future.success({tp2: (12, 1002), tp4: (14, 1004)})
222+
assert fut.succeeded()
223+
assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}
224+
225+
# -- First succeeded second not
226+
del send_futures[:]
227+
fut = fetcher._send_offset_requests(tss)
228+
assert len(send_futures) == 2
229+
send_futures[0][2].success({tp1: (11, 1001)})
230+
send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
231+
assert fut.failed()
232+
assert isinstance(fut.exception, UnknownTopicOrPartitionError)
233+
234+
# -- First fails second succeeded
235+
del send_futures[:]
236+
fut = fetcher._send_offset_requests(tss)
237+
assert len(send_futures) == 2
238+
send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
239+
send_futures[1][2].success({tp1: (11, 1001)})
240+
assert fut.failed()
241+
assert isinstance(fut.exception, UnknownTopicOrPartitionError)
242+
243+
244+
def test__handle_offset_response(fetcher, mocker):
245+
# Broker returns UnsupportedForMessageFormatError, will omit partition
246+
fut = Future()
247+
res = OffsetResponse[1]([
248+
("topic", [(0, 43, -1, -1)]),
249+
("topic", [(1, 0, 1000, 9999)])
250+
])
251+
fetcher._handle_offset_response(fut, res)
252+
assert fut.succeeded()
253+
assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}
254+
255+
# Broker returns NotLeaderForPartitionError
256+
fut = Future()
257+
res = OffsetResponse[1]([
258+
("topic", [(0, 6, -1, -1)]),
259+
])
260+
fetcher._handle_offset_response(fut, res)
261+
assert fut.failed()
262+
assert isinstance(fut.exception, NotLeaderForPartitionError)
263+
264+
# Broker returns UnknownTopicOrPartitionError
265+
fut = Future()
266+
res = OffsetResponse[1]([
267+
("topic", [(0, 3, -1, -1)]),
268+
])
269+
fetcher._handle_offset_response(fut, res)
270+
assert fut.failed()
271+
assert isinstance(fut.exception, UnknownTopicOrPartitionError)
272+
273+
# Broker returns many errors and 1 result
274+
# Will fail on 1st error and return
275+
fut = Future()
276+
res = OffsetResponse[1]([
277+
("topic", [(0, 43, -1, -1)]),
278+
("topic", [(1, 6, -1, -1)]),
279+
("topic", [(2, 3, -1, -1)]),
280+
("topic", [(3, 0, 1000, 9999)])
281+
])
282+
fetcher._handle_offset_response(fut, res)
283+
assert fut.failed()
284+
assert isinstance(fut.exception, NotLeaderForPartitionError)

0 commit comments

Comments
 (0)