Skip to content

Commit 4a0cc37

Browse files
dpkp88manpreet
authored andcommitted
Drop unused sleep kwarg to poll (dpkp#1177)
1 parent 5aecd1b commit 4a0cc37

File tree

5 files changed

+24
-30
lines changed

5 files changed

+24
-30
lines changed

kafka/client_async.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -507,9 +507,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True):
507507
timeout will be the minimum of timeout, request timeout and
508508
metadata timeout. Default: request_timeout_ms
509509
future (Future, optional): if provided, blocks until future.is_done
510-
sleep (bool): if True and there is nothing to do (no connections
511-
or requests in flight), will sleep for duration timeout before
512-
returning empty results. Default: False.
513510
514511
Returns:
515512
list: responses received (can be empty)
@@ -552,7 +549,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True):
552549
self.config['request_timeout_ms'])
553550
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
554551

555-
responses.extend(self._poll(timeout, sleep=sleep))
552+
responses.extend(self._poll(timeout))
556553

557554
# If all we had was a timeout (future is None) - only do one poll
558555
# If we do have a future, we keep looping until it is done
@@ -561,10 +558,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True):
561558

562559
return responses
563560

564-
def _poll(self, timeout, sleep=True):
565-
# select on reads across all connected sockets, blocking up to timeout
566-
assert self.in_flight_request_count() > 0 or self._connecting or sleep
567-
561+
def _poll(self, timeout):
568562
responses = []
569563
processed = set()
570564

kafka/consumer/fetcher.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
275275

276276
if future.exception.invalid_metadata:
277277
refresh_future = self._client.cluster.request_update()
278-
self._client.poll(
279-
future=refresh_future, sleep=True, timeout_ms=remaining_ms)
278+
self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
280279
else:
281280
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
282281

kafka/consumer/group.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ def _poll_once(self, timeout_ms, max_records):
614614
# Send any new fetches (won't resend pending fetches)
615615
self._fetcher.send_fetches()
616616

617-
self._client.poll(timeout_ms=timeout_ms, sleep=True)
617+
self._client.poll(timeout_ms=timeout_ms)
618618
records, _ = self._fetcher.fetched_records(max_records)
619619
return records
620620

@@ -1019,7 +1019,7 @@ def _message_generator(self):
10191019
poll_ms = 1000 * (self._consumer_timeout - time.time())
10201020
if not self._fetcher.in_flight_fetches():
10211021
poll_ms = 0
1022-
self._client.poll(timeout_ms=poll_ms, sleep=True)
1022+
self._client.poll(timeout_ms=poll_ms)
10231023

10241024
# We need to make sure we at least keep up with scheduled tasks,
10251025
# like heartbeats, auto-commits, and metadata refreshes
@@ -1045,6 +1045,8 @@ def _message_generator(self):
10451045
if time.time() > timeout_at:
10461046
log.debug("internal iterator timeout - breaking for poll")
10471047
break
1048+
if self._client.in_flight_request_count():
1049+
self._client.poll(timeout_ms=0)
10481050

10491051
# An else block on a for loop only executes if there was no break
10501052
# so this should only be called on a StopIteration from the fetcher

kafka/producer/sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def run_once(self):
156156
# difference between now and its linger expiry time; otherwise the
157157
# select time will be the time difference between now and the
158158
# metadata expiry time
159-
self._client.poll(poll_timeout_ms, sleep=True)
159+
self._client.poll(poll_timeout_ms)
160160

161161
def initiate_close(self):
162162
"""Start closing the sender (won't complete until all data is sent)."""

test/test_client_async.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -257,23 +257,22 @@ def test_poll(mocker):
257257
metadata.return_value = 1000
258258
tasks.return_value = 2
259259
cli.poll()
260-
_poll.assert_called_with(1.0, sleep=True)
260+
_poll.assert_called_with(1.0)
261261

262262
# user timeout wins
263263
cli.poll(250)
264-
_poll.assert_called_with(0.25, sleep=True)
264+
_poll.assert_called_with(0.25)
265265

266266
# tasks timeout wins
267267
tasks.return_value = 0
268268
cli.poll(250)
269-
_poll.assert_called_with(0, sleep=True)
269+
_poll.assert_called_with(0)
270270

271271
# default is request_timeout_ms
272272
metadata.return_value = 1000000
273273
tasks.return_value = 10000
274274
cli.poll()
275-
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0,
276-
sleep=True)
275+
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
277276

278277

279278
def test__poll():
@@ -313,33 +312,33 @@ def client(mocker):
313312
def test_maybe_refresh_metadata_ttl(mocker, client):
314313
client.cluster.ttl.return_value = 1234
315314

316-
client.poll(timeout_ms=12345678, sleep=True)
317-
client._poll.assert_called_with(1.234, sleep=True)
315+
client.poll(timeout_ms=12345678)
316+
client._poll.assert_called_with(1.234)
318317

319318

320319
def test_maybe_refresh_metadata_backoff(mocker, client):
321320
now = time.time()
322321
t = mocker.patch('time.time')
323322
t.return_value = now
324323

325-
client.poll(timeout_ms=12345678, sleep=True)
326-
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
324+
client.poll(timeout_ms=12345678)
325+
client._poll.assert_called_with(2.222) # reconnect backoff
327326

328327

329328
def test_maybe_refresh_metadata_in_progress(mocker, client):
330329
client._metadata_refresh_in_progress = True
331330

332-
client.poll(timeout_ms=12345678, sleep=True)
333-
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
331+
client.poll(timeout_ms=12345678)
332+
client._poll.assert_called_with(9999.999) # request_timeout_ms
334333

335334

336335
def test_maybe_refresh_metadata_update(mocker, client):
337336
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
338337
mocker.patch.object(client, '_can_send_request', return_value=True)
339338
send = mocker.patch.object(client, 'send')
340339

341-
client.poll(timeout_ms=12345678, sleep=True)
342-
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
340+
client.poll(timeout_ms=12345678)
341+
client._poll.assert_called_with(9999.999) # request_timeout_ms
343342
assert client._metadata_refresh_in_progress
344343
request = MetadataRequest[0]([])
345344
send.assert_called_once_with('foobar', request)
@@ -355,16 +354,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
355354
t.return_value = now
356355

357356
# first poll attempts connection
358-
client.poll(timeout_ms=12345678, sleep=True)
359-
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
357+
client.poll(timeout_ms=12345678)
358+
client._poll.assert_called_with(2.222) # reconnect backoff
360359
client._can_connect.assert_called_once_with('foobar')
361360
client._maybe_connect.assert_called_once_with('foobar')
362361

363362
# poll while connecting should not attempt a new connection
364363
client._connecting.add('foobar')
365364
client._can_connect.reset_mock()
366-
client.poll(timeout_ms=12345678, sleep=True)
367-
client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
365+
client.poll(timeout_ms=12345678)
366+
client._poll.assert_called_with(9999.999) # connection timeout (request timeout)
368367
assert not client._can_connect.called
369368

370369
assert not client._metadata_refresh_in_progress

0 commit comments

Comments
 (0)