diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index a300301c6..9e5c914d4 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -2,9 +2,12 @@ from kafka.admin.config_resource import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient +from kafka.admin.acl_resource import (ACLResource, ACLOperation, ACLResourceType, ACLPermissionType, + ACLResourcePatternType) from kafka.admin.new_topic import NewTopic from kafka.admin.new_partitions import NewPartitions __all__ = [ - 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions' + 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClent', 'NewTopic', 'NewPartitions', 'ACLResource', 'ACLOperation', + 'ACLResourceType', 'ACLPermissionType', 'ACLResourcePatternType' ] diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py new file mode 100644 index 000000000..fc273f8d1 --- /dev/null +++ b/kafka/admin/acl_resource.py @@ -0,0 +1,86 @@ +from __future__ import absolute_import +from kafka.errors import IllegalArgumentError + +# enum in stdlib as of py3.4 +try: + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + + +class ACLResourceType(IntEnum): + """An enumerated type of config resources""" + + ANY = 1, + BROKER = 4, + DELEGATION_TOKEN = 6, + GROUP = 3, + TOPIC = 2, + TRANSACTIONAL_ID = 5 + +class ACLOperation(IntEnum): + """An enumerated type of acl operations""" + + ANY = 1, + ALL = 2, + READ = 3, + WRITE = 4, + CREATE = 5, + DELETE = 6, + ALTER = 7, + DESCRIBE = 8, + CLUSTER_ACTION = 9, + DESCRIBE_CONFIGS = 10, + ALTER_CONFIGS = 11, + IDEMPOTENT_WRITE = 12 + + +class ACLPermissionType(IntEnum): + """An enumerated type of permissions""" + + ANY = 1, + DENY = 2, + ALLOW = 3 + +class ACLResourcePatternType(IntEnum): + """An enumerated type of resource patterns""" + + ANY = 1, + MATCH = 2, + LITERAL = 3, + PREFIXED = 4 + +class ACLResource(object): + """A class for specifying config resources. + Arguments: + resource_type (ConfigResourceType): the type of kafka resource + name (string): The name of the kafka resource + configs ({key : value}): A maps of config keys to values. + """ + + def __init__( + self, + resource_type, + operation, + permission_type, + name=None, + principal=None, + host=None, + pattern_type=ACLResourcePatternType.LITERAL + ): + if not isinstance(resource_type, ACLResourceType): + raise IllegalArgumentError("resource_param must be of type ACLResourceType") + self.resource_type = resource_type + if not isinstance(operation, ACLOperation): + raise IllegalArgumentError("operation must be of type ACLOperation") + self.operation = operation + if not isinstance(permission_type, ACLPermissionType): + raise IllegalArgumentError("permission_type must be of type ACLPermissionType") + self.permission_type = permission_type + self.name = name + self.principal = principal + self.host = host + if not isinstance(pattern_type, ACLResourcePatternType): + raise IllegalArgumentError("pattern_type must be of type ACLResourcePatternType") + self.pattern_type = pattern_type diff --git a/kafka/admin/client.py b/kafka/admin/client.py index e25afe7d8..b121b4bf4 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -11,14 +11,15 @@ import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, - UnrecognizedBrokerVersion) + UnrecognizedBrokerVersion, IllegalArgumentError) from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.admin.acl_resource import ACLOperation, ACLPermissionType from kafka.version import __version__ @@ -428,14 +429,136 @@ def delete_topics(self, topics, timeout_ms=None): # describe cluster functionality is in ClusterMetadata # Note: if implemented here, send the request to the least_loaded_node() - # describe_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + def describe_acls(self, acl_resource): + """Describe a set of ACLs + """ - # create_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + version = self._matching_api_version(DescribeAclsRequest) + if version == 0: + request = DescribeAclsRequest[version]( + resource_type=acl_resource.resource_type, + resource_name=acl_resource.name, + principal=acl_resource.principal, + host=acl_resource.host, + operation=acl_resource.operation, + permission_type=acl_resource.permission_type + ) + elif version <= 1: + request = DescribeAclsRequest[version]( + resource_type=acl_resource.resource_type, + resource_name=acl_resource.name, + resource_pattern_type_filter=acl_resource.pattern_type, + principal=acl_resource.principal, + host=acl_resource.host, + operation=acl_resource.operation, + permission_type=acl_resource.permission_type - # delete_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + ) + else: + raise NotImplementedError( + "Support for DescribeAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + return self._send_request_to_node(self._client.least_loaded_node(), request) + + @staticmethod + def _convert_create_acls_resource_request_v0(acl_resource): + if acl_resource.operation == ACLOperation.ANY: + raise IllegalArgumentError("operation must not be ANY") + if acl_resource.permission_type == ACLPermissionType.ANY: + raise IllegalArgumentError("permission_type must not be ANY") + + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) + + @staticmethod + def _convert_create_acls_resource_request_v1(acl_resource): + + if acl_resource.operation == ACLOperation.ANY: + raise IllegalArgumentError("operation must not be ANY") + if acl_resource.permission_type == ACLPermissionType.ANY: + raise IllegalArgumentError("permission_type must not be ANY") + + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.pattern_type, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) + + def create_acls(self, acl_resources): + """Create a set of ACLs""" + + version = self._matching_api_version(CreateAclsRequest) + if version == 0: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v0(acl_resource) for acl_resource in acl_resources] + ) + elif version <= 1: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources] + ) + else: + raise NotImplementedError( + "Support for CreateAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + return self._send_request_to_node(self._client.least_loaded_node(), request) + + @staticmethod + def _convert_delete_acls_resource_request_v0(acl_resource): + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) + + @staticmethod + def _convert_delete_acls_resource_request_v1(acl_resource): + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.pattern_type, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) + + def delete_acls(self, acl_resources): + """Delete a set of ACLSs""" + + version = self._matching_api_version(DeleteAclsRequest) + + if version == 0: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v0(acl_resource) for acl_resource in acl_resources] + ) + elif version <= 1: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources] + ) + else: + raise NotImplementedError( + "Support for DeleteAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + return self._send_request_to_node(self._client.least_loaded_node(), request) @staticmethod def _convert_describe_config_resource_request(config_resource): diff --git a/test/test_admin.py b/test/test_admin.py index 300d5bced..371e66e8f 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -26,6 +26,24 @@ def test_new_partitions(): assert good_partitions.new_assignments == [[1, 2, 3]] +def test_acl_resource(): + good_resource = kafka.admin.ACLResource( + kafka.admin.ACLResourceType.TOPIC, + kafka.admin.ACLOperation.ALL, + kafka.admin.ACLPermissionType.ALLOW, + "foo", + "User:bar", + "*", + kafka.admin.ACLResourcePatternType.LITERAL + ) + assert(good_resource.resource_type == kafka.admin.ACLResourceType.TOPIC) + assert(good_resource.operation == kafka.admin.ACLOperation.ALL) + assert(good_resource.permission_type == kafka.admin.ACLPermissionType.ALLOW) + assert(good_resource.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL) + + with pytest.raises(IllegalArgumentError): + bad_resource = kafka.admin.ACLResource("TOPIC", "ALL", "ALLOW", "foo", "User:bar", "*", "LITERAL") + def test_new_topic(): with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', -1, -1)