From 2582245c15396e683689c4298a351271e470fb82 Mon Sep 17 00:00:00 2001 From: Phong Pham Date: Wed, 20 Mar 2019 15:28:25 -0400 Subject: [PATCH 1/5] Initial commit for OAuthBearer support --- kafka/admin/client.py | 15 +++++++++ kafka/client_async.py | 17 +++++++++- kafka/conn.py | 72 +++++++++++++++++++++++++++++++++++++++-- kafka/consumer/group.py | 17 +++++++++- kafka/producer/kafka.py | 17 +++++++++- 5 files changed, 132 insertions(+), 6 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index d02a68a19..9010117b0 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -133,6 +133,20 @@ class KafkaAdminClient(object): Default: None sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' + sasl_oauth_token_provider (Object): OAuthBearer token provider instance. + THE FOLLOWING INTERFACE MUST BE FULFILLED: + (required) #token(): Returns an ID/Access Token to be sent to the Kafka + client. The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + (OPTIONAL) #extensions() - Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not provided, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + Default: None """ DEFAULT_CONFIG = { @@ -166,6 +180,7 @@ class KafkaAdminClient(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', + 'sasl_oauth_token_provider': None, # metrics configs 'metric_reporters': [], diff --git a/kafka/client_async.py b/kafka/client_async.py index fdf5454f6..9784b7e2f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -146,6 +146,20 @@ class KafkaClient(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (Object): OAuthBearer token provider instance. + THE FOLLOWING INTERFACE MUST BE FULFILLED: + (required) #token(): Returns an ID/Access Token to be sent to the Kafka + client. The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + (OPTIONAL) #extensions() - Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not provided, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + Default: None """ DEFAULT_CONFIG = { @@ -182,7 +196,8 @@ class KafkaClient(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } def __init__(self, **configs): diff --git a/kafka/conn.py b/kafka/conn.py index e857d0ac5..eb10b94b7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -179,6 +179,21 @@ class BrokerConnection(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (Object): OAuthBearer token provider instance + that implements method 'token'. + THE FOLLOWING INTERFACE MUST BE FULFILLED: + (required) #token(): Returns an ID/Access Token to be sent to the Kafka + client. The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + (OPTIONAL) #extensions() - Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not provided, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + Default: None """ DEFAULT_CONFIG = { @@ -210,10 +225,11 @@ class BrokerConnection(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI') + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') def __init__(self, host, port, afi, **configs): self.host = host @@ -257,7 +273,10 @@ def __init__(self, host, port, afi, **configs): if self.config['sasl_mechanism'] == 'GSSAPI': assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' - + if self.config['sasl_mechanism'] == 'OAUTHBEARER': + token_provider = self.config['sasl_oauth_token_provider'] + assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' + assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -536,6 +555,8 @@ def _handle_sasl_handshake_response(self, future, response): return self._try_authenticate_plain(future) elif self.config['sasl_mechanism'] == 'GSSAPI': return self._try_authenticate_gssapi(future) + elif self.config['sasl_mechanism'] == 'OAUTHBEARER': + return self._try_authenticate_oauth(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -659,6 +680,51 @@ def _try_authenticate_gssapi(self, future): log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) return future.success(True) + def _try_authenticate_oauth(self, future): + data = b'' + # Send PLAIN credentials per RFC-4616 + msg = bytes(self._build_oauth_client_request().encode("utf-8")) + size = Int32.encode(len(msg)) + try: + # Send SASL OAuthBearer request with OAuth token + self._send_bytes_blocking(size + msg) + + # The server will send a zero sized message (that is Int32(0)) on success. + # The connection is closed on failure + data = self._recv_bytes_blocking(4) + + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) + + if data != b'\x00\x00\x00\x00': + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + return future.failure(error) + + log.info('%s: Authenticated via OAuth', self) + return future.success(True) + + def _build_oauth_client_request(self): + token_provider = self.config['sasl_oauth_token_provider'] + return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) + + def _token_extensions(self): + """ + Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER + initial request. + """ + token_provider = self.config['sasl_oauth_token_provider'] + + # Only run if the #extensions() method is implemented by the clients Token Provider class + # Builds up a string separated by \x01 via a dict of key value pairs + if callable(getattr(token_provider, "extensions", None)): + msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) + return "\x01" + msg + else: + return "" + def blacked_out(self): """ Return true if we are disconnected from the given node and can't diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f52189188..9578c9c18 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -235,6 +235,20 @@ class KafkaConsumer(six.Iterator): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (Object): OAuthBearer token provider instance. + THE FOLLOWING INTERFACE MUST BE FULFILLED: + (required) #token(): Returns an ID/Access Token to be sent to the Kafka + client. The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + (OPTIONAL) #extensions() - Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not provided, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + Default: None Note: Configuration parameters are described in more detail at @@ -293,7 +307,8 @@ class KafkaConsumer(six.Iterator): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index ccdd91ad4..60bd6777e 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -272,6 +272,20 @@ class KafkaProducer(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (Object): OAuthBearer token provider instance. + THE FOLLOWING INTERFACE MUST BE FULFILLED: + (required) #token(): Returns an ID/Access Token to be sent to the Kafka + client. The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + (OPTIONAL) #extensions() - Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not provided, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + Default: None Note: Configuration parameters are described in more detail at @@ -322,7 +336,8 @@ class KafkaProducer(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } _COMPRESSORS = { From 971aa1bceb4383e92bc4e28ac3d8480dc14defee Mon Sep 17 00:00:00 2001 From: Phong Pham Date: Thu, 21 Mar 2019 15:20:30 -0400 Subject: [PATCH 2/5] Added Abstract Base Class for Token Provider and improved corresponding docs --- kafka/admin/client.py | 16 ++-------------- kafka/client_async.py | 16 ++-------------- kafka/conn.py | 22 +++++----------------- kafka/consumer/group.py | 16 ++-------------- kafka/oauth/__init__.py | 3 +++ kafka/oauth/abstract.py | 38 ++++++++++++++++++++++++++++++++++++++ kafka/producer/kafka.py | 16 ++-------------- 7 files changed, 54 insertions(+), 73 deletions(-) create mode 100644 kafka/oauth/__init__.py create mode 100644 kafka/oauth/abstract.py diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 9010117b0..39f7e1af7 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -133,20 +133,8 @@ class KafkaAdminClient(object): Default: None sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' - sasl_oauth_token_provider (Object): OAuthBearer token provider instance. - THE FOLLOWING INTERFACE MUST BE FULFILLED: - (required) #token(): Returns an ID/Access Token to be sent to the Kafka - client. The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - (OPTIONAL) #extensions() - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not provided, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - Default: None + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { diff --git a/kafka/client_async.py b/kafka/client_async.py index 9784b7e2f..b2527a361 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -146,20 +146,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (Object): OAuthBearer token provider instance. - THE FOLLOWING INTERFACE MUST BE FULFILLED: - (required) #token(): Returns an ID/Access Token to be sent to the Kafka - client. The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - (OPTIONAL) #extensions() - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not provided, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - Default: None + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { diff --git a/kafka/conn.py b/kafka/conn.py index eb10b94b7..6ff5b95d2 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -25,6 +25,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate +from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest @@ -179,21 +180,8 @@ class BrokerConnection(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (Object): OAuthBearer token provider instance - that implements method 'token'. - THE FOLLOWING INTERFACE MUST BE FULFILLED: - (required) #token(): Returns an ID/Access Token to be sent to the Kafka - client. The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - (OPTIONAL) #extensions() - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not provided, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - Default: None + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { @@ -276,7 +264,7 @@ def __init__(self, host, port, afi, **configs): if self.config['sasl_mechanism'] == 'OAUTHBEARER': token_provider = self.config['sasl_oauth_token_provider'] assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' - assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' + assert isinstance(token_provider, AbstractTokenProvider), 'sasl_oauth_token_provider must implement AbstractTokenProvider' # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -682,7 +670,7 @@ def _try_authenticate_gssapi(self, future): def _try_authenticate_oauth(self, future): data = b'' - # Send PLAIN credentials per RFC-4616 + msg = bytes(self._build_oauth_client_request().encode("utf-8")) size = Int32.encode(len(msg)) try: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 9578c9c18..f9ddabc7e 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -235,20 +235,8 @@ class KafkaConsumer(six.Iterator): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (Object): OAuthBearer token provider instance. - THE FOLLOWING INTERFACE MUST BE FULFILLED: - (required) #token(): Returns an ID/Access Token to be sent to the Kafka - client. The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - (OPTIONAL) #extensions() - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not provided, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - Default: None + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None Note: Configuration parameters are described in more detail at diff --git a/kafka/oauth/__init__.py b/kafka/oauth/__init__.py new file mode 100644 index 000000000..8c8349564 --- /dev/null +++ b/kafka/oauth/__init__.py @@ -0,0 +1,3 @@ +from __future__ import absolute_import + +from kafka.oauth.abstract import AbstractTokenProvider diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py new file mode 100644 index 000000000..966d0e19c --- /dev/null +++ b/kafka/oauth/abstract.py @@ -0,0 +1,38 @@ +from __future__ import absolute_import + +import abc + +# This statement is compatible with both Python 2.7 & 3+ +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + +class AbstractTokenProvider(ABC): + """ + A Token Provider must be used for the SASL OAuthBearer protocol. + + The implementation shsould ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + + Token Providers MUST be implemented from this ABC. + + An optional method that may be implemented if the user chooses is: + #extensions() - Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not provided, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + """ + + def __init__(self, **config): + pass + + @abc.abstractmethod + def token(self): + """ + Returns a (str) ID/Access Token to be sent to the Kafka + client. + """ + pass + diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 60bd6777e..2c76c59d4 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -272,20 +272,8 @@ class KafkaProducer(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers - sasl_oauth_token_provider (Object): OAuthBearer token provider instance. - THE FOLLOWING INTERFACE MUST BE FULFILLED: - (required) #token(): Returns an ID/Access Token to be sent to the Kafka - client. The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - (OPTIONAL) #extensions() - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not provided, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - Default: None + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None Note: Configuration parameters are described in more detail at From 7fc9cef1188c86c4d7743f95c37826232d2113b4 Mon Sep 17 00:00:00 2001 From: Phong Pham Date: Fri, 22 Mar 2019 11:16:22 -0400 Subject: [PATCH 3/5] Handle empty extensions map, fix documentation --- kafka/conn.py | 2 +- kafka/oauth/abstract.py | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 6ff5b95d2..eb22f4302 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -707,7 +707,7 @@ def _token_extensions(self): # Only run if the #extensions() method is implemented by the clients Token Provider class # Builds up a string separated by \x01 via a dict of key value pairs - if callable(getattr(token_provider, "extensions", None)): + if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0: msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) return "\x01" + msg else: diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py index 966d0e19c..0b4c62e92 100644 --- a/kafka/oauth/abstract.py +++ b/kafka/oauth/abstract.py @@ -9,7 +9,7 @@ class AbstractTokenProvider(ABC): """ A Token Provider must be used for the SASL OAuthBearer protocol. - The implementation shsould ensure token reuse so that multiple + The implementation should ensure token reuse so that multiple calls at connect time do not create multiple tokens. The implementation should also periodically refresh the token in order to guarantee that each call returns an unexpired token. A timeout error should @@ -17,12 +17,6 @@ class AbstractTokenProvider(ABC): broker can log debugging info and retry. Token Providers MUST be implemented from this ABC. - - An optional method that may be implemented if the user chooses is: - #extensions() - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not provided, the values are ignored. This feature is only available - in Kafka >= 2.1.0. """ def __init__(self, **config): @@ -36,3 +30,13 @@ def token(self): """ pass + def extensions(self): + """ + This is an OPTIONAL method that may be implemented. + + Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not implemented, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + """ + pass From acfcd867f6969c917c4f61f406c2ca0d36748654 Mon Sep 17 00:00:00 2001 From: Phong Pham Date: Fri, 22 Mar 2019 11:29:34 -0400 Subject: [PATCH 4/5] Fix abstract token provider to return empty dict with extensions() by default --- kafka/oauth/abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py index 0b4c62e92..fc4e5ad57 100644 --- a/kafka/oauth/abstract.py +++ b/kafka/oauth/abstract.py @@ -39,4 +39,4 @@ def extensions(self): not implemented, the values are ignored. This feature is only available in Kafka >= 2.1.0. """ - pass + return {} From f925328e99173bbaa09431eccefe822747215eb1 Mon Sep 17 00:00:00 2001 From: Phong Pham Date: Fri, 22 Mar 2019 11:32:10 -0400 Subject: [PATCH 5/5] Change subclass check for duck-typing check --- kafka/conn.py | 2 +- kafka/oauth/abstract.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index eb22f4302..7a1eb2332 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -264,7 +264,7 @@ def __init__(self, host, port, afi, **configs): if self.config['sasl_mechanism'] == 'OAUTHBEARER': token_provider = self.config['sasl_oauth_token_provider'] assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' - assert isinstance(token_provider, AbstractTokenProvider), 'sasl_oauth_token_provider must implement AbstractTokenProvider' + assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py index fc4e5ad57..8d89ff51d 100644 --- a/kafka/oauth/abstract.py +++ b/kafka/oauth/abstract.py @@ -16,7 +16,7 @@ class AbstractTokenProvider(ABC): be returned after a short period of inactivity so that the broker can log debugging info and retry. - Token Providers MUST be implemented from this ABC. + Token Providers MUST implement the token() method """ def __init__(self, **config):