Skip to content

Commit 50640a3

Browse files
committed
KAFKA-3888: use background thread for consumer heartbeats
1 parent f356f01 commit 50640a3

File tree

13 files changed

+905
-713
lines changed

13 files changed

+905
-713
lines changed

kafka/client_async.py

Lines changed: 182 additions & 282 deletions
Large diffs are not rendered by default.

kafka/consumer/fetcher.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,9 @@ def _create_fetch_requests(self):
669669
fetchable[node_id][partition.topic].append(partition_info)
670670
log.debug("Adding fetch request for partition %s at offset %d",
671671
partition, position)
672+
else:
673+
log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
674+
partition, node_id)
672675

673676
if self.config['api_version'] >= (0, 10, 1):
674677
version = 3

kafka/consumer/group.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from __future__ import absolute_import
1+
from __future__ import absolute_import, division
22

33
import copy
44
import logging
@@ -125,19 +125,34 @@ class KafkaConsumer(six.Iterator):
125125
distribute partition ownership amongst consumer instances when
126126
group management is used.
127127
Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
128+
max_poll_records (int): The maximum number of records returned in a
129+
single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
130+
max_poll_interval_ms (int): The maximum delay between invocations of
131+
:meth:`~kafka.KafkaConsumer.poll` when using consumer group
132+
management. This places an upper bound on the amount of time that
133+
the consumer can be idle before fetching more records. If
134+
:meth:`~kafka.KafkaConsumer.poll` is not called before expiration
135+
of this timeout, then the consumer is considered failed and the
136+
group will rebalance in order to reassign the partitions to another
137+
member. Default: 300000
138+
session_timeout_ms (int): The timeout used to detect failures when
139+
using Kafka's group management facilities. The consumer sends
140+
periodic heartbeats to indicate its liveness to the broker. If
141+
no heartbeats are received by the broker before the expiration of
142+
this session timeout, then the broker will remove this consumer
143+
from the group and initiate a rebalance. Note that the value must
144+
be in the allowable range as configured in the broker configuration
145+
by group.min.session.timeout.ms and group.max.session.timeout.ms.
146+
Default: 10000
128147
heartbeat_interval_ms (int): The expected time in milliseconds
129148
between heartbeats to the consumer coordinator when using
130-
Kafka's group management feature. Heartbeats are used to ensure
149+
Kafka's group management facilities. Heartbeats are used to ensure
131150
that the consumer's session stays active and to facilitate
132151
rebalancing when new consumers join or leave the group. The
133152
value must be set lower than session_timeout_ms, but typically
134153
should be set no higher than 1/3 of that value. It can be
135154
adjusted even lower to control the expected time for normal
136155
rebalances. Default: 3000
137-
session_timeout_ms (int): The timeout used to detect failures when
138-
using Kafka's group management facilities. Default: 30000
139-
max_poll_records (int): The maximum number of records returned in a
140-
single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
141156
receive_buffer_bytes (int): The size of the TCP receive buffer
142157
(SO_RCVBUF) to use when reading data. Default: None (relies on
143158
system defaults). The java client defaults to 32768.
@@ -236,7 +251,7 @@ class KafkaConsumer(six.Iterator):
236251
'fetch_min_bytes': 1,
237252
'fetch_max_bytes': 52428800,
238253
'max_partition_fetch_bytes': 1 * 1024 * 1024,
239-
'request_timeout_ms': 40 * 1000,
254+
'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
240255
'retry_backoff_ms': 100,
241256
'reconnect_backoff_ms': 50,
242257
'reconnect_backoff_max_ms': 1000,
@@ -248,9 +263,10 @@ class KafkaConsumer(six.Iterator):
248263
'check_crcs': True,
249264
'metadata_max_age_ms': 5 * 60 * 1000,
250265
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
251-
'heartbeat_interval_ms': 3000,
252-
'session_timeout_ms': 30000,
253266
'max_poll_records': 500,
267+
'max_poll_interval_ms': 300000,
268+
'session_timeout_ms': 10000, # XXX should be 30000 if < 0.11
269+
'heartbeat_interval_ms': 3000,
254270
'receive_buffer_bytes': None,
255271
'send_buffer_bytes': None,
256272
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
@@ -281,12 +297,13 @@ class KafkaConsumer(six.Iterator):
281297

282298
def __init__(self, *topics, **configs):
283299
self.config = copy.copy(self.DEFAULT_CONFIG)
300+
configs_copy = copy.copy(configs)
284301
for key in self.config:
285302
if key in configs:
286-
self.config[key] = configs.pop(key)
303+
self.config[key] = configs_copy.pop(key)
287304

288305
# Only check for extra config keys in top-level class
289-
assert not configs, 'Unrecognized configs: %s' % configs
306+
assert not configs_copy, 'Unrecognized configs: %s' % configs_copy
290307

291308
deprecated = {'smallest': 'earliest', 'largest': 'latest'}
292309
if self.config['auto_offset_reset'] in deprecated:
@@ -298,7 +315,7 @@ def __init__(self, *topics, **configs):
298315
request_timeout_ms = self.config['request_timeout_ms']
299316
session_timeout_ms = self.config['session_timeout_ms']
300317
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
301-
if request_timeout_ms <= session_timeout_ms:
318+
if self.config['group_id'] is not None and request_timeout_ms <= session_timeout_ms:
302319
raise KafkaConfigurationError(
303320
"Request timeout (%s) must be larger than session timeout (%s)" %
304321
(request_timeout_ms, session_timeout_ms))
@@ -588,7 +605,7 @@ def _poll_once(self, timeout_ms, max_records):
588605
dict: Map of topic to list of records (may be empty).
589606
"""
590607
if self._use_consumer_group():
591-
self._coordinator.ensure_active_group()
608+
self._coordinator.poll()
592609

593610
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
594611
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
@@ -614,6 +631,7 @@ def _poll_once(self, timeout_ms, max_records):
614631
# Send any new fetches (won't resend pending fetches)
615632
self._fetcher.send_fetches()
616633

634+
timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
617635
self._client.poll(timeout_ms=timeout_ms)
618636
records, _ = self._fetcher.fetched_records(max_records)
619637
return records
@@ -1015,8 +1033,7 @@ def _message_generator(self):
10151033
while time.time() < self._consumer_timeout:
10161034

10171035
if self._use_consumer_group():
1018-
self._coordinator.ensure_coordinator_ready()
1019-
self._coordinator.ensure_active_group()
1036+
self._coordinator.poll()
10201037

10211038
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
10221039
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
@@ -1068,7 +1085,6 @@ def _message_generator(self):
10681085

10691086
def _next_timeout(self):
10701087
timeout = min(self._consumer_timeout,
1071-
self._client._delayed_tasks.next_at() + time.time(),
10721088
self._client.cluster.ttl() / 1000.0 + time.time())
10731089

10741090
# Although the delayed_tasks timeout above should cover processing
@@ -1079,7 +1095,7 @@ def _next_timeout(self):
10791095
# the next heartbeat from being sent. This check should help
10801096
# avoid that.
10811097
if self._use_consumer_group():
1082-
heartbeat = time.time() + self._coordinator.heartbeat.ttl()
1098+
heartbeat = time.time() + self._coordinator.heartbeat.time_to_next_heartbeat()
10831099
timeout = min(timeout, heartbeat)
10841100
return timeout
10851101

0 commit comments

Comments (0)