Skip to content

Commit 2e0ada0

Browse files
dpkpjeffwidman
authored andcommitted
Fix response error checking in KafkaAdminClient send_to_controller
Previously we weren't accounting for when the response tuple also has a `error_message` value. Note that in Java, the error fieldname is inconsistent: - `CreateTopicsResponse` / `CreatePartitionsResponse` uses `topic_errors` - `DeleteTopicsResponse` uses `topic_error_codes` So this updates the `CreateTopicsResponse` classes to match. The fix is a little brittle, but should suffice for now.
1 parent 807ac82 commit 2e0ada0

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

kafka/admin/client.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,18 @@ def _send_request_to_controller(self, request):
331331
while tries:
332332
tries -= 1
333333
response = self._send_request_to_node(self._controller_id, request)
334-
# DeleteTopicsResponse returns topic_error_codes rather than topic_errors
335-
for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
334+
# In Java, the error fieldname is inconsistent:
335+
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
336+
# - DeleteTopicsResponse uses topic_error_codes
337+
# So this is a little brittle in that it assumes all responses have
338+
# one of these attributes and that they always unpack into
339+
# (topic, error_code) tuples.
340+
topic_error_tuples = getattr(response, "topic_errors", response.topic_error_codes)
341+
# Also small py2/py3 compatibility -- py3 can ignore extra values
342+
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
343+
# So for now we have to map across the list and explicitly drop any
344+
# extra values (usually the error_message)
345+
for topic, error_code in map(lambda e: e[:2], topic_error_tuples):
336346
error_type = Errors.for_code(error_code)
337347
if tries and error_type is NotControllerError:
338348
# No need to inspect the rest of the errors for

kafka/protocol/admin.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class CreateTopicsResponse_v0(Response):
5151
API_KEY = 19
5252
API_VERSION = 0
5353
SCHEMA = Schema(
54-
('topic_error_codes', Array(
54+
('topic_errors', Array(
5555
('topic', String('utf-8')),
5656
('error_code', Int16)))
5757
)
@@ -61,7 +61,7 @@ class CreateTopicsResponse_v1(Response):
6161
API_KEY = 19
6262
API_VERSION = 1
6363
SCHEMA = Schema(
64-
('topic_error_codes', Array(
64+
('topic_errors', Array(
6565
('topic', String('utf-8')),
6666
('error_code', Int16),
6767
('error_message', String('utf-8'))))
@@ -73,7 +73,7 @@ class CreateTopicsResponse_v2(Response):
7373
API_VERSION = 2
7474
SCHEMA = Schema(
7575
('throttle_time_ms', Int32),
76-
('topic_error_codes', Array(
76+
('topic_errors', Array(
7777
('topic', String('utf-8')),
7878
('error_code', Int16),
7979
('error_message', String('utf-8'))))

0 commit comments

Comments
 (0)