Skip to content

Support 2.1 baseline consumer group apis #2503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ def _send_join_group_request(self):
(protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
for protocol, metadata in self.group_protocols()
]
version = self._client.api_version(JoinGroupRequest, max_version=2)
version = self._client.api_version(JoinGroupRequest, max_version=3)
if version == 0:
request = JoinGroupRequest[version](
self.group_id,
Expand Down Expand Up @@ -492,6 +492,11 @@ def _failed_request(self, node_id, request, future, error):
future.failure(error)

def _handle_join_group_response(self, future, send_time, response):
if response.API_VERSION >= 2:
self.sensors.throttle_time.record(response.throttle_time_ms)
if response.throttle_time_ms > 0:
log.warning("JoinGroupRequest throttled by broker (%d ms)", response.throttle_time_ms)

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
Expand Down Expand Up @@ -553,7 +558,7 @@ def _handle_join_group_response(self, future, send_time, response):

def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = self._client.api_version(SyncGroupRequest, max_version=1)
version = self._client.api_version(SyncGroupRequest, max_version=2)
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
Expand Down Expand Up @@ -581,7 +586,7 @@ def _on_join_leader(self, response):
except Exception as e:
return Future().failure(e)

version = self._client.api_version(SyncGroupRequest, max_version=1)
version = self._client.api_version(SyncGroupRequest, max_version=2)
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
Expand Down Expand Up @@ -613,6 +618,11 @@ def _send_sync_group_request(self, request):
return future

def _handle_sync_group_response(self, future, send_time, response):
if response.API_VERSION >= 1:
self.sensors.throttle_time.record(response.throttle_time_ms)
if response.throttle_time_ms > 0:
log.warning("SyncGroupRequest throttled by broker (%d ms)", response.throttle_time_ms)

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
self.sensors.sync_latency.record((time.time() - send_time) * 1000)
Expand Down Expand Up @@ -762,7 +772,7 @@ def maybe_leave_group(self):
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
log.info('Leaving consumer group (%s).', self.group_id)
version = self._client.api_version(LeaveGroupRequest, max_version=1)
version = self._client.api_version(LeaveGroupRequest, max_version=2)
request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
future = self._client.send(self.coordinator_id, request)
future.add_callback(self._handle_leave_group_response)
Expand All @@ -772,6 +782,11 @@ def maybe_leave_group(self):
self.reset_generation()

def _handle_leave_group_response(self, response):
if response.API_VERSION >= 1:
self.sensors.throttle_time.record(response.throttle_time_ms)
if response.throttle_time_ms > 0:
log.warning("LeaveGroupRequest throttled by broker (%d ms)", response.throttle_time_ms)

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("LeaveGroup request for group %s returned successfully",
Expand All @@ -790,7 +805,7 @@ def _send_heartbeat_request(self):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)

version = self._client.api_version(HeartbeatRequest, max_version=1)
version = self._client.api_version(HeartbeatRequest, max_version=2)
request = HeartbeatRequest[version](self.group_id,
self._generation.generation_id,
self._generation.member_id)
Expand All @@ -803,6 +818,11 @@ def _send_heartbeat_request(self):
return future

def _handle_heartbeat_response(self, future, send_time, response):
if response.API_VERSION >= 1:
self.sensors.throttle_time.record(response.throttle_time_ms)
if response.throttle_time_ms > 0:
log.warning("HeartbeatRequest throttled by broker (%d ms)", response.throttle_time_ms)

self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
Expand Down Expand Up @@ -891,6 +911,14 @@ def __init__(self, heartbeat, metrics, prefix, tags=None):
tags), AnonMeasurable(
lambda _, now: (now / 1000) - self.heartbeat.last_send))

self.throttle_time = metrics.sensor('throttle-time')
self.throttle_time.add(metrics.metric_name(
'throttle-time-avg', self.metric_group_name,
'The average throttle time in ms'), Avg())
self.throttle_time.add(metrics.metric_name(
'throttle-time-max', self.metric_group_name,
'The maximum throttle time in ms'), Max())


class HeartbeatThread(threading.Thread):
def __init__(self, coordinator):
Expand Down
69 changes: 61 additions & 8 deletions kafka/protocol/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class JoinGroupResponse_v2(Response):
)


class JoinGroupResponse_v3(Response):
API_KEY = 11
API_VERSION = 3
SCHEMA = JoinGroupResponse_v2.SCHEMA


class JoinGroupRequest_v0(Request):
API_KEY = 11
API_VERSION = 0
Expand Down Expand Up @@ -83,11 +89,19 @@ class JoinGroupRequest_v2(Request):
UNKNOWN_MEMBER_ID = ''


class JoinGroupRequest_v3(Request):
API_KEY = 11
API_VERSION = 3
RESPONSE_TYPE = JoinGroupResponse_v3
SCHEMA = JoinGroupRequest_v2.SCHEMA
UNKNOWN_MEMBER_ID = ''


JoinGroupRequest = [
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v3
]
JoinGroupResponse = [
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, JoinGroupResponse_v3
]


Expand Down Expand Up @@ -118,6 +132,12 @@ class SyncGroupResponse_v1(Response):
)


class SyncGroupResponse_v2(Response):
API_KEY = 14
API_VERSION = 2
SCHEMA = SyncGroupResponse_v1.SCHEMA


class SyncGroupRequest_v0(Request):
API_KEY = 14
API_VERSION = 0
Expand All @@ -139,8 +159,15 @@ class SyncGroupRequest_v1(Request):
SCHEMA = SyncGroupRequest_v0.SCHEMA


SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1]
SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1]
class SyncGroupRequest_v2(Request):
API_KEY = 14
API_VERSION = 2
RESPONSE_TYPE = SyncGroupResponse_v2
SCHEMA = SyncGroupRequest_v1.SCHEMA


SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v2]
SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v2]


class MemberAssignment(Struct):
Expand Down Expand Up @@ -170,6 +197,12 @@ class HeartbeatResponse_v1(Response):
)


class HeartbeatResponse_v2(Response):
API_KEY = 12
API_VERSION = 2
SCHEMA = HeartbeatResponse_v1.SCHEMA


class HeartbeatRequest_v0(Request):
API_KEY = 12
API_VERSION = 0
Expand All @@ -188,8 +221,15 @@ class HeartbeatRequest_v1(Request):
SCHEMA = HeartbeatRequest_v0.SCHEMA


HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1]
HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1]
class HeartbeatRequest_v2(Request):
API_KEY = 12
API_VERSION = 2
RESPONSE_TYPE = HeartbeatResponse_v2
SCHEMA = HeartbeatRequest_v1.SCHEMA


HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1, HeartbeatRequest_v2]
HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1, HeartbeatResponse_v2]


class LeaveGroupResponse_v0(Response):
Expand All @@ -209,6 +249,12 @@ class LeaveGroupResponse_v1(Response):
)


class LeaveGroupResponse_v2(Response):
API_KEY = 13
API_VERSION = 2
SCHEMA = LeaveGroupResponse_v1.SCHEMA


class LeaveGroupRequest_v0(Request):
API_KEY = 13
API_VERSION = 0
Expand All @@ -226,5 +272,12 @@ class LeaveGroupRequest_v1(Request):
SCHEMA = LeaveGroupRequest_v0.SCHEMA


LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1]
LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1]
class LeaveGroupRequest_v2(Request):
API_KEY = 13
API_VERSION = 2
RESPONSE_TYPE = LeaveGroupResponse_v2
SCHEMA = LeaveGroupRequest_v1.SCHEMA


LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1, LeaveGroupRequest_v2]
LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1, LeaveGroupResponse_v2]