From a91e8dd7c00efc14affd4d79014c2f10b512edc0 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 24 Apr 2020 16:10:16 +0200 Subject: [PATCH 01/11] create tests for delete_consumer_groups command --- test/test_admin_integration.py | 61 ++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index dc04537d5..dd44b3740 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -142,6 +142,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): with pytest.raises(ValueError): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_does_not_exist(kafka_admin_client): """Tests that the describe consumer group call fails if the group coordinator is not available @@ -149,6 +150,7 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client): with pytest.raises(GroupCoordinatorNotAvailableError): group_description = kafka_admin_client.describe_consumer_groups(['test']) + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): """Tests that the describe consumer group call returns valid consumer group information @@ -236,3 +238,62 @@ def consumer_thread(i, group_id): stop[c].set() threads[c].join() threads[c] = None + + +@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") +def test_delete_consumergroups_inactive_group(kafka_admin_client, kafka_consumer_factory, send_messages): + send_messages(range(0, 100), partition=0) + send_messages(range(0, 100), partition=1) + consumer1 = kafka_consumer_factory(group_id="group1") + next(consumer1) + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id="group2") + next(consumer2) + consumer2.close() + + consumer3 = kafka_consumer_factory(group_id="group3") + next(consumer3) + consumer3.close() + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert "group1" in consumergroups + assert "group2" in consumergroups + assert "group3" in consumergroups + + kafka_admin_client.delete_consumer_groups(["group1", "group2"]) + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert "group1" not in consumergroups + assert "group2" not in consumergroups + assert "group3" in consumergroups + + +@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") +def test_delete_consumergroups_active_group(kafka_admin_client, kafka_consumer_factory, send_messages): + send_messages(range(0, 100), partition=0) + send_messages(range(0, 100), partition=1) + consumer1 = kafka_consumer_factory(group_id="group1") + next(consumer1) + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id="group2") + next(consumer2) + + consumer3 = kafka_consumer_factory(group_id="group3") + next(consumer3) + consumer3.close() + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert "group1" in consumergroups + assert "group2" in consumergroups + assert "group3" in consumergroups + + # TODO use more specific exception + with pytest.raises(Exception): + kafka_admin_client.delete_consumer_groups(["group1", "group2"]) + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert "group1" not in consumergroups + assert "group2" in consumergroups + assert "group3" in consumergroups From 64253aad2ff16cb83b73c6638f3475c9a056f567 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 24 Apr 2020 17:46:17 +0200 Subject: [PATCH 02/11] improve tests --- test/test_admin_integration.py | 35 +++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index dd44b3740..f41a3c50d 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -7,7 +7,7 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) -from kafka.errors import (NoError, GroupCoordinatorNotAvailableError) +from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -241,9 +241,8 @@ def consumer_thread(i, group_id): @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumergroups_inactive_group(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages): send_messages(range(0, 100), partition=0) - send_messages(range(0, 100), partition=1) consumer1 = kafka_consumer_factory(group_id="group1") next(consumer1) consumer1.close() @@ -261,7 +260,13 @@ def test_delete_consumergroups_inactive_group(kafka_admin_client, kafka_consumer assert "group2" in consumergroups assert "group3" in consumergroups - kafka_admin_client.delete_consumer_groups(["group1", "group2"]) + delete_results = { + group_id: error + for group_id, error in kafka_admin_client.delete_consumer_groups(["group1", "group2"]) + } + assert delete_results["group1"] == NoError + assert delete_results["group2"] == NoError + assert "group3" not in delete_results consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} assert "group1" not in consumergroups @@ -270,9 +275,8 @@ def test_delete_consumergroups_inactive_group(kafka_admin_client, kafka_consumer @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") -def test_delete_consumergroups_active_group(kafka_admin_client, kafka_consumer_factory, send_messages): +def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): send_messages(range(0, 100), partition=0) - send_messages(range(0, 100), partition=1) consumer1 = kafka_consumer_factory(group_id="group1") next(consumer1) consumer1.close() @@ -280,20 +284,21 @@ def test_delete_consumergroups_active_group(kafka_admin_client, kafka_consumer_f consumer2 = kafka_consumer_factory(group_id="group2") next(consumer2) - consumer3 = kafka_consumer_factory(group_id="group3") - next(consumer3) - consumer3.close() - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} assert "group1" in consumergroups assert "group2" in consumergroups - assert "group3" in consumergroups + assert "group3" not in consumergroups + + delete_results = { + group_id: error + for group_id, error in kafka_admin_client.delete_consumer_groups(["group1", "group2", "group3"]) + } - # TODO use more specific exception - with pytest.raises(Exception): - kafka_admin_client.delete_consumer_groups(["group1", "group2"]) + assert delete_results["group1"] == NoError + assert delete_results["group2"] == NonEmptyGroupError + assert delete_results["group3"] == GroupIdNotFoundError consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} assert "group1" not in consumergroups assert "group2" in consumergroups - assert "group3" in consumergroups + assert "group3" not in consumergroups \ No newline at end of file From 4b76d418fe8c8e97caa47a57041984d83353401f Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 24 Apr 2020 17:46:39 +0200 Subject: [PATCH 03/11] add consumergroup related errors --- kafka/errors.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kafka/errors.py b/kafka/errors.py index 2c1df82de..b33cf51e2 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -449,6 +449,18 @@ class SecurityDisabledError(BrokerResponseError): description = 'Security features are disabled.' +class NonEmptyGroupError(BrokerResponseError): + errno = 68 + message = 'NON_EMPTY_GROUP' + description = 'The group is not empty.' + + +class GroupIdNotFoundError(BrokerResponseError): + errno = 69 + message = 'GROUP_ID_NOT_FOUND' + description = 'The group id does not exist.' + + class KafkaUnavailableError(KafkaError): pass From 804faf4e2d02d9300b6337b5da7a1e7d198813f4 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 24 Apr 2020 17:47:02 +0200 Subject: [PATCH 04/11] add DeleteGroups to protocol.admin --- kafka/protocol/admin.py | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index af88ea473..f3b691a5f 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -882,3 +882,44 @@ class CreatePartitionsRequest_v1(Request): CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, ] + +class DeleteGroupsResponse_v0(Response): + API_KEY = 42 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ("results", Array( + ("group_id", String("utf-8")), + ("error_code", Int16))) + ) + + +class DeleteGroupsResponse_v1(Response): + API_KEY = 42 + API_VERSION = 1 + SCHEMA = DeleteGroupsResponse_v0.SCHEMA + + +class DeleteGroupsRequest_v0(Request): + API_KEY = 42 + API_VERSION = 0 + RESPONSE_TYPE = DeleteGroupsResponse_v0 + SCHEMA = Schema( + ("groups_names", Array(String("utf-8"))) + ) + + +class DeleteGroupsRequest_v1(Request): + API_KEY = 42 + API_VERSION = 1 + RESPONSE_TYPE = DeleteGroupsResponse_v1 + SCHEMA = DeleteGroupsRequest_v0.SCHEMA + + +DeleteGroupsRequest = [ + DeleteGroupsRequest_v0, DeleteGroupsRequest_v1 +] + +DeleteGroupsResponse = [ + DeleteGroupsResponse_v0, DeleteGroupsResponse_v1 +] From 16cd36a75b0a9e2ad6072e1a16d97ae1022e6a16 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Fri, 24 Apr 2020 17:47:23 +0200 Subject: [PATCH 05/11] implemente delete_groups feature on KafkaAdminClient --- kafka/admin/client.py | 93 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 1b91e1b80..b5665ffdf 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -19,7 +19,9 @@ from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, + DeleteGroupsRequest +) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.types import Array @@ -343,6 +345,30 @@ def _find_coordinator_id(self, group_id): response = future.value return self._find_coordinator_id_process_response(response) + def _find_many_coordinator_ids(self, group_ids): + """Find the broker node_id of the coordinator for each of the given groups. + + Sends a FindCoordinatorRequest message to the cluster for each group_id. + Will block until the FindCoordinatorResponse is received for all groups. + Any errors are immediately raised. + + :param group_ids: A list of consumer group IDs. This is typically the group + name as a string. + :return: A list of tuples (group_id, node_id) where node_id is the id + of the broker that is the coordinator for the corresponding group. + """ + # Note: Java may change how this is implemented in KAFKA-6791. + futures = { + group_id: self._find_coordinator_id_send_request(group_id) + for group_id in group_ids + } + self._wait_for_futures(list(futures.values())) + groups_coordinators = [ + (group_id, self._find_coordinator_id_process_response(f.value)) + for group_id, f in futures.items() + ] + return groups_coordinators + def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. @@ -1261,8 +1287,69 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, response = future.value return self._list_consumer_group_offsets_process_response(response) - # delete groups protocol not yet implemented - # Note: send the request to the group's coordinator. + def delete_consumer_groups(self, group_ids, group_coordinator_id=None): + """Delete Consumer Group Offsets for given consumer groups. + + Note: + This does not verify that the group ids actually exist and + group_coordinator_id is the correct coordinator for all these groups. + + The result needs checking for potential errors. + + :param group_ids: The consumer group ids of the groups which are to be deleted. + :param group_coordinator_id: The node_id of the broker which is the coordinator for + all the groups. Use only if all groups are coordinated by the same broker. + If set to None, will query the cluster to find the coordinator for every single group. + Explicitly specifying this can be useful to prevent + that extra network round trips if you already know the group + coordinator. Default: None. + :return dictionary: A list of tuples (group_id, KafkaError) + """ + if group_coordinator_id is not None: + futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)] + else: + groups_coordinators = defaultdict(list) + for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids): + groups_coordinators[group_coordinator_id].append(group_id) + futures = [ + self._delete_consumer_groups_send_request(group_ids, group_coordinator_id) + for group_coordinator_id, group_ids in groups_coordinators.items() + ] + + self._wait_for_futures(futures) + + results = [] + for f in futures: + results.extend(self._convert_delete_groups_response(f.value)) + return results + + def _convert_delete_groups_response(self, response): + if response.API_VERSION <= 1: + results = [] + for group_id, error_code in response.results: + results.append((group_id, Errors.for_code(error_code))) + return results + else: + raise NotImplementedError( + "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + + def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): + """Send a DeleteGroups request to a broker. + + :param group_ids: The consumer group ids of the groups which are to be deleted. + :param group_coordinator_id: The node_id of the broker which is the coordinator for + all the groups. + :return: A message future + """ + version = self._matching_api_version(DeleteGroupsRequest) + if version <= 1: + request = DeleteGroupsRequest[version](group_ids) + else: + raise NotImplementedError( + "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(group_coordinator_id, request) def _wait_for_futures(self, futures): while not all(future.succeeded() for future in futures): From 792cf2b6887ac810043bed7e31792cb9fe83192d Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 27 Apr 2020 10:06:40 +0200 Subject: [PATCH 06/11] fix indentation of test code --- test/test_admin_integration.py | 52 +++++++++++++++++----------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index f41a3c50d..466f1bd74 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -276,29 +276,29 @@ def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_ @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): - send_messages(range(0, 100), partition=0) - consumer1 = kafka_consumer_factory(group_id="group1") - next(consumer1) - consumer1.close() - - consumer2 = kafka_consumer_factory(group_id="group2") - next(consumer2) - - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "group1" in consumergroups - assert "group2" in consumergroups - assert "group3" not in consumergroups - - delete_results = { - group_id: error - for group_id, error in kafka_admin_client.delete_consumer_groups(["group1", "group2", "group3"]) - } - - assert delete_results["group1"] == NoError - assert delete_results["group2"] == NonEmptyGroupError - assert delete_results["group3"] == GroupIdNotFoundError - - consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "group1" not in consumergroups - assert "group2" in consumergroups - assert "group3" not in consumergroups \ No newline at end of file + send_messages(range(0, 100), partition=0) + consumer1 = kafka_consumer_factory(group_id="group1") + next(consumer1) + consumer1.close() + + consumer2 = kafka_consumer_factory(group_id="group2") + next(consumer2) + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert "group1" in consumergroups + assert "group2" in consumergroups + assert "group3" not in consumergroups + + delete_results = { + group_id: error + for group_id, error in kafka_admin_client.delete_consumer_groups(["group1", "group2", "group3"]) + } + + assert delete_results["group1"] == NoError + assert delete_results["group2"] == NonEmptyGroupError + assert delete_results["group3"] == GroupIdNotFoundError + + consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} + assert "group1" not in consumergroups + assert "group2" in consumergroups + assert "group3" not in consumergroups \ No newline at end of file From fa8af0cb023aaf4cfb2ba4ac1780545b4a0eede4 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Mon, 27 Apr 2020 11:26:21 +0200 Subject: [PATCH 07/11] fix delete_consumer_groups docstring --- kafka/admin/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index b5665ffdf..f1f440f10 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1303,7 +1303,7 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None): Explicitly specifying this can be useful to prevent that extra network round trips if you already know the group coordinator. Default: None. - :return dictionary: A list of tuples (group_id, KafkaError) + :return: A list of tuples (group_id, KafkaError) """ if group_coordinator_id is not None: futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)] From 11d8020b77fb15ccfcc404052f4d177dc376e7fe Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Sat, 20 Jun 2020 13:40:37 +0200 Subject: [PATCH 08/11] fix indentation and use unique consumergroup ids for tests --- test/test_admin_integration.py | 50 +++++++++++++++++----------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 466f1bd74..978df2f5a 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -243,62 +243,62 @@ def consumer_thread(i, group_id): @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages): send_messages(range(0, 100), partition=0) - consumer1 = kafka_consumer_factory(group_id="group1") + consumer1 = kafka_consumer_factory(group_id="test1-group1") next(consumer1) consumer1.close() - consumer2 = kafka_consumer_factory(group_id="group2") + consumer2 = kafka_consumer_factory(group_id="test1-group2") next(consumer2) consumer2.close() - consumer3 = kafka_consumer_factory(group_id="group3") + consumer3 = kafka_consumer_factory(group_id="test1-group3") next(consumer3) consumer3.close() consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "group1" in consumergroups - assert "group2" in consumergroups - assert "group3" in consumergroups + assert "test1-group1" in consumergroups + assert "test1-group2" in consumergroups + assert "test1-group3" in consumergroups delete_results = { group_id: error - for group_id, error in kafka_admin_client.delete_consumer_groups(["group1", "group2"]) + for group_id, error in kafka_admin_client.delete_consumer_groups(["test1-group1", "test1-group2"]) } - assert delete_results["group1"] == NoError - assert delete_results["group2"] == NoError - assert "group3" not in delete_results + assert delete_results["test1-group1"] == NoError + assert delete_results["test1-group2"] == NoError + assert "test1-group3" not in delete_results consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "group1" not in consumergroups - assert "group2" not in consumergroups - assert "group3" in consumergroups + assert "test1-group1" not in consumergroups + assert "test1-group2" not in consumergroups + assert "test1-group3" in consumergroups @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): send_messages(range(0, 100), partition=0) - consumer1 = kafka_consumer_factory(group_id="group1") + consumer1 = kafka_consumer_factory(group_id="test2-group1") next(consumer1) consumer1.close() - consumer2 = kafka_consumer_factory(group_id="group2") + consumer2 = kafka_consumer_factory(group_id="test2-group2") next(consumer2) consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "group1" in consumergroups - assert "group2" in consumergroups - assert "group3" not in consumergroups + assert "test2-group1" in consumergroups + assert "test2-group2" in consumergroups + assert "test2-group3" not in consumergroups delete_results = { group_id: error - for group_id, error in kafka_admin_client.delete_consumer_groups(["group1", "group2", "group3"]) + for group_id, error in kafka_admin_client.delete_consumer_groups(["test2-group1", "test2-group2", "test2-group3"]) } - assert delete_results["group1"] == NoError - assert delete_results["group2"] == NonEmptyGroupError - assert delete_results["group3"] == GroupIdNotFoundError + assert delete_results["test2-group1"] == NoError + assert delete_results["test2-group2"] == NonEmptyGroupError + assert delete_results["test2-group3"] == GroupIdNotFoundError consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "group1" not in consumergroups - assert "group2" in consumergroups - assert "group3" not in consumergroups \ No newline at end of file + assert "test2-group1" not in consumergroups + assert "test2-group2" in consumergroups + assert "test2-group3" not in consumergroups \ No newline at end of file From 12f0a008dbb4c1477092d53d199e1a59fee1a1c8 Mon Sep 17 00:00:00 2001 From: Swen Date: Thu, 17 Sep 2020 14:56:46 +0200 Subject: [PATCH 09/11] add newline at end of test_admin_integration.py --- test/test_admin_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 978df2f5a..01b0f6b7c 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -301,4 +301,4 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} assert "test2-group1" not in consumergroups assert "test2-group2" in consumergroups - assert "test2-group3" not in consumergroups \ No newline at end of file + assert "test2-group3" not in consumergroups From 52bde481dfdda63134c500b0bfa65b1b119d2236 Mon Sep 17 00:00:00 2001 From: Swen Date: Thu, 17 Sep 2020 14:58:43 +0200 Subject: [PATCH 10/11] remove outdated note in admin/client.py --- kafka/admin/client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index f1f440f10..97fe73acb 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -339,7 +339,6 @@ def _find_coordinator_id(self, group_id): name as a string. :return: The node_id of the broker that is the coordinator. """ - # Note: Java may change how this is implemented in KAFKA-6791. future = self._find_coordinator_id_send_request(group_id) self._wait_for_futures([future]) response = future.value @@ -357,7 +356,6 @@ def _find_many_coordinator_ids(self, group_ids): :return: A list of tuples (group_id, node_id) where node_id is the id of the broker that is the coordinator for the corresponding group. """ - # Note: Java may change how this is implemented in KAFKA-6791. futures = { group_id: self._find_coordinator_id_send_request(group_id) for group_id in group_ids From 2f678ae2dc076502679755405e761e547a1b50ea Mon Sep 17 00:00:00 2001 From: Swen Date: Thu, 17 Sep 2020 15:08:06 +0200 Subject: [PATCH 11/11] use random group ids in test_admin_integration.py --- test/test_admin_integration.py | 60 ++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 01b0f6b7c..06c40a223 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -242,63 +242,73 @@ def consumer_thread(i, group_id): @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages): + random_group_id = 'test-group-' + random_string(6) + group1 = random_group_id + "_1" + group2 = random_group_id + "_2" + group3 = random_group_id + "_3" + send_messages(range(0, 100), partition=0) - consumer1 = kafka_consumer_factory(group_id="test1-group1") + consumer1 = kafka_consumer_factory(group_id=group1) next(consumer1) consumer1.close() - consumer2 = kafka_consumer_factory(group_id="test1-group2") + consumer2 = kafka_consumer_factory(group_id=group2) next(consumer2) consumer2.close() - consumer3 = kafka_consumer_factory(group_id="test1-group3") + consumer3 = kafka_consumer_factory(group_id=group3) next(consumer3) consumer3.close() consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "test1-group1" in consumergroups - assert "test1-group2" in consumergroups - assert "test1-group3" in consumergroups + assert group1 in consumergroups + assert group2 in consumergroups + assert group3 in consumergroups delete_results = { group_id: error - for group_id, error in kafka_admin_client.delete_consumer_groups(["test1-group1", "test1-group2"]) + for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2]) } - assert delete_results["test1-group1"] == NoError - assert delete_results["test1-group2"] == NoError - assert "test1-group3" not in delete_results + assert delete_results[group1] == NoError + assert delete_results[group2] == NoError + assert group3 not in delete_results consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "test1-group1" not in consumergroups - assert "test1-group2" not in consumergroups - assert "test1-group3" in consumergroups + assert group1 not in consumergroups + assert group2 not in consumergroups + assert group3 in consumergroups @pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1") def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages): + random_group_id = 'test-group-' + random_string(6) + group1 = random_group_id + "_1" + group2 = random_group_id + "_2" + group3 = random_group_id + "_3" + send_messages(range(0, 100), partition=0) - consumer1 = kafka_consumer_factory(group_id="test2-group1") + consumer1 = kafka_consumer_factory(group_id=group1) next(consumer1) consumer1.close() - consumer2 = kafka_consumer_factory(group_id="test2-group2") + consumer2 = kafka_consumer_factory(group_id=group2) next(consumer2) consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "test2-group1" in consumergroups - assert "test2-group2" in consumergroups - assert "test2-group3" not in consumergroups + assert group1 in consumergroups + assert group2 in consumergroups + assert group3 not in consumergroups delete_results = { group_id: error - for group_id, error in kafka_admin_client.delete_consumer_groups(["test2-group1", "test2-group2", "test2-group3"]) + for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3]) } - assert delete_results["test2-group1"] == NoError - assert delete_results["test2-group2"] == NonEmptyGroupError - assert delete_results["test2-group3"] == GroupIdNotFoundError + assert delete_results[group1] == NoError + assert delete_results[group2] == NonEmptyGroupError + assert delete_results[group3] == GroupIdNotFoundError consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()} - assert "test2-group1" not in consumergroups - assert "test2-group2" in consumergroups - assert "test2-group3" not in consumergroups + assert group1 not in consumergroups + assert group2 in consumergroups + assert group3 not in consumergroups