Skip to content

Commit b602975

Browse files
author
Ulrik Johansson
committed
Add KafkaAdmin ACL methods
1 parent ace5a76 commit b602975

File tree

3 files changed

+221
-6
lines changed

3 files changed

+221
-6
lines changed

kafka/admin/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from __future__ import absolute_import
22

33
from kafka.admin.config_resource import ConfigResource, ConfigResourceType
4+
from kafka.admin.acl_resource import (AclResource, AclOperation, AclResourceType, AclPermissionType,
5+
AclResourcePatternType)
46
from kafka.admin.kafka import KafkaAdmin
57
from kafka.admin.new_topic import NewTopic
68
from kafka.admin.new_partitions import NewPartitions
79

810
__all__ = [
9-
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions'
11+
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions', 'AclResource', 'AclOperation',
12+
'AclResourceType', 'AclPermissionType', 'AclResourcePatternType'
1013
]

kafka/admin/acl_resource.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from __future__ import absolute_import
2+
3+
# enum in stdlib as of py3.4
4+
try:
5+
from enum import IntEnum # pylint: disable=import-error
6+
except ImportError:
7+
# vendored backport module
8+
from kafka.vendor.enum34 import IntEnum
9+
10+
class AclResourceType(IntEnum):
11+
"""An enumerated type of config resources"""
12+
13+
ANY = 1,
14+
BROKER = 4,
15+
DELEGATION_TOKEN = 6,
16+
GROUP = 3,
17+
TOPIC = 2,
18+
TRANSACTIONAL_ID = 5
19+
20+
class AclOperation(IntEnum):
21+
"""An enumerated type of acl operations"""
22+
23+
ANY = 1,
24+
ALL = 2,
25+
READ = 3,
26+
WRITE = 4,
27+
CREATE = 5,
28+
DELETE = 6,
29+
ALTER = 7,
30+
DESCRIBE = 8,
31+
CLUSTER_ACTION = 9,
32+
DESCRIBE_CONFIGS = 10,
33+
ALTER_CONFIGS = 11,
34+
IDEMPOTENT_WRITE = 12
35+
36+
37+
class AclPermissionType(IntEnum):
38+
"""An enumerated type of permissions"""
39+
40+
ANY = 1,
41+
DENY = 2,
42+
ALLOW = 3
43+
44+
class AclResourcePatternType(IntEnum):
45+
"""An enumerated type of resource patterns"""
46+
47+
ANY = 1,
48+
MATCH = 2,
49+
LITERAL = 3,
50+
PREFIXED = 4
51+
52+
class AclResource(object):
53+
"""A class for specifying config resources.
54+
Arguments:
55+
resource_type (ConfigResourceType): the type of kafka resource
56+
name (string): The name of the kafka resource
57+
configs ({key : value}): A maps of config keys to values.
58+
"""
59+
60+
def __init__(
61+
self,
62+
resource_type,
63+
operation,
64+
permission_type,
65+
name=None,
66+
principal=None,
67+
host=None,
68+
pattern_type=AclResourcePatternType.LITERAL
69+
):
70+
if not isinstance(resource_type, AclResourceType):
71+
resource_type = AclResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object
72+
self.resource_type = resource_type
73+
if not isinstance(operation, AclOperation):
74+
operation = AclOperation[str(operation).upper()] # pylint: disable-msg:unsubscriptable-object
75+
self.operation = operation
76+
if not isinstance(permission_type, AclPermissionType):
77+
permission_type = AclPermissionType[str(permission_type).upper()] # pylint: disable-msg=unsubscriptable-object
78+
self.permission_type = permission_type
79+
self.name = name
80+
self.principal = principal
81+
self.host = host
82+
if not isinstance(pattern_type, AclResourcePatternType):
83+
pattern_type = AclResourcePatternType[str(pattern_type).upper()] # pylint: disable-msg=unsubscriptable-object
84+
self.pattern_type = pattern_type

kafka/admin/kafka.py

Lines changed: 133 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
import socket
66
from kafka.client_async import KafkaClient, selectors
77
from kafka.errors import (
8-
KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError)
8+
KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError,
9+
IllegalArgumentError)
910
from kafka.metrics import MetricConfig, Metrics
1011
from kafka.protocol.admin import (
1112
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
12-
ListGroupsRequest, DescribeGroupsRequest)
13+
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
1314
from kafka.protocol.metadata import MetadataRequest
15+
from kafka.admin.acl_resource import AclOperation, AclPermissionType
1416
from kafka.version import __version__
1517

1618
log = logging.getLogger(__name__)
@@ -358,11 +360,137 @@ def delete_topics(self, topics, timeout_ms=None):
358360

359361
# describe cluster functionality is in ClusterMetadata
360362

361-
# describe_acls protocol not implemented
363+
def describe_acls(self, acl_resource):
364+
"""Describe a set of ACLs
365+
"""
366+
367+
version = self._matching_api_version(DescribeAclsRequest)
368+
if version == 0:
369+
request = DescribeAclsRequest[version](
370+
resource_type=acl_resource.resource_type,
371+
resource_name=acl_resource.name,
372+
principal=acl_resource.principal,
373+
host=acl_resource.host,
374+
operation=acl_resource.operation,
375+
permission_type=acl_resource.permission_type
376+
)
377+
elif version <= 1:
378+
request = DescribeAclsRequest[version](
379+
resource_type=acl_resource.resource_type,
380+
resource_name=acl_resource.name,
381+
resource_pattern_type_filter=acl_resource.pattern_type,
382+
principal=acl_resource.principal,
383+
host=acl_resource.host,
384+
operation=acl_resource.operation,
385+
permission_type=acl_resource.permission_type
386+
387+
)
388+
else:
389+
raise UnsupportedVersionError(
390+
"missing implementation of DescribeAcls for library supported version {}"
391+
.format(version)
392+
)
393+
394+
return self._send(request)
395+
396+
@staticmethod
397+
def _convert_create_acls_resource_request_v0(acl_resource):
398+
if acl_resource.operation == AclOperation.ANY:
399+
raise IllegalArgumentError("operation must not be ANY")
400+
if acl_resource.permission_type == AclPermissionType.ANY:
401+
raise IllegalArgumentError("permission_type must not be ANY")
402+
403+
return (
404+
acl_resource.resource_type,
405+
acl_resource.name,
406+
acl_resource.principal,
407+
acl_resource.host,
408+
acl_resource.operation,
409+
acl_resource.permission_type
410+
)
411+
412+
@staticmethod
413+
def _convert_create_acls_resource_request_v1(acl_resource):
414+
415+
if acl_resource.operation == AclOperation.ANY:
416+
raise IllegalArgumentError("operation must not be ANY")
417+
if acl_resource.permission_type == AclPermissionType.ANY:
418+
raise IllegalArgumentError("permission_type must not be ANY")
419+
420+
return (
421+
acl_resource.resource_type,
422+
acl_resource.name,
423+
acl_resource.pattern_type,
424+
acl_resource.principal,
425+
acl_resource.host,
426+
acl_resource.operation,
427+
acl_resource.permission_type
428+
)
429+
430+
def create_acls(self, acl_resources):
431+
"""Create a set of ACLs"""
432+
433+
version = self._matching_api_version(DescribeAclsRequest)
434+
if version == 0:
435+
request = CreateAclsRequest[version](
436+
creations=[self._convert_create_acls_resource_request_v0(acl_resource) for acl_resource in acl_resources]
437+
)
438+
elif version <= 1:
439+
request = CreateAclsRequest[version](
440+
creations=[self._convert_create_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources]
441+
)
442+
else:
443+
raise UnsupportedVersionError(
444+
"missing implementation of DescribeAcls for library supported version {}"
445+
.format(version)
446+
)
447+
448+
449+
return self._send(request)
450+
451+
@staticmethod
452+
def _convert_delete_acls_resource_request_v0(acl_resource):
453+
return (
454+
acl_resource.resource_type,
455+
acl_resource.name,
456+
acl_resource.principal,
457+
acl_resource.host,
458+
acl_resource.operation,
459+
acl_resource.permission_type
460+
)
461+
462+
@staticmethod
463+
def _convert_delete_acls_resource_request_v1(acl_resource):
464+
return (
465+
acl_resource.resource_type,
466+
acl_resource.name,
467+
acl_resource.pattern_type,
468+
acl_resource.principal,
469+
acl_resource.host,
470+
acl_resource.operation,
471+
acl_resource.permission_type
472+
)
473+
474+
def delete_acls(self, acl_resources):
475+
"""Delete a set of ACLSs"""
362476

363-
# create_acls protocol not implemented
477+
version = self._matching_api_version(DescribeAclsRequest)
478+
479+
if version == 0:
480+
request = DeleteAclsRequest[version](
481+
filters=[self._convert_delete_acls_resource_request_v0(acl_resource) for acl_resource in acl_resources]
482+
)
483+
elif version <= 1:
484+
request = DeleteAclsRequest[version](
485+
filters=[self._convert_delete_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources]
486+
)
487+
else:
488+
raise UnsupportedVersionError(
489+
"missing implementation of DescribeAcls for library supported version {}"
490+
.format(version)
491+
)
364492

365-
# delete_acls protocol not implemented
493+
return self._send(request)
366494

367495
@staticmethod
368496
def _convert_describe_config_resource_request(config_resource):

0 commit comments

Comments
 (0)