@@ -112,7 +112,8 @@ class BrokerConnection(object):
112
112
to apply to broker connection sockets. Default:
113
113
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
114
114
security_protocol (str): Protocol used to communicate with brokers.
115
- Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
115
+ Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
116
+ Default: PLAINTEXT.
116
117
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
117
118
socket connections. If provided, all other ssl_* configurations
118
119
will be ignored. Default: None.
@@ -145,13 +146,15 @@ class BrokerConnection(object):
145
146
metrics (kafka.metrics.Metrics): Optionally provide a metrics
146
147
instance for capturing network IO stats. Default: None.
147
148
metric_group_prefix (str): Prefix for metric names. Default: ''
148
- sasl_mechanism (str): string picking sasl mechanism when security_protocol
149
- is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
150
- Default: None
149
+ sasl_mechanism (str): Authentication mechanism when security_protocol
150
+ is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
151
+ PLAIN, GSSAPI. Default: PLAIN
151
152
sasl_plain_username (str): username for sasl PLAIN authentication.
152
153
Default: None
153
154
sasl_plain_password (str): password for sasl PLAIN authentication.
154
155
Default: None
156
+ sasl_kerberos_service_name (str): Service name to include in GSSAPI
157
+ sasl mechanism handshake. Default: 'kafka'
155
158
"""
156
159
157
160
DEFAULT_CONFIG = {
@@ -179,12 +182,10 @@ class BrokerConnection(object):
179
182
'sasl_mechanism' : 'PLAIN' ,
180
183
'sasl_plain_username' : None ,
181
184
'sasl_plain_password' : None ,
182
- 'sasl_kerberos_service_name' :'kafka'
185
+ 'sasl_kerberos_service_name' : 'kafka'
183
186
}
184
- if gssapi is None :
185
- SASL_MECHANISMS = ('PLAIN' ,)
186
- else :
187
- SASL_MECHANISMS = ('PLAIN' , 'GSSAPI' )
187
+ SECURITY_PROTOCOLS = ('PLAINTEXT' , 'SSL' , 'SASL_PLAINTEXT' , 'SASL_SSL' )
188
+ SASL_MECHANISMS = ('PLAIN' , 'GSSAPI' )
188
189
189
190
def __init__ (self , host , port , afi , ** configs ):
190
191
self .hostname = host
@@ -210,6 +211,9 @@ def __init__(self, host, port, afi, **configs):
210
211
(socket .SOL_SOCKET , socket .SO_SNDBUF ,
211
212
self .config ['send_buffer_bytes' ]))
212
213
214
+ assert self .config ['security_protocol' ] in self .SECURITY_PROTOCOLS , (
215
+ 'security_protcol must be in ' + ', ' .join (self .SECURITY_PROTOCOLS ))
216
+
213
217
if self .config ['security_protocol' ] in ('SSL' , 'SASL_SSL' ):
214
218
assert ssl_available , "Python wasn't built with SSL support"
215
219
@@ -221,7 +225,7 @@ def __init__(self, host, port, afi, **configs):
221
225
assert self .config ['sasl_plain_password' ] is not None , 'sasl_plain_password required for PLAIN sasl'
222
226
if self .config ['sasl_mechanism' ] == 'GSSAPI' :
223
227
assert gssapi is not None , 'GSSAPI lib not available'
224
- assert self .config ['sasl_kerberos_service_name' ] is not None , 'sasl_servicename_kafka required for GSSAPI sasl'
228
+ assert self .config ['sasl_kerberos_service_name' ] is not None , 'sasl_kerberos_service_name required for GSSAPI sasl'
225
229
226
230
self .state = ConnectionStates .DISCONNECTED
227
231
self ._reset_reconnect_backoff ()
@@ -340,6 +344,7 @@ def connect(self):
340
344
log .debug ('%s: initiating SASL authentication' , self )
341
345
self .state = ConnectionStates .AUTHENTICATING
342
346
else :
347
+ # security_protocol PLAINTEXT
343
348
log .debug ('%s: Connection complete.' , self )
344
349
self .state = ConnectionStates .CONNECTED
345
350
self ._reset_reconnect_backoff ()
@@ -375,7 +380,6 @@ def connect(self):
375
380
if self .state is ConnectionStates .AUTHENTICATING :
376
381
assert self .config ['security_protocol' ] in ('SASL_PLAINTEXT' , 'SASL_SSL' )
377
382
if self ._try_authenticate ():
378
- log .info ('%s: Authenticated as %s' , self , self .config ['sasl_plain_username' ])
379
383
log .debug ('%s: Connection complete.' , self )
380
384
self .state = ConnectionStates .CONNECTED
381
385
self ._reset_reconnect_backoff ()
@@ -508,21 +512,21 @@ def _try_authenticate_plain(self, future):
508
512
if data != b'\x00 \x00 \x00 \x00 ' :
509
513
return future .failure (Errors .AuthenticationFailedError ())
510
514
515
+ log .info ('%s: Authenticated as %s' , self , self .config ['sasl_plain_username' ])
511
516
return future .success (True )
512
517
513
518
def _try_authenticate_gssapi (self , future ):
514
-
515
519
data = b''
516
520
gssname = self .config ['sasl_kerberos_service_name' ] + '@' + self .hostname
517
- ctx_Name = gssapi .Name (gssname , name_type = gssapi .NameType .hostbased_service )
521
+ ctx_Name = gssapi .Name (gssname , name_type = gssapi .NameType .hostbased_service )
518
522
ctx_CanonName = ctx_Name .canonicalize (gssapi .MechType .kerberos )
519
523
log .debug ('%s: canonical Servicename: %s' , self , ctx_CanonName )
520
- ctx_Context = gssapi .SecurityContext (name = ctx_CanonName , usage = 'initiate' )
521
- #Exchange tokens until authentication either suceeded or failed :
524
+ ctx_Context = gssapi .SecurityContext (name = ctx_CanonName , usage = 'initiate' )
525
+ # Exchange tokens until authentication either succeeds or fails :
522
526
received_token = None
523
527
try :
524
528
while not ctx_Context .complete :
525
- #calculate the output token
529
+ # calculate the output token
526
530
try :
527
531
output_token = ctx_Context .step (received_token )
528
532
except GSSError as e :
@@ -541,10 +545,10 @@ def _try_authenticate_gssapi(self, future):
541
545
size = Int32 .encode (len (msg ))
542
546
self ._sock .sendall (size + msg )
543
547
544
- # The server will send a token back. processing of this token either
545
- # establishes a security context, or needs further token exchange
546
- # the gssapi will be able to identify the needed next step
547
- # The connection is closed on failure
548
+ # The server will send a token back. Processing of this token either
549
+ # establishes a security context, or it needs further token exchange.
550
+ # The gssapi will be able to identify the needed next step.
551
+ # The connection is closed on failure.
548
552
response = self ._sock .recv (2000 )
549
553
self ._sock .setblocking (False )
550
554
@@ -554,7 +558,7 @@ def _try_authenticate_gssapi(self, future):
554
558
future .failure (error )
555
559
self .close (error = error )
556
560
557
- #pass the received token back to gssapi, strip the first 4 bytes
561
+ # pass the received token back to gssapi, strip the first 4 bytes
558
562
received_token = response [4 :]
559
563
560
564
except Exception as e :
@@ -563,6 +567,7 @@ def _try_authenticate_gssapi(self, future):
563
567
future .failure (error )
564
568
self .close (error = error )
565
569
570
+ log .info ('%s: Authenticated as %s' , self , gssname )
566
571
return future .success (True )
567
572
568
573
def blacked_out (self ):
0 commit comments