Skip to content

Minor SASL documentation and logging fixes #1231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 3, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class BrokerConnection(object):
to apply to broker connection sockets. Default:
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
Expand Down Expand Up @@ -145,13 +146,15 @@ class BrokerConnection(object):
metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None.
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): string picking sasl mechanism when security_protocol
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
Default: None
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI. Default: PLAIN
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): password for sasl PLAIN authentication.
Default: None
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
"""

DEFAULT_CONFIG = {
Expand Down Expand Up @@ -179,12 +182,10 @@ class BrokerConnection(object):
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name':'kafka'
'sasl_kerberos_service_name': 'kafka'
}
if gssapi is None:
SASL_MECHANISMS = ('PLAIN',)
else:
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')

def __init__(self, host, port, afi, **configs):
self.hostname = host
Expand Down Expand Up @@ -213,6 +214,9 @@ def __init__(self, host, port, afi, **configs):
(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes']))

assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
'security_protcol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))

if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
assert ssl_available, "Python wasn't built with SSL support"

Expand All @@ -224,7 +228,7 @@ def __init__(self, host, port, afi, **configs):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'

self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
Expand Down Expand Up @@ -332,6 +336,7 @@ def connect(self):
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
else:
# security_protocol PLAINTEXT
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
Expand Down Expand Up @@ -367,7 +372,6 @@ def connect(self):
if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self._try_authenticate():
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
Expand Down Expand Up @@ -500,21 +504,21 @@ def _try_authenticate_plain(self, future):
if data != b'\x00\x00\x00\x00':
return future.failure(Errors.AuthenticationFailedError())

log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
return future.success(True)

def _try_authenticate_gssapi(self, future):

data = b''
gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
#Exchange tokens until authentication either suceeded or failed:
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
# Exchange tokens until authentication either succeeds or fails:
received_token = None
try:
while not ctx_Context.complete:
#calculate the output token
# calculate the output token
try:
output_token = ctx_Context.step(received_token)
except GSSError as e:
Expand All @@ -533,10 +537,10 @@ def _try_authenticate_gssapi(self, future):
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)

# The server will send a token back. processing of this token either
# establishes a security context, or needs further token exchange
# the gssapi will be able to identify the needed next step
# The connection is closed on failure
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
response = self._sock.recv(2000)
self._sock.setblocking(False)

Expand All @@ -546,7 +550,7 @@ def _try_authenticate_gssapi(self, future):
future.failure(error)
self.close(error=error)

#pass the received token back to gssapi, strip the first 4 bytes
# pass the received token back to gssapi, strip the first 4 bytes
received_token = response[4:]

except Exception as e:
Expand All @@ -555,6 +559,7 @@ def _try_authenticate_gssapi(self, future):
future.failure(error)
self.close(error=error)

log.info('%s: Authenticated as %s', self, gssname)
return future.success(True)

def blacked_out(self):
Expand Down