diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 3d9ec5c7a..5c5c201a9 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -56,7 +56,8 @@ def use_group_protocol_consumer(): @staticmethod def update_conf_group_protocol(conf=None): if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): - conf['group.protocol'] = 'consumer' + if 'group.protocol' not in conf: + conf['group.protocol'] = 'consumer' @staticmethod def remove_forbidden_conf_group_protocol_consumer(conf): diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 3bfbfd88a..6d53df059 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -19,9 +19,7 @@ from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError -from confluent_kafka import ConsumerGroupState, TopicCollection - -from tests.common import TestUtils +from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType topic_prefix = "test-topic" @@ -30,12 +28,16 @@ def verify_commit_result(err, _): assert err is not None -def consume_messages(sasl_cluster, group_id, topic, num_messages=None): +def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages=None): conf = {'group.id': group_id, - 'session.timeout.ms': 6000, + 'group.protocol': group_protocol, 'enable.auto.commit': False, 'on_commit': verify_commit_result, 'auto.offset.reset': 'earliest'} + + if group_protocol == 'classic': + conf['session.timeout.ms'] = 6000 + consumer = sasl_cluster.consumer(conf) consumer.subscribe([topic]) read_messages = 0 @@ -164,7 +166,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Consume some messages for the group group = 'test-group' - consume_messages(cluster, group, topic, 2) + consume_messages(cluster, group, 'classic', topic, 2) # Verify Describe Consumer Groups desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -177,10 +179,30 @@ def verify_describe_groups(cluster, admin_client, topic): assert group == desc.group_id assert desc.is_simple_consumer_group is False assert desc.state == ConsumerGroupState.EMPTY + assert desc.type == ConsumerGroupType.CLASSIC # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) + consumer_group = 'test-group-consumer' + + consume_messages(cluster, consumer_group, 'consumer', topic, 2) + + desc = verify_provided_describe_for_authorized_operations(admin_client, + admin_client.describe_consumer_groups, + AclOperation.READ, + AclOperation.DELETE, + ResourceType.GROUP, + consumer_group, + [consumer_group]) + assert consumer_group == desc.group_id + assert desc.is_simple_consumer_group is False + assert desc.state == ConsumerGroupState.EMPTY + assert desc.type == ConsumerGroupType.CONSUMER + + # Delete group + perform_admin_operation_sync(admin_client.delete_consumer_groups, [consumer_group], request_timeout=10) + def verify_describe_cluster(admin_client): desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -217,11 +239,7 @@ def test_describe_operations(sasl_cluster): verify_describe_topics(admin_client, our_topic) # Verify Authorized Operations in Describe Groups - # Skip this test if using group protocol `consumer` - # as there is new RPC for describe_groups() in - # group protocol `consumer` case. - if not TestUtils.use_group_protocol_consumer(): - verify_describe_groups(sasl_cluster, admin_client, our_topic) + verify_describe_groups(sasl_cluster, admin_client, our_topic) # Delete Topic perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)