From da49c15ad98183da6da70d8836bede84132d102b Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Fri, 16 Nov 2018 22:58:46 +0100 Subject: [PATCH 1/3] Add KafkaAdmin ACL methods --- kafka/admin/__init__.py | 6 +- kafka/admin/acl_resource.py | 210 ++++++++++++++++++++++++++++ kafka/admin/client.py | 265 ++++++++++++++++++++++++++++++++++-- kafka/errors.py | 6 + test/test_admin.py | 31 +++++ 5 files changed, 509 insertions(+), 9 deletions(-) create mode 100644 kafka/admin/acl_resource.py diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index a300301c6..c240fc6d0 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -2,9 +2,13 @@ from kafka.admin.config_resource import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient +from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, + ResourceType, ACLPermissionType, ACLResourcePatternType) from kafka.admin.new_topic import NewTopic from kafka.admin.new_partitions import NewPartitions __all__ = [ - 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions' + 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'ACL', 'ACLFilter', + 'ResourcePattern', 'ResourcePatternFilter', 'ACLOperation', 'ResourceType', 'ACLPermissionType', + 'ACLResourcePatternType' ] diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py new file mode 100644 index 000000000..4f54c83ba --- /dev/null +++ b/kafka/admin/acl_resource.py @@ -0,0 +1,210 @@ +from __future__ import absolute_import +from kafka.errors import IllegalArgumentError + +# enum in stdlib as of py3.4 +try: + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + + +class ResourceType(IntEnum): + """Type of kafka resource to set ACL for + + The ANY value is only valid in a filter context + """ + + UNKNOWN = 0, + ANY = 1, + CLUSTER = 4, + DELEGATION_TOKEN = 6, + GROUP = 3, + TOPIC = 2, + TRANSACTIONAL_ID = 5 + + +class ACLOperation(IntEnum): + """Type of operation + + The ANY value is only valid in a filter context + """ + + ANY = 1, + ALL = 2, + READ = 3, + WRITE = 4, + CREATE = 5, + DELETE = 6, + ALTER = 7, + DESCRIBE = 8, + CLUSTER_ACTION = 9, + DESCRIBE_CONFIGS = 10, + ALTER_CONFIGS = 11, + IDEMPOTENT_WRITE = 12 + + +class ACLPermissionType(IntEnum): + """An enumerated type of permissions + + The ANY value is only valid in a filter context + """ + + ANY = 1, + DENY = 2, + ALLOW = 3 + + +class ACLResourcePatternType(IntEnum): + """An enumerated type of resource patterns + + More details on the pattern types and how they work + can be found in KIP-290 (Support for prefixed ACLs) + https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs + """ + + ANY = 1, + MATCH = 2, + LITERAL = 3, + PREFIXED = 4 + + +class ACLFilter(object): + """Represents a filter to use with describing and deleting ACLs + + The difference between this class and the ACL class is mainly that + we allow using ANY with the operation, permission, and resource type objects + to fetch ALCs matching any of the properties. + """ + + def __init__( + self, + principal, + host, + operation, + permission_type, + resource_pattern + ): + self.principal = principal + self.host = host + self.operation = operation + self.permission_type = permission_type + self.resource_pattern = resource_pattern + + self.validate() + + def validate(self): + if not isinstance(self.operation, ACLOperation): + raise IllegalArgumentError("operation must be an ACLOperation object, and cannot be ANY") + if not isinstance(self.permission_type, ACLPermissionType): + raise IllegalArgumentError("permission_type must be an ACLPermissionType object, and cannot be ANY") + if not isinstance(self.resource_pattern, ResourcePatternFilter): + raise IllegalArgumentError("resource_pattern must be a ResourcePatternFilter object") + + def __repr__(self): + return "".format( + principal=self.principal, + host=self.host, + operation=self.operation.name, + type=self.permission_type.name, + resource=self.resource_pattern + ) + + +class ACL(ACLFilter): + """Represents a concrete ACL for a specific ResourcePattern + + In kafka an ACL is a 4-tuple of (principal, host, operation, permission_type) + that limits who can do what on a specific resource (or since KIP-290 a resource pattern) + + Terminology: + Principal -> This is the identifier for the user. Depending on the authorization method used (SSL, SASL etc) + the principal will look different. See http://kafka.apache.org/documentation/#security_authz for details. + The principal must be on the format "User:" or kafka will treat it as invalid. It's possible to use + other principal types than "User" if using a custom authorizer for the cluster. + Host -> This must currently be an IP address. It cannot be a range, and it cannot be a domain name. + It can be set to "*", which is special cased in kafka to mean "any host" + Operation -> Which client operation this ACL refers to. Has different meaning depending + on the resource type the ACL refers to. See https://docs.confluent.io/current/kafka/authorization.html#acl-format + for a list of which combinations of resource/operation that unlocks which kafka APIs + Permission Type: Whether this ACL is allowing or denying access + Resource Pattern -> This is a representation of the resource or resource pattern that the ACL + refers to. See the ResourcePattern class for details. + + """ + + def __init__( + self, + principal, + host, + operation, + permission_type, + resource_pattern + ): + super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern) + self.validate() + + def validate(self): + if self.operation == ACLOperation.ANY: + raise IllegalArgumentError("operation cannot be ANY") + if self.permission_type == ACLPermissionType.ANY: + raise IllegalArgumentError("permission_type cannot be ANY") + if not isinstance(self.resource_pattern, ResourcePattern): + raise IllegalArgumentError("resource_pattern must be a ResourcePattern object") + + +class ResourcePatternFilter(object): + def __init__( + self, + resource_type, + resource_name, + pattern_type + ): + self.resource_type = resource_type + self.resource_name = resource_name + self.pattern_type = pattern_type + + self.validate() + + def validate(self): + if not isinstance(self.resource_type, ResourceType): + raise IllegalArgumentError("resource_type must be a ResourceType object") + if not isinstance(self.pattern_type, ACLResourcePatternType): + raise IllegalArgumentError("pattern_type must be an ACLResourcePatternType object") + + def __repr__(self): + return "".format( + self.resource_type.name, + self.resource_name, + self.pattern_type.name + ) + + +class ResourcePattern(ResourcePatternFilter): + """A resource pattern to apply the ACL to + + Resource patterns are used to be able to specify which resources an ACL + describes in a more flexible way than just pointing to a literal topic name for example. + Since KIP-290 (kafka 2.0) it's possible to set an ACL for a prefixed resource name, which + can cut down considerably on the number of ACLs needed when the number of topics and + consumer groups start to grow. + The default pattern_type is LITERAL, and it describes a specific resource. This is also how + ACLs worked before the introduction of prefixed ACLs + """ + + def __init__( + self, + resource_type, + resource_name, + pattern_type=ACLResourcePatternType.LITERAL + ): + super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type) + self.validate() + + def validate(self): + if self.resource_type == ResourceType.ANY: + raise IllegalArgumentError("resource_type cannot be ANY") + if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]: + raise IllegalArgumentError( + "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name) + ) \ No newline at end of file diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 155ad21d6..224a46527 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -11,14 +11,16 @@ import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, - UnrecognizedBrokerVersion) + UnrecognizedBrokerVersion, IllegalArgumentError) from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ + ACLResourcePatternType from kafka.version import __version__ @@ -450,14 +452,261 @@ def delete_topics(self, topics, timeout_ms=None): # describe cluster functionality is in ClusterMetadata # Note: if implemented here, send the request to the least_loaded_node() - # describe_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + @staticmethod + def _convert_describe_acls_response_to_acls(describe_response): + version = describe_response.API_VERSION + + error = Errors.for_code(describe_response.error_code) + acl_list = [] + for resources in describe_response.resources: + if version == 0: + resource_type, resource_name, acls = resources + resource_pattern_type = ACLResourcePatternType.LITERAL.value + elif version <= 1: + resource_type, resource_name, resource_pattern_type, acls = resources + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + for acl in acls: + principal, host, operation, permission_type = acl + conv_acl = ACL( + principal=principal, + host=host, + operation=ACLOperation(operation), + permission_type=ACLPermissionType(permission_type), + resource_pattern=ResourcePattern( + ResourceType(resource_type), + resource_name, + ACLResourcePatternType(resource_pattern_type) + ) + ) + acl_list.append(conv_acl) + + return (acl_list, error,) + + def describe_acls(self, acl_filter): + """Describe a set of ACLs + + Used to return a set of ACLs matching the supplied ACLFilter. + The cluster must be configured with an authorizer for this to work, or + you will get a SecurityDisabledError + + :param acl_filter: an ACLFilter object + :return: tuple of a list of matching ACL objects and a KafkaError (NoError if successful) + """ - # create_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + version = self._matching_api_version(DescribeAclsRequest) + if version == 0: + request = DescribeAclsRequest[version]( + resource_type=acl_filter.resource_pattern.resource_type, + resource_name=acl_filter.resource_pattern.resource_name, + principal=acl_filter.principal, + host=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type + ) + elif version <= 1: + request = DescribeAclsRequest[version]( + resource_type=acl_filter.resource_pattern.resource_type, + resource_name=acl_filter.resource_pattern.resource_name, + resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type, + principal=acl_filter.principal, + host=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type - # delete_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + ) + else: + raise NotImplementedError( + "Support for DescribeAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + response = self._send_request_to_node(self._client.least_loaded_node(), request) + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + # optionally we could retry if error_type.retriable + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + + return self._convert_describe_acls_response_to_acls(response) + + @staticmethod + def _convert_create_acls_resource_request_v0(acl): + + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_create_acls_resource_request_v1(acl): + + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.resource_pattern.pattern_type, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_create_acls_response_to_acls(acls, create_response): + version = create_response.API_VERSION + + creations_error = [] + creations_success = [] + for i, creations in enumerate(create_response.creation_responses): + if version <= 1: + error_code, error_message = creations + acl = acls[i] + error = Errors.for_code(error_code) + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + if error is Errors.NoError: + creations_success.append(acl) + else: + creations_error.append((acl, error,)) + + return {"succeeded": creations_success, "failed": creations_error} + + def create_acls(self, acls): + """Create a list of ACLs + + This endpoint only accepts a list of concrete ACL objects, no ACLFilters. + Throws TopicAlreadyExistsError if topic is already present. + + :param acls: a list of ACL objects + :return: dict of successes and failures + """ + + for acl in acls: + if not isinstance(acl, ACL): + raise IllegalArgumentError("acls must contain ACL objects") + + version = self._matching_api_version(CreateAclsRequest) + if version == 0: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls] + ) + elif version <= 1: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] + ) + else: + raise NotImplementedError( + "Support for CreateAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + response = self._send_request_to_node(self._client.least_loaded_node(), request) + + return self._convert_create_acls_response_to_acls(acls, response) + + @staticmethod + def _convert_delete_acls_resource_request_v0(acl): + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_resource_request_v1(acl): + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.resource_pattern.pattern_type, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response): + version = delete_response.API_VERSION + filter_result_list = [] + for i, filter_responses in enumerate(delete_response.filter_responses): + filter_error_code, filter_error_message, matching_acls = filter_responses + filter_error = Errors.for_code(filter_error_code) + acl_result_list = [] + for acl in matching_acls: + if version == 0: + error_code, error_message, resource_type, resource_name, principal, host, operation, permission_type = acl + resource_pattern_type = ACLResourcePatternType.LITERAL.value + elif version == 1: + error_code, error_message, resource_type, resource_name, resource_pattern_type, principal, host, operation, permission_type = acl + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + acl_error = Errors.for_code(error_code) + conv_acl = ACL( + principal=principal, + host=host, + operation=ACLOperation(operation), + permission_type=ACLPermissionType(permission_type), + resource_pattern=ResourcePattern( + ResourceType(resource_type), + resource_name, + ACLResourcePatternType(resource_pattern_type) + ) + ) + acl_result_list.append((conv_acl, acl_error,)) + filter_result_list.append((acl_filters[i], acl_result_list, filter_error,)) + return filter_result_list + + def delete_acls(self, acl_filters): + """Delete a set of ACLs + + Deletes all ACLs matching the list of input ACLFilter + + :param acl_filters: a list of ACLFilter + :return: a list of 3-tuples corresponding to the list of input filters. + The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance) + """ + + for acl in acl_filters: + if not isinstance(acl, ACLFilter): + raise IllegalArgumentError("acl_filters must contain ACLFilter type objects") + + version = self._matching_api_version(DeleteAclsRequest) + + if version == 0: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters] + ) + elif version <= 1: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] + ) + else: + raise NotImplementedError( + "Support for DeleteAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + response = self._send_request_to_node(self._client.least_loaded_node(), request) + + return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) @staticmethod def _convert_describe_config_resource_request(config_resource): diff --git a/kafka/errors.py b/kafka/errors.py index f13f97853..abef2c5bf 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -443,6 +443,12 @@ class PolicyViolationError(BrokerResponseError): description = 'Request parameters do not satisfy the configured policy.' +class SecurityDisabledError(BrokerResponseError): + errno = 54 + message = 'SECURITY_DISABLED' + description = 'Security features are disabled.' + + class KafkaUnavailableError(KafkaError): pass diff --git a/test/test_admin.py b/test/test_admin.py index 300d5bced..279f85abf 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -26,6 +26,37 @@ def test_new_partitions(): assert good_partitions.new_assignments == [[1, 2, 3]] +def test_acl_resource(): + good_acl = kafka.admin.ACL( + "User:bar", + "*", + kafka.admin.ACLOperation.ALL, + kafka.admin.ACLPermissionType.ALLOW, + kafka.admin.ResourcePattern( + kafka.admin.ResourceType.TOPIC, + "foo", + kafka.admin.ACLResourcePatternType.LITERAL + ) + ) + + assert(good_acl.resource_pattern.resource_type == kafka.admin.ResourceType.TOPIC) + assert(good_acl.operation == kafka.admin.ACLOperation.ALL) + assert(good_acl.permission_type == kafka.admin.ACLPermissionType.ALLOW) + assert(good_acl.resource_pattern.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL) + + with pytest.raises(IllegalArgumentError): + kafka.admin.ACL( + "User:bar", + "*", + kafka.admin.ACLOperation.ANY, + kafka.admin.ACLPermissionType.ANY, + kafka.admin.ResourcePattern( + kafka.admin.ResourceType.TOPIC, + "foo", + kafka.admin.ACLResourcePatternType.LITERAL + ) + ) + def test_new_topic(): with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', -1, -1) From 367f8fd23a23329d43f3dfa763d12fcbc747cb3d Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Fri, 9 Aug 2019 13:33:59 +0200 Subject: [PATCH 2/3] Wait for future returned by _send_request_to_node() --- kafka/admin/client.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 224a46527..0e3e8a184 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -524,7 +524,10 @@ def describe_acls(self, acl_filter): .format(version) ) - response = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: # optionally we could retry if error_type.retriable @@ -612,7 +615,10 @@ def create_acls(self, acls): .format(version) ) - response = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + return self._convert_create_acls_response_to_acls(acls, response) @@ -704,7 +710,9 @@ def delete_acls(self, acl_filters): .format(version) ) - response = self._send_request_to_node(self._client.least_loaded_node(), request) + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) From 23c712350c1372a08e7e638ecbcf3f5f31e13016 Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Fri, 9 Aug 2019 13:37:40 +0200 Subject: [PATCH 3/3] Add KafkaAdminClient acl integration test Had to add 2 lines to all of the kafka.properties files. One to enable the ACL feature in kafka, and the other to allow the test clients to do stuff without fiddling with authentication --- kafka/admin/acl_resource.py | 2 + servers/0.10.0.0/resources/kafka.properties | 3 + servers/0.10.0.1/resources/kafka.properties | 3 + servers/0.10.1.1/resources/kafka.properties | 3 + servers/0.10.2.1/resources/kafka.properties | 3 + servers/0.11.0.0/resources/kafka.properties | 3 + servers/0.11.0.1/resources/kafka.properties | 3 + servers/0.11.0.2/resources/kafka.properties | 3 + servers/0.9.0.0/resources/kafka.properties | 3 + servers/0.9.0.1/resources/kafka.properties | 3 + servers/1.0.0/resources/kafka.properties | 3 + servers/1.0.1/resources/kafka.properties | 3 + servers/1.0.2/resources/kafka.properties | 3 + servers/1.1.0/resources/kafka.properties | 3 + servers/1.1.1/resources/kafka.properties | 3 + servers/2.0.0/resources/kafka.properties | 3 + servers/2.0.1/resources/kafka.properties | 3 + test/test_admin_integration.py | 107 ++++++++++++++++++++ 18 files changed, 157 insertions(+) create mode 100644 test/test_admin_integration.py diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index 4f54c83ba..7a012d2fa 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -75,6 +75,8 @@ class ACLFilter(object): The difference between this class and the ACL class is mainly that we allow using ANY with the operation, permission, and resource type objects to fetch ALCs matching any of the properties. + + To make a filter matching any principal, set principal to None """ def __init__( diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties index 7d8e2b1f0..534b7ba36 100644 --- a/servers/0.10.0.0/resources/kafka.properties +++ b/servers/0.10.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties index 7d8e2b1f0..534b7ba36 100644 --- a/servers/0.10.0.1/resources/kafka.properties +++ b/servers/0.10.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties index 7d8e2b1f0..534b7ba36 100644 --- a/servers/0.10.1.1/resources/kafka.properties +++ b/servers/0.10.1.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties index 7d8e2b1f0..534b7ba36 100644 --- a/servers/0.10.2.1/resources/kafka.properties +++ b/servers/0.10.2.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/0.11.0.0/resources/kafka.properties +++ b/servers/0.11.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/0.11.0.1/resources/kafka.properties +++ b/servers/0.11.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/0.11.0.2/resources/kafka.properties +++ b/servers/0.11.0.2/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties index b4c4088db..a8aaa284a 100644 --- a/servers/0.9.0.0/resources/kafka.properties +++ b/servers/0.9.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties index 7d8e2b1f0..534b7ba36 100644 --- a/servers/0.9.0.1/resources/kafka.properties +++ b/servers/0.9.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.0.0/resources/kafka.properties b/servers/1.0.0/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/1.0.0/resources/kafka.properties +++ b/servers/1.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.0.1/resources/kafka.properties b/servers/1.0.1/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/1.0.1/resources/kafka.properties +++ b/servers/1.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.0.2/resources/kafka.properties b/servers/1.0.2/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/1.0.2/resources/kafka.properties +++ b/servers/1.0.2/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.1.0/resources/kafka.properties b/servers/1.1.0/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/1.1.0/resources/kafka.properties +++ b/servers/1.1.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties index 64f94d528..fe6a89f4a 100644 --- a/servers/1.1.1/resources/kafka.properties +++ b/servers/1.1.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # List of enabled mechanisms, can be more than one sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN diff --git a/servers/2.0.0/resources/kafka.properties b/servers/2.0.0/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/2.0.0/resources/kafka.properties +++ b/servers/2.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/2.0.1/resources/kafka.properties b/servers/2.0.1/resources/kafka.properties index 28668db95..630dbc5fa 100644 --- a/servers/2.0.1/resources/kafka.properties +++ b/servers/2.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py new file mode 100644 index 000000000..0be192001 --- /dev/null +++ b/test/test_admin_integration.py @@ -0,0 +1,107 @@ +import pytest +import os + +from test.fixtures import ZookeeperFixture, KafkaFixture, version +from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset + +from kafka.errors import NoError +from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL + + +class TestAdminClientIntegration(KafkaIntegrationTestCase): + @classmethod + def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.zk = ZookeeperFixture.instance() + cls.server = KafkaFixture.instance(0, cls.zk) + + @classmethod + def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.server.close() + cls.zk.close() + + @kafka_versions('>=0.9.0') + def test_create_describe_delete_acls(self): + """Tests that we can add, list and remove ACLs + """ + + # Setup + brokers = '%s:%d' % (self.server.host, self.server.port) + admin_client = KafkaAdminClient( + bootstrap_servers=brokers + ) + + # Check that we don't have any ACLs in the cluster + acls, error = admin_client.describe_acls( + ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + + self.assertIs(error, NoError) + self.assertEqual(0, len(acls)) + + # Try to add an ACL + acl = ACL( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + result = admin_client.create_acls([acl]) + + self.assertFalse(len(result["failed"])) + self.assertEqual(len(result["succeeded"]), 1) + + # Check that we can list the ACL we created + acl_filter = ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + acls, error = admin_client.describe_acls(acl_filter) + + self.assertIs(error, NoError) + self.assertEqual(1, len(acls)) + + # Remove the ACL + delete_results = admin_client.delete_acls( + [ + ACLFilter( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ] + ) + + self.assertEqual(1, len(delete_results)) + self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs + + + # Make sure the ACL does not exist in the cluster anymore + acls, error = admin_client.describe_acls( + ACLFilter( + principal="*", + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + self.assertIs(error, NoError) + self.assertEqual(0, len(acls))