Skip to content

Commit 11cf397

Browse files
committed
Stop shadowing ConnectionError
In Python3, `ConnectionError` is a native exception. So rename our custom one to `KafkaConnectionError` to prevent accidentally shadowing the native one. Note that there are still valid uses of `ConnectionError` in this code. They already expect a native Python3 `ConnectionError`, and also already handle the Python2 compatibility issues.
1 parent 9221fcf commit 11cf397

File tree

8 files changed

+30
-31
lines changed

8 files changed

+30
-31
lines changed

kafka/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from kafka.vendor import six
1212

1313
import kafka.errors
14-
from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
14+
from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
1515
KafkaTimeoutError, KafkaUnavailableError,
1616
LeaderNotAvailableError, UnknownTopicOrPartitionError,
1717
NotLeaderForPartitionError, ReplicaNotAvailableError)
@@ -73,7 +73,7 @@ def _get_conn(self, host, port, afi):
7373
conn = self._conns[host_key]
7474
if not conn.connect_blocking(self.timeout):
7575
conn.close()
76-
raise ConnectionError("%s:%s (%s)" % (host, port, afi))
76+
raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi))
7777
return conn
7878

7979
def _get_leader_for_partition(self, topic, partition):
@@ -156,7 +156,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
156156
for (host, port, afi) in hosts:
157157
try:
158158
conn = self._get_conn(host, port, afi)
159-
except ConnectionError:
159+
except KafkaConnectionError:
160160
log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
161161
host, port, afi)
162162
continue
@@ -242,7 +242,7 @@ def failed_payloads(payloads):
242242
host, port, afi = get_ip_port_afi(broker.host)
243243
try:
244244
conn = self._get_conn(host, broker.port, afi)
245-
except ConnectionError:
245+
except KafkaConnectionError:
246246
refresh_metadata = True
247247
failed_payloads(broker_payloads)
248248
continue
@@ -344,8 +344,8 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
344344
try:
345345
host, port, afi = get_ip_port_afi(broker.host)
346346
conn = self._get_conn(host, broker.port, afi)
347-
except ConnectionError as e:
348-
log.warning('ConnectionError attempting to send request %s '
347+
except KafkaConnectionError as e:
348+
log.warning('KafkaConnectionError attempting to send request %s '
349349
'to server %s: %s', request_id, broker, e)
350350

351351
for payload in payloads:

kafka/client_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ def _poll(self, timeout):
602602
log.warning('Protocol out of sync on %r, closing', conn)
603603
except socket.error:
604604
pass
605-
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
605+
conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
606606
continue
607607

608608
self._idle_expiry_manager.update(conn.node_id)

kafka/conn.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ def connect(self):
327327
self.last_attempt = time.time()
328328
next_lookup = self._next_afi_sockaddr()
329329
if not next_lookup:
330-
self.close(Errors.ConnectionError('DNS failure'))
330+
self.close(Errors.KafkaConnectionError('DNS failure'))
331331
return
332332
else:
333333
log.debug('%s: creating new socket', self)
@@ -381,12 +381,12 @@ def connect(self):
381381
log.error('Connect attempt to %s returned error %s.'
382382
' Disconnecting.', self, ret)
383383
errstr = errno.errorcode.get(ret, 'UNKNOWN')
384-
self.close(Errors.ConnectionError('{} {}'.format(ret, errstr)))
384+
self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
385385

386386
# Connection timed out
387387
elif time.time() > request_timeout + self.last_attempt:
388388
log.error('Connection attempt to %s timed out', self)
389-
self.close(Errors.ConnectionError('timeout'))
389+
self.close(Errors.KafkaConnectionError('timeout'))
390390

391391
# Needs retry
392392
else:
@@ -463,7 +463,7 @@ def _try_handshake(self):
463463
pass
464464
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
465465
log.warning('SSL connection closed by server during handshake.')
466-
self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
466+
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
467467
# Other SSLErrors will be raised to user
468468

469469
return False
@@ -488,7 +488,7 @@ def _try_authenticate(self):
488488
return False
489489
elif self._sasl_auth_future.failed():
490490
ex = self._sasl_auth_future.exception
491-
if not isinstance(ex, Errors.ConnectionError):
491+
if not isinstance(ex, Errors.KafkaConnectionError):
492492
raise ex # pylint: disable-msg=raising-bad-type
493493
return self._sasl_auth_future.succeeded()
494494

@@ -558,8 +558,8 @@ def _try_authenticate_plain(self, future):
558558
data = self._recv_bytes_blocking(4)
559559

560560
except ConnectionError as e:
561-
log.exception("%s: Error receiving reply from server", self)
562-
error = Errors.ConnectionError("%s: %s" % (self, e))
561+
log.exception("%s: Error receiving reply from server", self)
562+
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
563563
self.close(error=error)
564564
return future.failure(error)
565565

@@ -621,7 +621,7 @@ def _try_authenticate_gssapi(self, future):
621621

622622
except ConnectionError as e:
623623
log.exception("%s: Error receiving reply from server", self)
624-
error = Errors.ConnectionError("%s: %s" % (self, e))
624+
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
625625
self.close(error=error)
626626
return future.failure(error)
627627
except Exception as e:
@@ -701,7 +701,7 @@ def close(self, error=None):
701701
Arguments:
702702
error (Exception, optional): pending in-flight-requests
703703
will be failed with this exception.
704-
Default: kafka.errors.ConnectionError.
704+
Default: kafka.errors.KafkaConnectionError.
705705
"""
706706
if self.state is ConnectionStates.DISCONNECTED:
707707
if error is not None:
@@ -733,7 +733,7 @@ def send(self, request):
733733
if self.connecting():
734734
return future.failure(Errors.NodeNotReadyError(str(self)))
735735
elif not self.connected():
736-
return future.failure(Errors.ConnectionError(str(self)))
736+
return future.failure(Errors.KafkaConnectionError(str(self)))
737737
elif not self.can_send_more():
738738
return future.failure(Errors.TooManyInFlightRequests(str(self)))
739739
return self._send(request)
@@ -753,7 +753,7 @@ def _send(self, request):
753753
self._sensors.bytes_sent.record(total_bytes)
754754
except ConnectionError as e:
755755
log.exception("Error sending %s to %s", request, self)
756-
error = Errors.ConnectionError("%s: %s" % (self, e))
756+
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
757757
self.close(error=error)
758758
return future.failure(error)
759759
log.debug('%s Request %d: %s', self, correlation_id, request)
@@ -781,7 +781,7 @@ def recv(self):
781781
# If requests are pending, we should close the socket and
782782
# fail all the pending request futures
783783
if self.in_flight_requests:
784-
self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
784+
self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
785785
return ()
786786

787787
elif not self.in_flight_requests:
@@ -821,7 +821,7 @@ def _recv(self):
821821
# without an exception raised
822822
if not data:
823823
log.error('%s: socket disconnected', self)
824-
self.close(error=Errors.ConnectionError('socket disconnected'))
824+
self.close(error=Errors.KafkaConnectionError('socket disconnected'))
825825
return []
826826
else:
827827
recvd.append(data)
@@ -833,7 +833,7 @@ def _recv(self):
833833
break
834834
log.exception('%s: Error receiving network data'
835835
' closing socket', self)
836-
self.close(error=Errors.ConnectionError(e))
836+
self.close(error=Errors.KafkaConnectionError(e))
837837
return []
838838
except BlockingIOError:
839839
if six.PY3:

kafka/errors.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ def __init__(self, payload, *args):
447447
self.payload = payload
448448

449449

450-
class ConnectionError(KafkaError):
450+
class KafkaConnectionError(KafkaError):
451451
retriable = True
452452
invalid_metadata = True
453453

@@ -517,13 +517,13 @@ def check_error(response):
517517

518518
RETRY_BACKOFF_ERROR_TYPES = (
519519
KafkaUnavailableError, LeaderNotAvailableError,
520-
ConnectionError, FailedPayloadsError
520+
KafkaConnectionError, FailedPayloadsError
521521
)
522522

523523

524524
RETRY_REFRESH_ERROR_TYPES = (
525525
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
526-
LeaderNotAvailableError, ConnectionError
526+
LeaderNotAvailableError, KafkaConnectionError
527527
)
528528

529529

kafka/producer/base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,6 @@ def send_messages(self, topic, partition, *msg):
372372
Raises:
373373
FailedPayloadsError: low-level connection error, can be caused by
374374
networking failures, or a malformed request.
375-
ConnectionError:
376375
KafkaUnavailableError: all known brokers are down when attempting
377376
to refresh metadata.
378377
LeaderNotAvailableError: topic or partition is initializing or

test/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from kafka import SimpleClient
99
from kafka.errors import (
1010
KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
11-
UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
11+
UnknownTopicOrPartitionError, FailedPayloadsError)
1212
from kafka.future import Future
1313
from kafka.protocol import KafkaProtocol, create_message
1414
from kafka.protocol.metadata import MetadataResponse

test/test_conn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def test_send_disconnected(conn):
9999
conn.state = ConnectionStates.DISCONNECTED
100100
f = conn.send('foobar')
101101
assert f.failed() is True
102-
assert isinstance(f.exception, Errors.ConnectionError)
102+
assert isinstance(f.exception, Errors.KafkaConnectionError)
103103

104104

105105
def test_send_connecting(conn):
@@ -162,7 +162,7 @@ def test_send_error(_socket, conn):
162162
_socket.send.side_effect = socket.error
163163
f = conn.send(req)
164164
assert f.failed() is True
165-
assert isinstance(f.exception, Errors.ConnectionError)
165+
assert isinstance(f.exception, Errors.KafkaConnectionError)
166166
assert _socket.close.call_count == 1
167167
assert conn.state is ConnectionStates.DISCONNECTED
168168

test/test_failover_integration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from kafka import SimpleClient, SimpleConsumer, KeyedProducer
66
from kafka.errors import (
7-
FailedPayloadsError, ConnectionError, RequestTimedOutError,
7+
FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
88
NotLeaderForPartitionError)
99
from kafka.producer.base import Producer
1010
from kafka.structs import TopicPartition
@@ -79,7 +79,7 @@ def test_switch_leader(self):
7979
producer.send_messages(topic, partition, b'success')
8080
log.debug("success!")
8181
recovered = True
82-
except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
82+
except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
8383
NotLeaderForPartitionError):
8484
log.debug("caught exception sending message -- will retry")
8585
continue
@@ -167,7 +167,7 @@ def test_switch_leader_keyed_producer(self):
167167
producer.send_messages(topic, key, msg)
168168
if producer.partitioners[topic].partition(key) == 0:
169169
recovered = True
170-
except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
170+
except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
171171
NotLeaderForPartitionError):
172172
log.debug("caught exception sending message -- will retry")
173173
continue

0 commit comments

Comments
 (0)