Drop unused sleep kwarg to KafkaClient.poll #1177

Merged (1 commit) on Aug 15, 2017
12 changes: 3 additions & 9 deletions kafka/client_async.py
@@ -495,7 +495,7 @@ def send(self, node_id, request):

return self._conns[node_id].send(request)

- def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
+ def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
"""Try to read and write to sockets.

This method will also attempt to complete node connections, refresh
@@ -507,9 +507,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
timeout will be the minimum of timeout, request timeout and
metadata timeout. Default: request_timeout_ms
future (Future, optional): if provided, blocks until future.is_done
- sleep (bool): if True and there is nothing to do (no connections
- or requests in flight), will sleep for duration timeout before
- returning empty results. Default: False.

Returns:
list: responses received (can be empty)
@@ -553,7 +550,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts

- responses.extend(self._poll(timeout, sleep=sleep))
+ responses.extend(self._poll(timeout))

# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
@@ -562,10 +559,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):

return responses

- def _poll(self, timeout, sleep=True):
- # select on reads across all connected sockets, blocking up to timeout
- assert self.in_flight_request_count() > 0 or self._connecting or sleep
-
+ def _poll(self, timeout):
responses = []
processed = set()

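For reference, a minimal sketch of how a call site looks once the kwarg is gone; the bootstrap configuration below is illustrative and not part of this diff:

from kafka.client_async import KafkaClient

# Illustrative setup; any real bootstrap configuration works the same way.
client = KafkaClient(bootstrap_servers='localhost:9092')

# poll() now takes only a timeout (and optionally a future to block on);
# there is no sleep kwarg controlling whether an idle client blocks.
responses = client.poll(timeout_ms=500)
for response in responses:
    print(response)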
3 changes: 1 addition & 2 deletions kafka/consumer/fetcher.py
@@ -275,8 +275,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):

if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
- self._client.poll(
-     future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+ self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
else:
time.sleep(self.config['retry_backoff_ms'] / 1000.0)

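The change above simply blocks on the metadata refresh future with a plain timeout. A hedged sketch of the same pattern in isolation, where client and remaining_ms stand in for the fetcher's self._client and its remaining timeout budget:

# Request a cluster metadata refresh and block until the returned future
# resolves or the remaining time budget is used up; no sleep flag needed.
refresh_future = client.cluster.request_update()
client.poll(future=refresh_future, timeout_ms=remaining_ms)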
6 changes: 4 additions & 2 deletions kafka/consumer/group.py
@@ -613,7 +613,7 @@ def _poll_once(self, timeout_ms, max_records):
# Send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()

- self._client.poll(timeout_ms=timeout_ms, sleep=True)
+ self._client.poll(timeout_ms=timeout_ms)
records, _ = self._fetcher.fetched_records(max_records)
return records

@@ -1019,7 +1019,7 @@ def _message_generator(self):
poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = 0
- self._client.poll(timeout_ms=poll_ms, sleep=True)
+ self._client.poll(timeout_ms=poll_ms)

# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
@@ -1045,6 +1045,8 @@ def _message_generator(self):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
+ if self._client.in_flight_request_count():
+     self._client.poll(timeout_ms=0)

# An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
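These are internal call sites; the public consumer API is unchanged. A hedged sketch of a consumer loop that exercises the _poll_once() and _message_generator() paths shown above (topic and broker address are placeholders):

from kafka import KafkaConsumer

# Placeholder topic and broker address; iteration drives the internal
# client.poll() calls shown in the diff above.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=1000)
for message in consumer:
    print(message.offset, message.value)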
2 changes: 1 addition & 1 deletion kafka/producer/sender.py
@@ -156,7 +156,7 @@ def run_once(self):
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
- self._client.poll(poll_timeout_ms, sleep=True)
+ self._client.poll(poll_timeout_ms)

def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
33 changes: 16 additions & 17 deletions test/test_client_async.py
@@ -259,23 +259,22 @@ def test_poll(mocker):
metadata.return_value = 1000
tasks.return_value = 2
cli.poll()
- _poll.assert_called_with(1.0, sleep=True)
+ _poll.assert_called_with(1.0)

# user timeout wins
cli.poll(250)
- _poll.assert_called_with(0.25, sleep=True)
+ _poll.assert_called_with(0.25)

# tasks timeout wins
tasks.return_value = 0
cli.poll(250)
- _poll.assert_called_with(0, sleep=True)
+ _poll.assert_called_with(0)

# default is request_timeout_ms
metadata.return_value = 1000000
tasks.return_value = 10000
cli.poll()
- _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0,
-     sleep=True)
+ _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)


def test__poll():
@@ -337,33 +336,33 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234

- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(1.234, sleep=True)
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(1.234)


def test_maybe_refresh_metadata_backoff(mocker, client):
now = time.time()
t = mocker.patch('time.time')
t.return_value = now

- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(2.222) # reconnect backoff


def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True

- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(9999.999) # request_timeout_ms


def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_send_request', return_value=True)
send = mocker.patch.object(client, 'send')

- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(9999.999) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
send.assert_called_once_with('foobar', request)
@@ -379,16 +378,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
t.return_value = now

# first poll attempts connection
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(2.222) # reconnect backoff
client._can_connect.assert_called_once_with('foobar')
client._maybe_connect.assert_called_once_with('foobar')

# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
client._can_connect.reset_mock()
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(9999.999) # connection timeout (request timeout)
assert not client._can_connect.called

assert not client._metadata_refresh_in_progress
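As a standalone illustration of the updated assertion style, a small stub whose _poll is a mock shows that poll() now forwards only the computed timeout in seconds; the stub below is illustrative and much simpler than the real fixtures in this test file:

from unittest.mock import MagicMock

class StubClient:
    # Illustrative stand-in for KafkaClient: poll() converts timeout_ms to
    # seconds and forwards it to _poll() without any sleep kwarg.
    def __init__(self, request_timeout_ms=40000):
        self.config = {'request_timeout_ms': request_timeout_ms}
        self._poll = MagicMock()

    def poll(self, timeout_ms=None):
        if timeout_ms is None:
            timeout_ms = self.config['request_timeout_ms']
        self._poll(max(0, timeout_ms / 1000.0))

cli = StubClient()
cli.poll(250)
cli._poll.assert_called_with(0.25)  # bare timeout, matching the tests above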