KAFKA-4034: Avoid unnecessary consumer coordinator lookup #1254

Merged: 1 commit, Oct 12, 2017

15 changes: 12 additions & 3 deletions kafka/consumer/fetcher.py
@@ -133,6 +133,18 @@ def send_fetches(self):
self._clean_done_fetch_futures()
return futures

def reset_offsets_if_needed(self, partitions):
"""Lookup and set offsets for any partitions which are awaiting an
explicit reset.

Arguments:
partitions (set of TopicPartitions): the partitions to reset
"""
for tp in partitions:
# TODO: If there are several offsets to reset, we could submit offset requests in parallel
if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)

def _clean_done_fetch_futures(self):
while True:
if not self._fetch_futures:
@@ -167,9 +179,6 @@ def update_fetch_positions(self, partitions):
" update", tp)
continue

# TODO: If there are several offsets to reset,
# we could submit offset requests in parallel
# for now, each call to _reset_offset will block
if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp)
elif self._subscriptions.assignment[tp].committed is None:
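
A sketch of how the new `Fetcher.reset_offsets_if_needed` could be exercised in the repo's pytest/mocker style (the `fetcher` fixture and the partition names are assumptions for illustration, not part of this diff): only partitions that are both assigned and flagged for an explicit reset should reach `_reset_offset`.

```python
from kafka.structs import TopicPartition


def test_reset_offsets_if_needed(fetcher, mocker):
    tp0 = TopicPartition('foobar', 0)
    tp1 = TopicPartition('foobar', 1)
    mocker.patch.object(fetcher._subscriptions, 'is_assigned', return_value=True)
    mocker.patch.object(fetcher._subscriptions, 'is_offset_reset_needed',
                        side_effect=lambda tp: tp == tp0)
    mocker.patch.object(fetcher, '_reset_offset')

    fetcher.reset_offsets_if_needed({tp0, tp1})

    # only the partition awaiting an explicit reset is reset
    fetcher._reset_offset.assert_called_once_with(tp0)
```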
29 changes: 19 additions & 10 deletions kafka/consumer/group.py
@@ -585,12 +585,11 @@ def _poll_once(self, timeout_ms, max_records):
dict: Map of topic to list of records (may be empty).
"""
if self._use_consumer_group():
self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_active_group()

# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_coordinator_ready()

# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -835,6 +834,8 @@ def subscription(self):
Returns:
set: {topic, ...}
"""
if self._subscription.subscription is None:
return None
return self._subscription.subscription.copy()

def unsubscribe(self):
@@ -988,26 +989,34 @@ def _update_fetch_positions(self, partitions):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
# Lookup any positions for partitions which are awaiting reset (which may be the
# case if the user called seekToBeginning or seekToEnd). We do this check first to
# avoid an unnecessary lookup of committed offsets (which typically occurs when
# the user is manually assigning partitions and managing their own offsets).
self._fetcher.reset_offsets_if_needed(partitions)

# Refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed()
if not self._subscription.has_all_fetch_positions():
# if we still don't have offsets for all partitions, then we should either seek
# to the last committed position or reset using the auto reset policy
if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
# first refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed()

# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)

def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:

if self._use_consumer_group():
self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_coordinator_ready()
self._coordinator.ensure_active_group()

# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_coordinator_ready()

# Fetch offsets for any subscribed partitions that we aren't tracking yet
if not self._subscription.has_all_fetch_positions():
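
A usage sketch of the manual-assignment path the group.py change is aimed at (broker, group, and topic names below are placeholders, not from this PR): once every assigned partition has been seeked, the explicit resets are resolved first, so `has_all_fetch_positions()` is already satisfied and the committed-offset refresh is skipped.

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Manual assignment where the application manages its own offsets.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',  # placeholder broker
                         group_id='example-group',            # placeholder group
                         enable_auto_commit=False)
tp = TopicPartition('example-topic', 0)                       # placeholder topic
consumer.assign([tp])

# seek_to_beginning() flags tp as awaiting an explicit reset; the next poll()
# resolves it via Fetcher.reset_offsets_if_needed(), so has_all_fetch_positions()
# is True and the committed-offset refresh against the coordinator is skipped.
consumer.seek_to_beginning(tp)
records = consumer.poll(timeout_ms=500)
```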
23 changes: 19 additions & 4 deletions kafka/coordinator/base.py
@@ -88,6 +88,7 @@ def __init__(self, client, metrics, **configs):
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = self.config['group_id']
self.coordinator_id = None
self._find_coordinator_future = None
self.rejoin_needed = True
self.rejoining = False
self.heartbeat = Heartbeat(**self.config)
@@ -195,12 +196,11 @@ def coordinator_unknown(self):

return False

def ensure_coordinator_known(self):
def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
while self.coordinator_unknown():

# Prior to 0.8.2 there was no group coordinator
# so we will just pick a node at random and treat
# it as the "coordinator"
@@ -210,7 +210,7 @@ def ensure_coordinator_known(self):
self._client.ready(self.coordinator_id)
continue

future = self._send_group_coordinator_request()
future = self.lookup_coordinator()
self._client.poll(future=future)

if future.failed():
@@ -224,6 +224,16 @@
else:
raise future.exception # pylint: disable-msg=raising-bad-type

def _reset_find_coordinator_future(self, result):
self._find_coordinator_future = None

def lookup_coordinator(self):
if self._find_coordinator_future is None:
self._find_coordinator_future = self._send_group_coordinator_request()

self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
return self._find_coordinator_future

def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)

@@ -234,6 +244,11 @@

def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
# always ensure that the coordinator is ready, because we may have been
# disconnected when sending heartbeats, and a disconnect does not
# necessarily require us to rejoin the group.
self.ensure_coordinator_ready()

if not self.need_rejoin():
return

@@ -242,7 +257,7 @@
self.rejoining = True

while self.need_rejoin():
self.ensure_coordinator_known()
self.ensure_coordinator_ready()

# ensure that there are no pending requests to the coordinator.
# This is important in particular to avoid resending a pending
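
The memoized `_find_coordinator_future` added in base.py is what lets several callers share one in-flight GroupCoordinator request. Below is a stand-alone illustration of that pattern using `kafka.future.Future` directly; the `LookupExample` class and the node name are invented for the example and are not part of this PR.

```python
from kafka.future import Future


class LookupExample(object):
    def __init__(self):
        self._find_coordinator_future = None
        self.requests_sent = 0

    def _send_group_coordinator_request(self):
        # stand-in for the real request; returns an unresolved future
        self.requests_sent += 1
        return Future()

    def _reset_find_coordinator_future(self, result):
        self._find_coordinator_future = None

    def lookup_coordinator(self):
        if self._find_coordinator_future is None:
            self._find_coordinator_future = self._send_group_coordinator_request()
        self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
        return self._find_coordinator_future


example = LookupExample()
f1 = example.lookup_coordinator()
f2 = example.lookup_coordinator()        # reuses the in-flight future
assert f1 is f2 and example.requests_sent == 1
f1.success('coordinator-node-1')         # completing the lookup clears the cache
assert example._find_coordinator_future is None
```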
28 changes: 24 additions & 4 deletions kafka/coordinator/consumer.py
@@ -315,7 +315,7 @@ def fetch_committed_offsets(self, partitions):
return {}

while True:
self.ensure_coordinator_known()
self.ensure_coordinator_ready()

# contact coordinator to fetch committed offsets
future = self._send_offset_fetch_request(partitions)
@@ -353,9 +353,29 @@ def commit_offsets_async(self, offsets, callback=None):
response will be either an Exception or a OffsetCommitResponse
struct. This callback can be used to trigger custom actions when
a commit request completes.
Returns:
Future: indicating whether the commit was successful or not
"""
if not self.coordinator_unknown():
self._do_commit_offsets_async(offsets, callback)
else:
# we don't know the current coordinator, so try to find it and then
# send the commit or fail (we don't want recursive retries which can
# cause offset commits to arrive out of order). Note that there may
# be multiple offset commits chained to the same coordinator lookup
# request. This is fine because the listeners will be invoked in the
# same order that they were added. Note also that BaseCoordinator
# prevents multiple concurrent coordinator lookup requests.
future = self.lookup_coordinator()
future.add_callback(self._do_commit_offsets_async, offsets, callback)
if callback:
future.add_errback(callback)

# ensure the commit has a chance to be transmitted (without blocking on
# its completion). Note that commits are treated as heartbeats by the
# coordinator, so there is no need to explicitly allow heartbeats
# through delayed task execution.
self._client.poll() # no wakeup if we add that feature

def _do_commit_offsets_async(self, offsets, callback=None):
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
@@ -386,7 +406,7 @@ def commit_offsets_sync(self, offsets):
return

while True:
self.ensure_coordinator_known()
self.ensure_coordinator_ready()

future = self._send_offset_commit_request(offsets)
self._client.poll(future=future)
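
For the consumer.py change, the user-visible effect is that an async commit no longer blocks waiting for the coordinator: if the coordinator is unknown, the commit request is chained onto the shared lookup future. A small usage sketch through the public `KafkaConsumer.commit_async` API follows; broker, group, topic, offset value, and callback name are placeholders.

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition, OffsetAndMetadata


def on_commit(offsets, response):
    # response is an OffsetCommitResponse struct on success, or an Exception
    print('commit completed: %s -> %s' % (offsets, response))


consumer = KafkaConsumer(bootstrap_servers='localhost:9092',  # placeholder broker
                         group_id='example-group',            # placeholder group
                         enable_auto_commit=False)
tp = TopicPartition('example-topic', 0)                       # placeholder topic
consumer.assign([tp])

# Non-blocking: if the group coordinator is not yet known, this commit is
# chained onto the in-flight coordinator lookup instead of blocking here.
consumer.commit_async({tp: OffsetAndMetadata(42, '')}, callback=on_commit)
consumer.close()
```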
9 changes: 4 additions & 5 deletions test/test_coordinator.py
@@ -234,7 +234,7 @@ def test_fetch_committed_offsets(mocker, coordinator):
assert coordinator._client.poll.call_count == 0

# general case -- send offset fetch request, get successful future
mocker.patch.object(coordinator, 'ensure_coordinator_known')
mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_fetch_request',
return_value=Future().success('foobar'))
partitions = [TopicPartition('foobar', 0)]
@@ -295,16 +295,15 @@ def offsets():

def test_commit_offsets_async(mocker, coordinator, offsets):
mocker.patch.object(coordinator._client, 'poll')
mocker.patch.object(coordinator, 'ensure_coordinator_known')
mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
ret = coordinator.commit_offsets_async(offsets)
assert isinstance(ret, Future)
coordinator.commit_offsets_async(offsets)
assert coordinator._send_offset_commit_request.call_count == 1


def test_commit_offsets_sync(mocker, coordinator, offsets):
mocker.patch.object(coordinator, 'ensure_coordinator_known')
mocker.patch.object(coordinator, 'ensure_coordinator_ready')
mocker.patch.object(coordinator, '_send_offset_commit_request',
return_value=Future().success('fizzbuzz'))
cli = coordinator._client