Skip to content

Commit d458486

Browse files
Harald-Berghoff88manpreet
authored andcommitted
added gssapi support (Kerberos) for SASL (dpkp#1152)
1 parent f2931eb commit d458486

File tree

1 file changed

+75
-2
lines changed

1 file changed

+75
-2
lines changed

kafka/conn.py

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ class SSLWantReadError(Exception):
5454
class SSLWantWriteError(Exception):
5555
pass
5656

57+
# needed for SASL_GSSAPI authentication:
58+
try:
59+
import gssapi
60+
from gssapi.raw.misc import GSSError
61+
except ImportError:
62+
#no gssapi available, will disable gssapi mechanism
63+
gssapi = None
64+
GSSError = None
65+
5766
class ConnectionStates(object):
5867
DISCONNECTING = '<disconnecting>'
5968
DISCONNECTED = '<disconnected>'
@@ -167,9 +176,13 @@ class BrokerConnection(object):
167176
'metric_group_prefix': '',
168177
'sasl_mechanism': 'PLAIN',
169178
'sasl_plain_username': None,
170-
'sasl_plain_password': None
179+
'sasl_plain_password': None,
180+
'sasl_kerberos_service_name':'kafka'
171181
}
172-
SASL_MECHANISMS = ('PLAIN',)
182+
if gssapi is None:
183+
SASL_MECHANISMS = ('PLAIN',)
184+
else:
185+
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
173186

174187
def __init__(self, host, port, afi, **configs):
175188
self.hostname = host
@@ -203,6 +216,9 @@ def __init__(self, host, port, afi, **configs):
203216
if self.config['sasl_mechanism'] == 'PLAIN':
204217
assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
205218
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
219+
if self.config['sasl_mechanism'] == 'GSSAPI':
220+
assert gssapi is not None, 'GSSAPI lib not available'
221+
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'
206222

207223
self.state = ConnectionStates.DISCONNECTED
208224
self._reset_reconnect_backoff()
@@ -445,6 +461,8 @@ def _handle_sasl_handshake_response(self, future, response):
445461

446462
if self.config['sasl_mechanism'] == 'PLAIN':
447463
return self._try_authenticate_plain(future)
464+
elif self.config['sasl_mechanism'] == 'GSSAPI':
465+
return self._try_authenticate_gssapi(future)
448466
else:
449467
return future.failure(
450468
Errors.UnsupportedSaslMechanismError(
@@ -489,6 +507,61 @@ def _try_authenticate_plain(self, future):
489507

490508
return future.success(True)
491509

510+
def _try_authenticate_gssapi(self, future):
511+
512+
data = b''
513+
gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
514+
ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
515+
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
516+
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
517+
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
518+
#Exchange tokens until authentication either suceeded or failed:
519+
received_token = None
520+
try:
521+
while not ctx_Context.complete:
522+
#calculate the output token
523+
try:
524+
output_token = ctx_Context.step(received_token)
525+
except GSSError as e:
526+
log.exception("%s: Error invalid token received from server", self)
527+
error = Errors.ConnectionError("%s: %s" % (self, e))
528+
529+
if not output_token:
530+
if ctx_Context.complete:
531+
log.debug("%s: Security Context complete ", self)
532+
log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name)
533+
break
534+
try:
535+
self._sock.setblocking(True)
536+
# Send output token
537+
msg = output_token
538+
size = Int32.encode(len(msg))
539+
self._sock.sendall(size + msg)
540+
541+
# The server will send a token back. processing of this token either
542+
# establishes a security context, or needs further token exchange
543+
# the gssapi will be able to identify the needed next step
544+
# The connection is closed on failure
545+
response = self._sock.recv(2000)
546+
self._sock.setblocking(False)
547+
548+
except (AssertionError, ConnectionError) as e:
549+
log.exception("%s: Error receiving reply from server", self)
550+
error = Errors.ConnectionError("%s: %s" % (self, e))
551+
future.failure(error)
552+
self.close(error=error)
553+
554+
#pass the received token back to gssapi, strip the first 4 bytes
555+
received_token = response[4:]
556+
557+
except Exception as e:
558+
log.exception("%s: GSSAPI handshake error", self)
559+
error = Errors.ConnectionError("%s: %s" % (self, e))
560+
future.failure(error)
561+
self.close(error=error)
562+
563+
return future.success(True)
564+
492565
def blacked_out(self):
493566
"""
494567
Return true if we are disconnected from the given node and can't

0 commit comments

Comments
 (0)