Skip to content

Commit f747738

Browse files
committed
Refactor dns lookup into separate functions
1 parent 01c34c5 commit f747738

File tree

2 files changed

+74
-67
lines changed

2 files changed

+74
-67
lines changed

kafka/conn.py

Lines changed: 49 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ def __init__(self, host, port, afi, **configs):
203203
self.afi = afi
204204
self._init_host = host
205205
self._init_port = port
206+
self._init_afi = afi
206207
self.in_flight_requests = collections.deque()
207208
self._api_versions = None
208209

@@ -250,67 +251,42 @@ def __init__(self, host, port, afi, **configs):
250251
self._sasl_auth_future = None
251252
self.last_attempt = 0
252253
self._gai = None
253-
self._gai_index = 0
254254
self._sensors = None
255255
if self.config['metrics']:
256256
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
257257
self.config['metric_group_prefix'],
258258
self.node_id)
259259

260+
def _next_afi_host_port(self):
261+
if not self._gai:
262+
self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
263+
if not self._gai:
264+
log.error('DNS lookup failed for {0}:{1} ({2})'
265+
.format(self._init_host, self._init_port, self._init_afi))
266+
return
267+
268+
afi, _, __, ___, sockaddr = self._gai.pop(0)
269+
host, port = sockaddr[:2]
270+
return (afi, host, port)
271+
260272
def connect(self):
261273
"""Attempt to connect and return ConnectionState"""
262274
if self.state is ConnectionStates.DISCONNECTED:
263-
log.debug('%s: creating new socket', self)
264-
# if self.afi is set to AF_UNSPEC, then we need to do a name
265-
# resolution and try all available address families
266-
if self.afi == socket.AF_UNSPEC:
267-
if self._gai is None:
268-
# XXX: all DNS functions in Python are blocking. If we really
269-
# want to be non-blocking here, we need to use a 3rd-party
270-
# library like python-adns, or move resolution onto its
271-
# own thread. This will be subject to the default libc
272-
# name resolution timeout (5s on most Linux boxes)
273-
try:
274-
self._gai = socket.getaddrinfo(self._init_host,
275-
self._init_port,
276-
socket.AF_UNSPEC,
277-
socket.SOCK_STREAM)
278-
except socket.gaierror as ex:
279-
log.warning('DNS lookup failed for %s:%d,'
280-
' exception was %s. Is your'
281-
' advertised.listeners (called'
282-
' advertised.host.name before Kafka 9)'
283-
' correct and resolvable?',
284-
self._init_host, self._init_port, ex)
285-
self._gai = []
286-
self._gai_index = 0
287-
else:
288-
# if self._gai already exists, then we should try the next
289-
# name
290-
self._gai_index += 1
291-
while True:
292-
if self._gai_index >= len(self._gai):
293-
log.error('Unable to connect to any of the names for {0}:{1}'
294-
.format(self._init_host, self._init_port))
295-
self._gai = None
296-
self._gai_index = 0
297-
return
298-
afi, _, __, ___, sockaddr = self._gai[self._gai_index]
299-
if afi not in (socket.AF_INET, socket.AF_INET6):
300-
self._gai_index += 1
301-
continue
302-
break
303-
self.host, self.port = sockaddr[:2]
304-
self._sock = socket.socket(afi, socket.SOCK_STREAM)
275+
self.last_attempt = time.time()
276+
next_lookup = self._next_afi_host_port()
277+
if not next_lookup:
278+
self.close(Errors.ConnectionError('DNS failure'))
279+
return
305280
else:
281+
log.debug('%s: creating new socket', self)
282+
self.afi, self.host, self.port = next_lookup
306283
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
307284

308285
for option in self.config['socket_options']:
309286
log.debug('%s: setting socket option %s', self, option)
310287
self._sock.setsockopt(*option)
311288

312289
self._sock.setblocking(False)
313-
self.last_attempt = time.time()
314290
self.state = ConnectionStates.CONNECTING
315291
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
316292
self._wrap_ssl()
@@ -643,23 +619,15 @@ def close(self, error=None):
643619
will be failed with this exception.
644620
Default: kafka.errors.ConnectionError.
645621
"""
646-
if self.state is ConnectionStates.DISCONNECTED:
647-
if error is not None:
648-
if sys.version_info >= (3, 2):
649-
log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True)
650-
else:
651-
log.warning('%s: close() called on disconnected connection with error: %s', self, error)
652-
return
653-
654622
log.info('%s: Closing connection. %s', self, error or '')
655-
self.state = ConnectionStates.DISCONNECTING
656-
self.config['state_change_callback'](self)
623+
if self.state is not ConnectionStates.DISCONNECTED:
624+
self.state = ConnectionStates.DISCONNECTING
625+
self.config['state_change_callback'](self)
657626
self._update_reconnect_backoff()
658627
if self._sock:
659628
self._sock.close()
660629
self._sock = None
661630
self.state = ConnectionStates.DISCONNECTED
662-
self.last_attempt = time.time()
663631
self._sasl_auth_future = None
664632
self._protocol = KafkaProtocol(
665633
client_id=self.config['client_id'],
@@ -1169,3 +1137,29 @@ def collect_hosts(hosts, randomize=True):
11691137
shuffle(result)
11701138

11711139
return result
1140+
1141+
1142+
def is_inet_4_or_6(gai):
1143+
"""Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
1144+
return gai[0] in (socket.AF_INET, socket.AF_INET6)
1145+
1146+
1147+
def dns_lookup(host, port, afi=socket.AF_UNSPEC):
1148+
"""Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
1149+
# XXX: all DNS functions in Python are blocking. If we really
1150+
# want to be non-blocking here, we need to use a 3rd-party
1151+
# library like python-adns, or move resolution onto its
1152+
# own thread. This will be subject to the default libc
1153+
# name resolution timeout (5s on most Linux boxes)
1154+
try:
1155+
return list(filter(is_inet_4_or_6,
1156+
socket.getaddrinfo(host, port, afi,
1157+
socket.SOCK_STREAM)))
1158+
except socket.gaierror as ex:
1159+
log.warning('DNS lookup failed for %s:%d,'
1160+
' exception was %s. Is your'
1161+
' advertised.listeners (called'
1162+
' advertised.host.name before Kafka 9)'
1163+
' correct and resolvable?',
1164+
host, port, ex)
1165+
return []

test/test_conn.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,6 @@ def test_connect_timeout(_socket, conn):
6666
assert conn.state is ConnectionStates.DISCONNECTED
6767

6868

69-
def test_connect_dns_failure(_socket, conn):
70-
assert conn.state is ConnectionStates.DISCONNECTED
71-
assert conn._gai is None
72-
# Setup _gai / index to state where we need gai and there are no more entries to test
73-
conn.afi = socket.AF_UNSPEC
74-
conn._gai = ['foo']
75-
conn._gai_index = 1
76-
conn.connect()
77-
assert conn.state is ConnectionStates.DISCONNECTED
78-
assert conn._gai is None
79-
80-
8169
def test_blacked_out(conn):
8270
assert conn.blacked_out() is False
8371
conn.last_attempt = time.time()
@@ -279,3 +267,28 @@ def test_lookup_on_connect():
279267
m.assert_called_once_with(hostname, port, 0, 1)
280268
conn.close()
281269
assert conn.host == ip2
270+
271+
272+
def test_relookup_on_failure():
273+
hostname = 'example.org'
274+
port = 9092
275+
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
276+
assert conn.host == conn.hostname == hostname
277+
mock_return1 = []
278+
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
279+
last_attempt = conn.last_attempt
280+
conn.connect()
281+
m.assert_called_once_with(hostname, port, 0, 1)
282+
assert conn.disconnected()
283+
assert conn.last_attempt > last_attempt
284+
285+
ip2 = '127.0.0.2'
286+
mock_return2 = [
287+
(2, 2, 17, '', (ip2, 9092)),
288+
]
289+
290+
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
291+
conn.connect()
292+
m.assert_called_once_with(hostname, port, 0, 1)
293+
conn.close()
294+
assert conn.host == ip2

0 commit comments

Comments
 (0)