Skip to content

Commit 807d0b6

Browse files
committed
Handle blocking socket recv during authentication
1 parent 8bffeb9 commit 807d0b6

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

kafka/conn.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,19 @@ def _handle_sasl_handshake_response(self, future, response):
467467
'kafka-python does not support SASL mechanism %s' %
468468
self.config['sasl_mechanism']))
469469

470+
def _recv_bytes_blocking(self, n):
471+
self._sock.setblocking(True)
472+
try:
473+
data = b''
474+
while len(data) < n:
475+
fragment = self._sock.recv(n - len(data))
476+
if not fragment:
477+
raise ConnectionError('Connection reset during recv')
478+
data += fragment
479+
return data
480+
finally:
481+
self._sock.setblocking(False)
482+
470483
def _try_authenticate_plain(self, future):
471484
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
472485
log.warning('%s: Sending username and password in the clear', self)
@@ -480,19 +493,12 @@ def _try_authenticate_plain(self, future):
480493
self.config['sasl_plain_password']]).encode('utf-8'))
481494
size = Int32.encode(len(msg))
482495
self._sock.sendall(size + msg)
496+
self._sock.setblocking(False)
483497

484498
# The server will send a zero sized message (that is Int32(0)) on success.
485499
# The connection is closed on failure
486-
while len(data) < 4:
487-
fragment = self._sock.recv(4 - len(data))
488-
if not fragment:
489-
log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
490-
error = Errors.AuthenticationFailedError(
491-
'Authentication failed for user {0}'.format(
492-
self.config['sasl_plain_username']))
493-
return future.failure(error)
494-
data += fragment
495-
self._sock.setblocking(False)
500+
self._recv_bytes_blocking(4)
501+
496502
except ConnectionError as e:
497503
log.exception("%s: Error receiving reply from server", self)
498504
error = Errors.ConnectionError("%s: %s" % (self, e))
@@ -528,14 +534,15 @@ def _try_authenticate_gssapi(self, future):
528534
msg = output_token
529535
size = Int32.encode(len(msg))
530536
self._sock.sendall(size + msg)
537+
self._sock.setblocking(False)
538+
531539
# The server will send a token back. Processing of this token either
532540
# establishes a security context, or it needs further token exchange.
533541
# The gssapi will be able to identify the needed next step.
534542
# The connection is closed on failure.
535-
header = self._sock.recv(4)
543+
header = self._recv_bytes_blocking(4)
536544
token_size = struct.unpack('>i', header)
537-
received_token = self._sock.recv(token_size)
538-
self._sock.setblocking(False)
545+
received_token = self._recv_bytes_blocking(token_size)
539546

540547
except ConnectionError as e:
541548
log.exception("%s: Error receiving reply from server", self)

0 commit comments

Comments
 (0)