diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index d18de0743..4c80188b8 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -452,7 +452,7 @@ def _send_join_group_request(self): (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) for protocol, metadata in self.group_protocols() ] - version = self._client.api_version(JoinGroupRequest, max_version=2) + version = self._client.api_version(JoinGroupRequest, max_version=3) if version == 0: request = JoinGroupRequest[version]( self.group_id, @@ -492,6 +492,11 @@ def _failed_request(self, node_id, request, future, error): future.failure(error) def _handle_join_group_response(self, future, send_time, response): + if response.API_VERSION >= 2: + self.sensors.throttle_time.record(response.throttle_time_ms) + if response.throttle_time_ms > 0: + log.warning("JoinGroupRequest throttled by broker (%d ms)", response.throttle_time_ms) + error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.debug("Received successful JoinGroup response for group %s: %s", @@ -553,7 +558,7 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = self._client.api_version(SyncGroupRequest, max_version=1) + version = self._client.api_version(SyncGroupRequest, max_version=2) request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, @@ -581,7 +586,7 @@ def _on_join_leader(self, response): except Exception as e: return Future().failure(e) - version = self._client.api_version(SyncGroupRequest, max_version=1) + version = self._client.api_version(SyncGroupRequest, max_version=2) request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, @@ -613,6 +618,11 @@ def _send_sync_group_request(self, request): return future def _handle_sync_group_response(self, future, send_time, response): + if response.API_VERSION >= 1: + self.sensors.throttle_time.record(response.throttle_time_ms) + if response.throttle_time_ms > 0: + log.warning("SyncGroupRequest throttled by broker (%d ms)", response.throttle_time_ms) + error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: self.sensors.sync_latency.record((time.time() - send_time) * 1000) @@ -762,7 +772,7 @@ def maybe_leave_group(self): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. log.info('Leaving consumer group (%s).', self.group_id) - version = self._client.api_version(LeaveGroupRequest, max_version=1) + version = self._client.api_version(LeaveGroupRequest, max_version=2) request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) @@ -772,6 +782,11 @@ def maybe_leave_group(self): self.reset_generation() def _handle_leave_group_response(self, response): + if response.API_VERSION >= 1: + self.sensors.throttle_time.record(response.throttle_time_ms) + if response.throttle_time_ms > 0: + log.warning("LeaveGroupRequest throttled by broker (%d ms)", response.throttle_time_ms) + error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.debug("LeaveGroup request for group %s returned successfully", @@ -790,7 +805,7 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = self._client.api_version(HeartbeatRequest, max_version=1) + version = self._client.api_version(HeartbeatRequest, max_version=2) request = HeartbeatRequest[version](self.group_id, self._generation.generation_id, self._generation.member_id) @@ -803,6 +818,11 @@ def _send_heartbeat_request(self): return future def _handle_heartbeat_response(self, future, send_time, response): + if response.API_VERSION >= 1: + self.sensors.throttle_time.record(response.throttle_time_ms) + if response.throttle_time_ms > 0: + log.warning("HeartbeatRequest throttled by broker (%d ms)", response.throttle_time_ms) + self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: @@ -891,6 +911,14 @@ def __init__(self, heartbeat, metrics, prefix, tags=None): tags), AnonMeasurable( lambda _, now: (now / 1000) - self.heartbeat.last_send)) + self.throttle_time = metrics.sensor('throttle-time') + self.throttle_time.add(metrics.metric_name( + 'throttle-time-avg', self.metric_group_name, + 'The average throttle time in ms'), Avg()) + self.throttle_time.add(metrics.metric_name( + 'throttle-time-max', self.metric_group_name, + 'The maximum throttle time in ms'), Max()) + class HeartbeatThread(threading.Thread): def __init__(self, coordinator): diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index bcb96553b..ee06141e6 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -42,6 +42,12 @@ class JoinGroupResponse_v2(Response): ) +class JoinGroupResponse_v3(Response): + API_KEY = 11 + API_VERSION = 3 + SCHEMA = JoinGroupResponse_v2.SCHEMA + + class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 @@ -83,11 +89,19 @@ class JoinGroupRequest_v2(Request): UNKNOWN_MEMBER_ID = '' +class JoinGroupRequest_v3(Request): + API_KEY = 11 + API_VERSION = 3 + RESPONSE_TYPE = JoinGroupResponse_v3 + SCHEMA = JoinGroupRequest_v2.SCHEMA + UNKNOWN_MEMBER_ID = '' + + JoinGroupRequest = [ - JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2 + JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v3 ] JoinGroupResponse = [ - JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2 + JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, JoinGroupResponse_v3 ] @@ -118,6 +132,12 @@ class SyncGroupResponse_v1(Response): ) +class SyncGroupResponse_v2(Response): + API_KEY = 14 + API_VERSION = 2 + SCHEMA = SyncGroupResponse_v1.SCHEMA + + class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 @@ -139,8 +159,15 @@ class SyncGroupRequest_v1(Request): SCHEMA = SyncGroupRequest_v0.SCHEMA -SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1] -SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1] +class SyncGroupRequest_v2(Request): + API_KEY = 14 + API_VERSION = 2 + RESPONSE_TYPE = SyncGroupResponse_v2 + SCHEMA = SyncGroupRequest_v1.SCHEMA + + +SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v2] +SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v2] class MemberAssignment(Struct): @@ -170,6 +197,12 @@ class HeartbeatResponse_v1(Response): ) +class HeartbeatResponse_v2(Response): + API_KEY = 12 + API_VERSION = 2 + SCHEMA = HeartbeatResponse_v1.SCHEMA + + class HeartbeatRequest_v0(Request): API_KEY = 12 API_VERSION = 0 @@ -188,8 +221,15 @@ class HeartbeatRequest_v1(Request): SCHEMA = HeartbeatRequest_v0.SCHEMA -HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1] -HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1] +class HeartbeatRequest_v2(Request): + API_KEY = 12 + API_VERSION = 2 + RESPONSE_TYPE = HeartbeatResponse_v2 + SCHEMA = HeartbeatRequest_v1.SCHEMA + + +HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1, HeartbeatRequest_v2] +HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1, HeartbeatResponse_v2] class LeaveGroupResponse_v0(Response): @@ -209,6 +249,12 @@ class LeaveGroupResponse_v1(Response): ) +class LeaveGroupResponse_v2(Response): + API_KEY = 13 + API_VERSION = 2 + SCHEMA = LeaveGroupResponse_v1.SCHEMA + + class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 @@ -226,5 +272,12 @@ class LeaveGroupRequest_v1(Request): SCHEMA = LeaveGroupRequest_v0.SCHEMA -LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1] -LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1] +class LeaveGroupRequest_v2(Request): + API_KEY = 13 + API_VERSION = 2 + RESPONSE_TYPE = LeaveGroupResponse_v2 + SCHEMA = LeaveGroupRequest_v1.SCHEMA + + +LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1, LeaveGroupRequest_v2] +LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1, LeaveGroupResponse_v2]