Skip to content

Commit 5bb1abd

Browse files
authored
Catch TimeoutError in BrokerConnection send/recv (#1820)
1 parent 21b00c3 commit 5bb1abd

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

kafka/conn.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
if six.PY2:
3838
ConnectionError = socket.error
39+
TimeoutError = socket.error
3940
BlockingIOError = Exception
4041

4142
log = logging.getLogger(__name__)
@@ -498,7 +499,7 @@ def _try_handshake(self):
498499
# old ssl in python2.6 will swallow all SSLErrors here...
499500
except (SSLWantReadError, SSLWantWriteError):
500501
pass
501-
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
502+
except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError):
502503
log.warning('SSL connection closed by server during handshake.')
503504
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
504505
# Other SSLErrors will be raised to user
@@ -599,7 +600,7 @@ def _try_authenticate_plain(self, future):
599600
# The connection is closed on failure
600601
data = self._recv_bytes_blocking(4)
601602

602-
except ConnectionError as e:
603+
except (ConnectionError, TimeoutError) as e:
603604
log.exception("%s: Error receiving reply from server", self)
604605
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
605606
self.close(error=error)
@@ -665,7 +666,7 @@ def _try_authenticate_gssapi(self, future):
665666
size = Int32.encode(len(msg))
666667
self._send_bytes_blocking(size + msg)
667668

668-
except ConnectionError as e:
669+
except (ConnectionError, TimeoutError) as e:
669670
self._lock.release()
670671
log.exception("%s: Error receiving reply from server", self)
671672
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
@@ -695,7 +696,7 @@ def _try_authenticate_oauth(self, future):
695696
# The connection is closed on failure
696697
data = self._recv_bytes_blocking(4)
697698

698-
except ConnectionError as e:
699+
except (ConnectionError, TimeoutError) as e:
699700
self._lock.release()
700701
log.exception("%s: Error receiving reply from server", self)
701702
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
@@ -886,7 +887,7 @@ def send_pending_requests(self):
886887
if self._sensors:
887888
self._sensors.bytes_sent.record(total_bytes)
888889
return total_bytes
889-
except ConnectionError as e:
890+
except (ConnectionError, TimeoutError) as e:
890891
log.exception("Error sending request data to %s", self)
891892
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
892893
self.close(error=error)
@@ -954,7 +955,7 @@ def _recv(self):
954955

955956
except SSLWantReadError:
956957
break
957-
except ConnectionError as e:
958+
except (ConnectionError, TimeoutError) as e:
958959
if six.PY2 and e.errno == errno.EWOULDBLOCK:
959960
break
960961
log.exception('%s: Error receiving network data'

0 commit comments

Comments
 (0)