From ae1ae0e67fb82fe8f9befd1c6f184601ea5102ec Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 15:14:48 +0530 Subject: [PATCH 1/3] Modified describegroup test --- .../admin/test_describe_operations.py | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 3bfbfd88a..6f907c7fe 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -19,7 +19,7 @@ from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError -from confluent_kafka import ConsumerGroupState, TopicCollection +from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType from tests.common import TestUtils @@ -30,12 +30,16 @@ def verify_commit_result(err, _): assert err is not None -def consume_messages(sasl_cluster, group_id, topic, num_messages=None): +def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages=None): conf = {'group.id': group_id, - 'session.timeout.ms': 6000, + 'group.protocol': group_protocol, 'enable.auto.commit': False, 'on_commit': verify_commit_result, 'auto.offset.reset': 'earliest'} + + if group_protocol == 'classic' : + conf['session.timeout.ms'] = 6000 + consumer = sasl_cluster.consumer(conf) consumer.subscribe([topic]) read_messages = 0 @@ -164,7 +168,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Consume some messages for the group group = 'test-group' - consume_messages(cluster, group, topic, 2) + consume_messages(cluster, group, 'classic', topic, 2) # Verify Describe Consumer Groups desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -177,10 +181,29 @@ def verify_describe_groups(cluster, admin_client, topic): assert group == desc.group_id assert desc.is_simple_consumer_group is False assert desc.state == ConsumerGroupState.EMPTY + assert desc.type == ConsumerGroupType.CLASSIC # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) + consumer_group = 'test-group-consumer' + + consume_messages(cluster, consumer_group, 'consumer', topic, 2) + + desc = verify_provided_describe_for_authorized_operations(admin_client, + admin_client.describe_consumer_groups, + AclOperation.READ, + AclOperation.DELETE, + ResourceType.GROUP, + consumer_group, + [consumer_group]) + assert consumer_group == desc.group_id + assert desc.is_simple_consumer_group is False + assert desc.state == ConsumerGroupState.EMPTY + assert desc.type == ConsumerGroupType.CONSUMER + + # Delete group + perform_admin_operation_sync(admin_client.delete_consumer_groups, [consumer_group], request_timeout=10) def verify_describe_cluster(admin_client): desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -217,11 +240,7 @@ def test_describe_operations(sasl_cluster): verify_describe_topics(admin_client, our_topic) # Verify Authorized Operations in Describe Groups - # Skip this test if using group protocol `consumer` - # as there is new RPC for describe_groups() in - # group protocol `consumer` case. - if not TestUtils.use_group_protocol_consumer(): - verify_describe_groups(sasl_cluster, admin_client, our_topic) + verify_describe_groups(sasl_cluster, admin_client, our_topic) # Delete Topic perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10) From 146f02a676a224631eb88407eda6218c1f74a55a Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 15:24:22 +0530 Subject: [PATCH 2/3] style fix --- tests/integration/admin/test_describe_operations.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 6f907c7fe..4f7778d7f 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -21,8 +21,6 @@ from confluent_kafka.error import ConsumeError from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType -from tests.common import TestUtils - topic_prefix = "test-topic" @@ -37,7 +35,7 @@ def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages 'on_commit': verify_commit_result, 'auto.offset.reset': 'earliest'} - if group_protocol == 'classic' : + if group_protocol == 'classic': conf['session.timeout.ms'] = 6000 consumer = sasl_cluster.consumer(conf) @@ -186,7 +184,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) - consumer_group = 'test-group-consumer' + consumer_group = 'test-group-consumer' consume_messages(cluster, consumer_group, 'consumer', topic, 2) @@ -205,6 +203,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [consumer_group], request_timeout=10) + def verify_describe_cluster(admin_client): desc = verify_provided_describe_for_authorized_operations(admin_client, admin_client.describe_cluster, From c7e0997b9ef6e08e0ba013f62b53bae0e492ff8b Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 19:25:11 +0530 Subject: [PATCH 3/3] style fix and updated condition in update_conf_group_protocol --- tests/common/__init__.py | 3 ++- tests/integration/admin/test_describe_operations.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 3d9ec5c7a..5c5c201a9 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -56,7 +56,8 @@ def use_group_protocol_consumer(): @staticmethod def update_conf_group_protocol(conf=None): if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): - conf['group.protocol'] = 'consumer' + if 'group.protocol' not in conf: + conf['group.protocol'] = 'consumer' @staticmethod def remove_forbidden_conf_group_protocol_consumer(conf): diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 4f7778d7f..6d53df059 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -184,7 +184,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) - consumer_group = 'test-group-consumer' + consumer_group = 'test-group-consumer' consume_messages(cluster, consumer_group, 'consumer', topic, 2)