Skip to content

Commit ea6de4c

Browse files
committed
Updates from review
1 parent 5bdfc84 commit ea6de4c

File tree

6 files changed

+16
-25
lines changed

6 files changed

+16
-25
lines changed

kafka/client_async.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -732,14 +732,13 @@ def add_topic(self, topic):
732732
self._topics.add(topic)
733733
return self.cluster.request_update()
734734

735-
# request metadata update on disconnect and timedout
735+
# This method should be locked when running multi-threaded
736736
def _maybe_refresh_metadata(self):
737737
"""Send a metadata request if needed.
738738
739739
Returns:
740740
int: milliseconds until next refresh
741741
"""
742-
# This should be locked when running multi-threaded
743742
ttl = self.cluster.ttl()
744743
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
745744
metadata_timeout = max(ttl, wait_for_in_progress_ms)

kafka/conn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ def can_send_more(self):
716716
def recv(self):
717717
"""Non-blocking network receive.
718718
719-
Return list of (response, future)
719+
Return list of (response, future) tuples
720720
"""
721721
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
722722
log.warning('%s cannot recv: socket not connected', self)

kafka/coordinator/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,10 +286,10 @@ def poll_heartbeat(self):
286286
"""
287287
Check the status of the heartbeat thread (if it is active) and indicate
288288
the liveness of the client. This must be called periodically after
289-
joining with :meth:`.ensureActiveGroup` to ensure that the member stays
289+
joining with :meth:`.ensure_active_group` to ensure that the member stays
290290
in the group. If an interval of time longer than the provided rebalance
291-
timeout expires without calling this method, then the client will
292-
proactively leave the group.
291+
timeout (max_poll_interval_ms) expires without calling this method, then
292+
the client will proactively leave the group.
293293
294294
Raises: RuntimeError for unexpected errors raised from the heartbeat thread
295295
"""
@@ -324,7 +324,7 @@ def _handle_join_success(self, member_assignment_bytes):
324324
self._generation.protocol,
325325
member_assignment_bytes)
326326

327-
def _handle_join_failure(self, exception):
327+
def _handle_join_failure(self, _):
328328
with self._lock:
329329
self.join_future = None
330330
self.state = MemberState.UNJOINED

kafka/coordinator/consumer.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ def poll(self):
251251
# track of the fact that we need to rebalance again to reflect the
252252
# change to the topic subscription. Without ensuring that the
253253
# metadata is fresh, any metadata update that changes the topic
254-
# subscriptions and arrives with a rebalance in progress will
254+
# subscriptions and arrives while a rebalance is in progress will
255255
# essentially be ignored. See KAFKA-3949 for the complete
256256
# description of the problem.
257257
if self._subscription.subscribed_pattern:
@@ -264,11 +264,7 @@ def poll(self):
264264
self._maybe_auto_commit_offsets_async()
265265

266266
def time_to_next_poll(self):
267-
"""
268-
Return the time to the next needed invocation of {@link #poll(long)}.
269-
@param now current time in milliseconds
270-
@return the maximum time in milliseconds the caller should wait before the next invocation of poll()
271-
"""
267+
"""Return seconds (float) remaining until :meth:`.poll` should be called again"""
272268
if not self.config['enable_auto_commit']:
273269
return self.time_to_next_heartbeat()
274270

@@ -396,12 +392,9 @@ def close(self, autocommit=True):
396392
super(ConsumerCoordinator, self).close()
397393

398394
def _invoke_completed_offset_commit_callbacks(self):
399-
try:
400-
while True:
401-
callback, offsets, exception = self.completed_offset_commits.popleft()
402-
callback(offsets, exception)
403-
except IndexError:
404-
pass
395+
while self.completed_offset_commits:
396+
callback, offsets, exception = self.completed_offset_commits.popleft()
397+
callback(offsets, exception)
405398

406399
def commit_offsets_async(self, offsets, callback=None):
407400
"""Commit specific offsets asynchronously.

kafka/coordinator/heartbeat.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def received_heartbeat(self):
4444
self.last_receive = time.time()
4545

4646
def time_to_next_heartbeat(self):
47+
"""Returns seconds (float) remaining before next heartbeat should be sent"""
4748
time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset)
4849
if self.heartbeat_failed:
4950
delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000
@@ -58,9 +59,6 @@ def session_timeout_expired(self):
5859
last_recv = max(self.last_receive, self.last_reset)
5960
return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000)
6061

61-
def interval(self):
62-
return self.config['heartbeat_interval_ms'] / 1000
63-
6462
def reset_timeouts(self):
6563
self.last_reset = time.time()
6664
self.last_poll = time.time()

kafka/errors.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,12 @@ def __init__(self, *args, **kwargs):
6464
"""Commit cannot be completed since the group has already
6565
rebalanced and assigned the partitions to another member.
6666
This means that the time between subsequent calls to poll()
67-
was longer than the configured max.poll.interval.ms, which
67+
was longer than the configured max_poll_interval_ms, which
6868
typically implies that the poll loop is spending too much
6969
time message processing. You can address this either by
70-
increasing the session timeout or by reducing the maximum
71-
size of batches returned in poll() with max.poll.records.
70+
increasing the rebalance timeout with max_poll_interval_ms,
71+
or by reducing the maximum size of batches returned in poll()
72+
with max_poll_records.
7273
""", *args, **kwargs)
7374

7475

0 commit comments

Comments
 (0)