Skip to content

Commit 6dfcf71

Browse files
dpkp authored and 88manpreet committed
KIP-144: Exponential backoff for broker reconnections (dpkp#1124)
1 parent 6ced4f8 commit 6dfcf71

File tree

4 files changed

+69
-22
lines changed

4 files changed

+69
-22
lines changed

kafka/client_async.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ class KafkaClient(object):
6767
reconnect_backoff_ms (int): The amount of time in milliseconds to
6868
wait before attempting to reconnect to a given host.
6969
Default: 50.
70+
reconnect_backoff_max_ms (int): The maximum amount of time in
71+
milliseconds to wait when reconnecting to a broker that has
72+
repeatedly failed to connect. If provided, the backoff per host
73+
will increase exponentially for each consecutive connection
74+
failure, up to this maximum. To avoid connection storms, a
75+
randomization factor of 0.2 will be applied to the backoff
76+
resulting in a random range between 20% below and 20% above
77+
the computed value. Default: 1000.
7078
request_timeout_ms (int): Client request timeout in milliseconds.
7179
Default: 40000.
7280
retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -137,6 +145,7 @@ class KafkaClient(object):
137145
'request_timeout_ms': 40000,
138146
'connections_max_idle_ms': 9 * 60 * 1000,
139147
'reconnect_backoff_ms': 50,
148+
'reconnect_backoff_max_ms': 1000,
140149
'max_in_flight_requests_per_connection': 5,
141150
'receive_buffer_bytes': None,
142151
'send_buffer_bytes': None,
@@ -432,15 +441,7 @@ def connection_delay(self, node_id):
432441
"""
433442
if node_id not in self._conns:
434443
return 0
435-
436-
conn = self._conns[node_id]
437-
time_waited_ms = time.time() - (conn.last_attempt or 0)
438-
if conn.disconnected():
439-
return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
440-
elif conn.connecting():
441-
return 0
442-
else:
443-
return 999999999
444+
return self._conns[node_id].connection_delay()
444445

445446
def is_ready(self, node_id, metadata_priority=True):
446447
"""Check whether a node is ready to send more requests.
@@ -646,12 +647,10 @@ def in_flight_request_count(self, node_id=None):
646647
def least_loaded_node(self):
647648
"""Choose the node with fewest outstanding requests, with fallbacks.
648649
649-
This method will prefer a node with an existing connection, but will
650-
potentially choose a node for which we don't yet have a connection if
651-
all existing connections are in use. This method will never choose a
652-
node that was disconnected within the reconnect backoff period.
653-
If all else fails, the method will attempt to bootstrap again using the
654-
bootstrap_servers list.
650+
This method will prefer a node with an existing connection and no
651+
in-flight-requests. If no such node is found, a node will be chosen
652+
randomly from disconnected nodes that are not "blacked out" (i.e.,
653+
are not subject to a reconnect backoff).
655654
656655
Returns:
657656
node_id or None if no suitable node was found
@@ -686,10 +685,6 @@ def least_loaded_node(self):
686685
elif 'bootstrap' in self._conns:
687686
return 'bootstrap'
688687

689-
# Last option: try to bootstrap again
690-
# this should only happen if no prior bootstrap has been successful
691-
log.error('No nodes found in metadata -- retrying bootstrap')
692-
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
693688
return None
694689

695690
def set_topics(self, topics):

kafka/conn.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import errno
66
import logging
77
import io
8-
from random import shuffle
8+
from random import shuffle, uniform
99
import socket
1010
import time
1111
import traceback
@@ -78,6 +78,14 @@ class BrokerConnection(object):
7878
reconnect_backoff_ms (int): The amount of time in milliseconds to
7979
wait before attempting to reconnect to a given host.
8080
Default: 50.
81+
reconnect_backoff_max_ms (int): The maximum amount of time in
82+
milliseconds to wait when reconnecting to a broker that has
83+
repeatedly failed to connect. If provided, the backoff per host
84+
will increase exponentially for each consecutive connection
85+
failure, up to this maximum. To avoid connection storms, a
86+
randomization factor of 0.2 will be applied to the backoff
87+
resulting in a random range between 20% below and 20% above
88+
the computed value. Default: 1000.
8189
request_timeout_ms (int): Client request timeout in milliseconds.
8290
Default: 40000.
8391
max_in_flight_requests_per_connection (int): Requests are pipelined
@@ -140,6 +148,7 @@ class BrokerConnection(object):
140148
'node_id': 0,
141149
'request_timeout_ms': 40000,
142150
'reconnect_backoff_ms': 50,
151+
'reconnect_backoff_max_ms': 1000,
143152
'max_in_flight_requests_per_connection': 5,
144153
'receive_buffer_bytes': None,
145154
'send_buffer_bytes': None,
@@ -196,6 +205,7 @@ def __init__(self, host, port, afi, **configs):
196205
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
197206

198207
self.state = ConnectionStates.DISCONNECTED
208+
self._reset_reconnect_backoff()
199209
self._sock = None
200210
self._ssl_context = None
201211
if self.config['ssl_context'] is not None:
@@ -301,6 +311,7 @@ def connect(self):
301311
else:
302312
log.debug('%s: Connection complete.', self)
303313
self.state = ConnectionStates.CONNECTED
314+
self._reset_reconnect_backoff()
304315
self.config['state_change_callback'](self)
305316

306317
# Connection failed
@@ -336,6 +347,7 @@ def connect(self):
336347
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
337348
log.debug('%s: Connection complete.', self)
338349
self.state = ConnectionStates.CONNECTED
350+
self._reset_reconnect_backoff()
339351
self.config['state_change_callback'](self)
340352

341353
return self.state
@@ -471,11 +483,19 @@ def blacked_out(self):
471483
re-establish a connection yet
472484
"""
473485
if self.state is ConnectionStates.DISCONNECTED:
474-
backoff = self.config['reconnect_backoff_ms'] / 1000.0
475-
if time.time() < self.last_attempt + backoff:
486+
if time.time() < self.last_attempt + self._reconnect_backoff:
476487
return True
477488
return False
478489

490+
def connection_delay(self):
    """Return how long to wait, in milliseconds, before using this connection.

    The value depends on the connection state:

    * disconnected: the remaining reconnect backoff (0 once the backoff
      has fully elapsed);
    * connecting: 0;
    * any other state: 999999999.

    Returns:
        int or float: delay in milliseconds.
    """
    # time.time() and self._reconnect_backoff are both expressed in seconds
    # (the backoff is stored as reconnect_backoff_ms / 1000.0).
    time_waited = time.time() - (self.last_attempt or 0)
    if self.state is ConnectionStates.DISCONNECTED:
        # Convert the remaining backoff to milliseconds so the result is on
        # the same scale as the reconnect_backoff_ms-based value that
        # KafkaClient.connection_delay returned before delegating here.
        return max(self._reconnect_backoff - time_waited, 0) * 1000
    elif self.connecting():
        return 0
    else:
        return 999999999
498+
479499
def connected(self):
480500
"""Return True iff socket is connected."""
481501
return self.state is ConnectionStates.CONNECTED
@@ -491,6 +511,19 @@ def disconnected(self):
491511
"""Return True iff socket is closed"""
492512
return self.state is ConnectionStates.DISCONNECTED
493513

514+
def _reset_reconnect_backoff(self):
    """Clear the failure counter and restore the base reconnect backoff.

    The backoff is stored in seconds (``reconnect_backoff_ms`` / 1000.0).
    """
    self._failures = 0
    self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0

def _update_reconnect_backoff(self):
    """Count one more failure and recompute the reconnect backoff.

    Does nothing unless ``reconnect_backoff_max_ms`` is greater than
    ``reconnect_backoff_ms``. Otherwise the backoff doubles with each
    recorded failure, starting at ``reconnect_backoff_ms`` and limited to
    ``reconnect_backoff_max_ms``; the limited value is then multiplied by a
    random factor between 0.8 and 1.2 and stored in seconds.
    """
    base_ms = self.config['reconnect_backoff_ms']
    max_ms = self.config['reconnect_backoff_max_ms']
    if max_ms > base_ms:
        self._failures += 1
        # Cap the exponent: without it 2 ** failures grows without bound,
        # which costs an ever larger big-int on every failure and raises
        # OverflowError once it is multiplied by a float base. The result
        # only differs from the uncapped formula if max_ms exceeds
        # base_ms by a factor of more than 2 ** 63.
        exponent = min(self._failures - 1, 63)
        backoff_ms = min(base_ms * 2 ** exponent, max_ms)
        # Jitter is applied after limiting, so the final value may land up
        # to 20% above max_ms.
        backoff_ms *= uniform(0.8, 1.2)
        self._reconnect_backoff = backoff_ms / 1000.0
        log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
526+
494527
def close(self, error=None):
495528
"""Close socket and fail all in-flight-requests.
496529
@@ -508,6 +541,7 @@ def close(self, error=None):
508541
log.info('%s: Closing connection. %s', self, error or '')
509542
self.state = ConnectionStates.DISCONNECTING
510543
self.config['state_change_callback'](self)
544+
self._update_reconnect_backoff()
511545
if self._sock:
512546
self._sock.close()
513547
self._sock = None

kafka/consumer/group.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ class KafkaConsumer(six.Iterator):
9191
reconnect_backoff_ms (int): The amount of time in milliseconds to
9292
wait before attempting to reconnect to a given host.
9393
Default: 50.
94+
reconnect_backoff_max_ms (int): The maximum amount of time in
95+
milliseconds to wait when reconnecting to a broker that has
96+
repeatedly failed to connect. If provided, the backoff per host
97+
will increase exponentially for each consecutive connection
98+
failure, up to this maximum. To avoid connection storms, a
99+
randomization factor of 0.2 will be applied to the backoff
100+
resulting in a random range between 20% below and 20% above
101+
the computed value. Default: 1000.
94102
max_in_flight_requests_per_connection (int): Requests are pipelined
95103
to kafka brokers up to this number of maximum requests per
96104
broker connection. Default: 5.
@@ -230,6 +238,7 @@ class KafkaConsumer(six.Iterator):
230238
'request_timeout_ms': 40 * 1000,
231239
'retry_backoff_ms': 100,
232240
'reconnect_backoff_ms': 50,
241+
'reconnect_backoff_max_ms': 1000,
233242
'max_in_flight_requests_per_connection': 5,
234243
'auto_offset_reset': 'latest',
235244
'enable_auto_commit': True,

kafka/producer/kafka.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ class KafkaProducer(object):
198198
reconnect_backoff_ms (int): The amount of time in milliseconds to
199199
wait before attempting to reconnect to a given host.
200200
Default: 50.
201+
reconnect_backoff_max_ms (int): The maximum amount of time in
202+
milliseconds to wait when reconnecting to a broker that has
203+
repeatedly failed to connect. If provided, the backoff per host
204+
will increase exponentially for each consecutive connection
205+
failure, up to this maximum. To avoid connection storms, a
206+
randomization factor of 0.2 will be applied to the backoff
207+
resulting in a random range between 20% below and 20% above
208+
the computed value. Default: 1000.
201209
max_in_flight_requests_per_connection (int): Requests are pipelined
202210
to kafka brokers up to this number of maximum requests per
203211
broker connection. Default: 5.
@@ -275,6 +283,7 @@ class KafkaProducer(object):
275283
'send_buffer_bytes': None,
276284
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
277285
'reconnect_backoff_ms': 50,
286+
'reconnect_backoff_max_ms': 1000,
278287
'max_in_flight_requests_per_connection': 5,
279288
'security_protocol': 'PLAINTEXT',
280289
'ssl_context': None,

0 commit comments

Comments
 (0)