Skip to content

Commit cbc6fdc

Browse files
authored
Drop unused sleep kwarg to poll (#1177)
1 parent 497ded9 commit cbc6fdc

File tree

5 files changed

+25
-31
lines changed

5 files changed

+25
-31
lines changed

kafka/client_async.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ def send(self, node_id, request):
495495

496496
return self._conns[node_id].send(request)
497497

498-
def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
498+
def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
499499
"""Try to read and write to sockets.
500500
501501
This method will also attempt to complete node connections, refresh
@@ -507,9 +507,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
507507
timeout will be the minimum of timeout, request timeout and
508508
metadata timeout. Default: request_timeout_ms
509509
future (Future, optional): if provided, blocks until future.is_done
510-
sleep (bool): if True and there is nothing to do (no connections
511-
or requests in flight), will sleep for duration timeout before
512-
returning empty results. Default: False.
513510
514511
Returns:
515512
list: responses received (can be empty)
@@ -553,7 +550,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
553550
self.config['request_timeout_ms'])
554551
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
555552

556-
responses.extend(self._poll(timeout, sleep=sleep))
553+
responses.extend(self._poll(timeout))
557554

558555
# If all we had was a timeout (future is None) - only do one poll
559556
# If we do have a future, we keep looping until it is done
@@ -562,10 +559,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
562559

563560
return responses
564561

565-
def _poll(self, timeout, sleep=True):
566-
# select on reads across all connected sockets, blocking up to timeout
567-
assert self.in_flight_request_count() > 0 or self._connecting or sleep
568-
562+
def _poll(self, timeout):
569563
responses = []
570564
processed = set()
571565

kafka/consumer/fetcher.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
275275

276276
if future.exception.invalid_metadata:
277277
refresh_future = self._client.cluster.request_update()
278-
self._client.poll(
279-
future=refresh_future, sleep=True, timeout_ms=remaining_ms)
278+
self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
280279
else:
281280
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
282281

kafka/consumer/group.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ def _poll_once(self, timeout_ms, max_records):
613613
# Send any new fetches (won't resend pending fetches)
614614
self._fetcher.send_fetches()
615615

616-
self._client.poll(timeout_ms=timeout_ms, sleep=True)
616+
self._client.poll(timeout_ms=timeout_ms)
617617
records, _ = self._fetcher.fetched_records(max_records)
618618
return records
619619

@@ -1019,7 +1019,7 @@ def _message_generator(self):
10191019
poll_ms = 1000 * (self._consumer_timeout - time.time())
10201020
if not self._fetcher.in_flight_fetches():
10211021
poll_ms = 0
1022-
self._client.poll(timeout_ms=poll_ms, sleep=True)
1022+
self._client.poll(timeout_ms=poll_ms)
10231023

10241024
# We need to make sure we at least keep up with scheduled tasks,
10251025
# like heartbeats, auto-commits, and metadata refreshes
@@ -1045,6 +1045,8 @@ def _message_generator(self):
10451045
if time.time() > timeout_at:
10461046
log.debug("internal iterator timeout - breaking for poll")
10471047
break
1048+
if self._client.in_flight_request_count():
1049+
self._client.poll(timeout_ms=0)
10481050

10491051
# An else block on a for loop only executes if there was no break
10501052
# so this should only be called on a StopIteration from the fetcher

kafka/producer/sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def run_once(self):
156156
# difference between now and its linger expiry time; otherwise the
157157
# select time will be the time difference between now and the
158158
# metadata expiry time
159-
self._client.poll(poll_timeout_ms, sleep=True)
159+
self._client.poll(poll_timeout_ms)
160160

161161
def initiate_close(self):
162162
"""Start closing the sender (won't complete until all data is sent)."""

test/test_client_async.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -259,23 +259,22 @@ def test_poll(mocker):
259259
metadata.return_value = 1000
260260
tasks.return_value = 2
261261
cli.poll()
262-
_poll.assert_called_with(1.0, sleep=True)
262+
_poll.assert_called_with(1.0)
263263

264264
# user timeout wins
265265
cli.poll(250)
266-
_poll.assert_called_with(0.25, sleep=True)
266+
_poll.assert_called_with(0.25)
267267

268268
# tasks timeout wins
269269
tasks.return_value = 0
270270
cli.poll(250)
271-
_poll.assert_called_with(0, sleep=True)
271+
_poll.assert_called_with(0)
272272

273273
# default is request_timeout_ms
274274
metadata.return_value = 1000000
275275
tasks.return_value = 10000
276276
cli.poll()
277-
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0,
278-
sleep=True)
277+
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
279278

280279

281280
def test__poll():
@@ -337,33 +336,33 @@ def client(mocker):
337336
def test_maybe_refresh_metadata_ttl(mocker, client):
338337
client.cluster.ttl.return_value = 1234
339338

340-
client.poll(timeout_ms=12345678, sleep=True)
341-
client._poll.assert_called_with(1.234, sleep=True)
339+
client.poll(timeout_ms=12345678)
340+
client._poll.assert_called_with(1.234)
342341

343342

344343
def test_maybe_refresh_metadata_backoff(mocker, client):
345344
now = time.time()
346345
t = mocker.patch('time.time')
347346
t.return_value = now
348347

349-
client.poll(timeout_ms=12345678, sleep=True)
350-
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
348+
client.poll(timeout_ms=12345678)
349+
client._poll.assert_called_with(2.222) # reconnect backoff
351350

352351

353352
def test_maybe_refresh_metadata_in_progress(mocker, client):
354353
client._metadata_refresh_in_progress = True
355354

356-
client.poll(timeout_ms=12345678, sleep=True)
357-
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
355+
client.poll(timeout_ms=12345678)
356+
client._poll.assert_called_with(9999.999) # request_timeout_ms
358357

359358

360359
def test_maybe_refresh_metadata_update(mocker, client):
361360
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
362361
mocker.patch.object(client, '_can_send_request', return_value=True)
363362
send = mocker.patch.object(client, 'send')
364363

365-
client.poll(timeout_ms=12345678, sleep=True)
366-
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
364+
client.poll(timeout_ms=12345678)
365+
client._poll.assert_called_with(9999.999) # request_timeout_ms
367366
assert client._metadata_refresh_in_progress
368367
request = MetadataRequest[0]([])
369368
send.assert_called_once_with('foobar', request)
@@ -379,16 +378,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
379378
t.return_value = now
380379

381380
# first poll attempts connection
382-
client.poll(timeout_ms=12345678, sleep=True)
383-
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
381+
client.poll(timeout_ms=12345678)
382+
client._poll.assert_called_with(2.222) # reconnect backoff
384383
client._can_connect.assert_called_once_with('foobar')
385384
client._maybe_connect.assert_called_once_with('foobar')
386385

387386
# poll while connecting should not attempt a new connection
388387
client._connecting.add('foobar')
389388
client._can_connect.reset_mock()
390-
client.poll(timeout_ms=12345678, sleep=True)
391-
client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
389+
client.poll(timeout_ms=12345678)
390+
client._poll.assert_called_with(9999.999) # connection timeout (request timeout)
392391
assert not client._can_connect.called
393392

394393
assert not client._metadata_refresh_in_progress

0 commit comments

Comments
 (0)