diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index be84269da..6a4896585 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -26,14 +26,15 @@ def __init__(self, **config): raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration') self._is_done = False self._is_authenticated = False + self.gssapi_name = None if config.get('sasl_kerberos_name', None) is not None: self.auth_id = str(config['sasl_kerberos_name']) + if isinstance(config['sasl_kerberos_name'], gssapi.Name): + self.gssapi_name = config['sasl_kerberos_name'] else: kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '') self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name - if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name): - self.gssapi_name = config['sasl_kerberos_name'] - else: + if self.gssapi_name is None: self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos) self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate') self._next_token = self._client_ctx.step(None) @@ -43,9 +44,8 @@ def auth_bytes(self): # so mark is_done after the final auth_bytes are provided # in practice we'll still receive a response when using SaslAuthenticate # but not when using the prior unframed approach. - if self._client_ctx.complete: + if self._is_authenticated: self._is_done = True - self._is_authenticated = True return self._next_token or b'' def receive(self, auth_bytes): @@ -74,6 +74,13 @@ def receive(self, auth_bytes): ] # add authorization identity to the response, and GSS-wrap self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message + # We need to identify the last token in auth_bytes(); + # we can't rely on client_ctx.complete because it becomes True after generating + # the second-to-last token (after calling .step(auth_bytes) for the final time) + # We could introduce an additional state variable (i.e., self._final_token), + # but instead we just set _is_authenticated. Since the plugin interface does + # not read is_authenticated() until after is_done() is True, this should be fine. + self._is_authenticated = True def is_done(self): return self._is_done diff --git a/test/sasl/test_gssapi.py b/test/sasl/test_gssapi.py new file mode 100644 index 000000000..893414e37 --- /dev/null +++ b/test/sasl/test_gssapi.py @@ -0,0 +1,42 @@ +from __future__ import absolute_import + +try: + from unittest import mock +except ImportError: + import mock + +from kafka.sasl import get_sasl_mechanism +import kafka.sasl.gssapi + + +def test_gssapi(): + config = { + 'sasl_kerberos_domain_name': 'foo', + 'sasl_kerberos_service_name': 'bar', + } + client_ctx = mock.Mock() + client_ctx.step.side_effect = [b'init', b'exchange', b'complete', b'xxxx'] + client_ctx.complete = False + def mocked_message_wrapper(msg, *args): + wrapped = mock.Mock() + type(wrapped).message = mock.PropertyMock(return_value=msg) + return wrapped + client_ctx.unwrap.side_effect = mocked_message_wrapper + client_ctx.wrap.side_effect = mocked_message_wrapper + kafka.sasl.gssapi.gssapi = mock.Mock() + kafka.sasl.gssapi.gssapi.SecurityContext.return_value = client_ctx + gssapi = get_sasl_mechanism('GSSAPI')(**config) + assert isinstance(gssapi, kafka.sasl.gssapi.SaslMechanismGSSAPI) + client_ctx.step.assert_called_with(None) + + while not gssapi.is_done(): + send_token = gssapi.auth_bytes() + receive_token = send_token # not realistic, but enough for testing + if send_token == b'\x00cbar@foo': # final wrapped message + receive_token = b'' # final message gets an empty response + gssapi.receive(receive_token) + if client_ctx.step.call_count == 3: + client_ctx.complete = True + + assert gssapi.is_done() + assert gssapi.is_authenticated()