
Commit 28cbad6

KIP-98: Add idempotent producer support (#2569)
1 parent ef6d336 commit 28cbad6

11 files changed (+498, −41 lines)

kafka/client_async.py

Lines changed: 39 additions & 1 deletion
@@ -27,7 +27,7 @@
 from kafka.metrics.stats.rate import TimeUnit
 from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
 from kafka.protocol.metadata import MetadataRequest
-from kafka.util import Dict, WeakMethod, ensure_valid_topic_name
+from kafka.util import Dict, WeakMethod, ensure_valid_topic_name, timeout_ms_fn
 # Although this looks unused, it actually monkey-patches socket.socketpair()
 # and should be left in as long as we're using socket.socketpair() in this file
 from kafka.vendor import socketpair  # noqa: F401
@@ -400,6 +400,11 @@ def maybe_connect(self, node_id, wakeup=True):
             return True
         return False
 
+    def connection_failed(self, node_id):
+        if node_id not in self._conns:
+            return False
+        return self._conns[node_id].connect_failed()
+
     def _should_recycle_connection(self, conn):
         # Never recycle unless disconnected
         if not conn.disconnected():
@@ -1157,6 +1162,39 @@ def bootstrap_connected(self):
         else:
             return False
 
+    def await_ready(self, node_id, timeout_ms=30000):
+        """
+        Invokes `poll` to discard pending disconnects, then repeatedly attempts to connect and polls
+        until the connection to `node_id` is ready, the timeout_ms expires, or the connection fails.
+
+        Returns True if the connection becomes ready, or False if timeout_ms expires first. If the
+        connection fails, a KafkaConnectionError is raised instead. Note that if the client has been
+        configured with a positive connection timeout, it is possible for this method to raise an
+        error for a previous connection which has recently disconnected.
+
+        This method is useful for implementing blocking behaviour on top of the non-blocking
+        `KafkaClient`; use it with care.
+        """
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
+        self.poll(timeout_ms=0)
+        if self.is_ready(node_id):
+            return True
+
+        while not self.is_ready(node_id) and inner_timeout_ms() > 0:
+            if self.connection_failed(node_id):
+                raise Errors.KafkaConnectionError("Connection to %s failed." % (node_id,))
+            self.maybe_connect(node_id)
+            self.poll(timeout_ms=inner_timeout_ms())
+        return self.is_ready(node_id)
+
+    def send_and_receive(self, node_id, request):
+        future = self.send(node_id, request)
+        self.poll(future=future)
+        assert future.is_done
+        if future.failed():
+            raise future.exception
+        return future.value
+
 
 # OrderedDict requires python2.7+
 try:
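The two new helpers give callers a simple blocking workflow on top of the async client. A minimal usage sketch (not part of the commit; `client`, `node_id`, and the request are illustrative placeholders):

```python
# Hypothetical usage of KafkaClient.await_ready / send_and_receive.
# Assumes `client` is an already-bootstrapped kafka.client_async.KafkaClient
# and `node_id` identifies a known broker.
from kafka.protocol.metadata import MetadataRequest

if not client.await_ready(node_id, timeout_ms=5000):
    raise RuntimeError("broker %s not ready within timeout" % node_id)

# send_and_receive() blocks on the response future and re-raises any error it carries.
request = MetadataRequest[0](topics=[])   # v0: empty list requests metadata for all topics
response = client.send_and_receive(node_id, request)
print(response)
```

In this commit, the Sender (changed elsewhere in the diff, not shown in this excerpt) likely uses the same pattern to fetch a producer id from a broker before draining idempotent batches.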

kafka/producer/kafka.py

Lines changed: 51 additions & 6 deletions
@@ -19,6 +19,7 @@
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
 from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
 from kafka.producer.sender import Sender
+from kafka.producer.transaction_state import TransactionState
 from kafka.record.default_records import DefaultRecordBatchBuilder
 from kafka.record.legacy_records import LegacyRecordBatchBuilder
 from kafka.serializer import Serializer
@@ -93,6 +94,19 @@ class KafkaProducer(object):
         value_serializer (callable): used to convert user-supplied message
             values to bytes. If not None, called as f(value), should return
             bytes. Default: None.
+        enable_idempotence (bool): When set to True, the producer will ensure
+            that exactly one copy of each message is written in the stream.
+            If False, producer retries due to broker failures, etc., may write
+            duplicates of the retried message in the stream. Default: False.
+
+            Note that enabling idempotence requires
+            `max_in_flight_requests_per_connection` to be set to 1 and `retries`
+            cannot be zero. Additionally, `acks` must be set to 'all'. If these
+            values are left at their defaults, the producer will override the
+            defaults to be suitable. If the values are set to something
+            incompatible with the idempotent producer, a KafkaConfigurationError
+            will be raised.
+
         acks (0, 1, 'all'): The number of acknowledgments the producer requires
             the leader to have received before considering a request complete.
             This controls the durability of records that are sent. The
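A minimal sketch of opting in to the new setting (the broker address and topic are placeholders); per the docstring above, the related configs can be left at their defaults and are adjusted automatically:

```python
from kafka import KafkaProducer

# enable_idempotence=True causes __init__ (see the constructor changes below) to
# override the defaults: retries -> 3, max_in_flight_requests_per_connection -> 1,
# acks -> 'all' (-1).
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',   # placeholder address
    enable_idempotence=True,
)
producer.send('my-topic', b'written exactly once despite retries')
producer.flush()
producer.close()
```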
@@ -303,6 +317,7 @@ class KafkaProducer(object):
         'client_id': None,
         'key_serializer': None,
         'value_serializer': None,
+        'enable_idempotence': False,
         'acks': 1,
         'bootstrap_topics_filter': set(),
         'compression_type': None,
@@ -365,6 +380,7 @@ class KafkaProducer(object):
     def __init__(self, **configs):
         log.debug("Starting the Kafka producer") # trace
         self.config = copy.copy(self.DEFAULT_CONFIG)
+        user_provided_configs = set(configs.keys())
         for key in self.config:
             if key in configs:
                 self.config[key] = configs.pop(key)
@@ -428,13 +444,41 @@ def __init__(self, **configs):
             assert checker(), "Libraries for {} compression codec not found".format(ct)
             self.config['compression_attrs'] = compression_attrs
 
-        message_version = self._max_usable_produce_magic()
-        self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
+        self._transaction_state = None
+        if self.config['enable_idempotence']:
+            self._transaction_state = TransactionState()
+            if 'retries' not in user_provided_configs:
+                log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
+                self.config['retries'] = 3
+            elif self.config['retries'] == 0:
+                raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")
+
+            if 'max_in_flight_requests_per_connection' not in user_provided_configs:
+                log.info("Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempotence is enabled.")
+                self.config['max_in_flight_requests_per_connection'] = 1
+            elif self.config['max_in_flight_requests_per_connection'] != 1:
+                raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order"
+                                                     " to use the idempotent producer."
+                                                     " Otherwise we cannot guarantee idempotence.")
+
+            if 'acks' not in user_provided_configs:
+                log.info("Overriding the default 'acks' config to 'all' since idempotence is enabled")
+                self.config['acks'] = -1
+            elif self.config['acks'] != -1:
+                raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent"
+                                                     " producer. Otherwise we cannot guarantee idempotence")
+
+        message_version = self.max_usable_produce_magic(self.config['api_version'])
+        self._accumulator = RecordAccumulator(
+            transaction_state=self._transaction_state,
+            message_version=message_version,
+            **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
         self._sender = Sender(client, self._metadata,
                               self._accumulator,
                               metrics=self._metrics,
+                              transaction_state=self._transaction_state,
                               guarantee_message_order=guarantee_message_order,
                               **self.config)
         self._sender.daemon = True
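Conversely, values the user passed explicitly are never silently overridden; an incompatible combination fails fast. A small illustrative example of the validation above (placeholder broker address):

```python
from kafka import KafkaProducer
from kafka.errors import KafkaConfigurationError

try:
    KafkaProducer(
        bootstrap_servers='localhost:9092',   # placeholder address
        enable_idempotence=True,
        retries=0,                            # explicitly conflicts with idempotence
    )
except KafkaConfigurationError as err:
    print('rejected:', err)
```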
@@ -548,16 +592,17 @@ def partitions_for(self, topic):
         max_wait = self.config['max_block_ms'] / 1000
         return self._wait_on_metadata(topic, max_wait)
 
-    def _max_usable_produce_magic(self):
-        if self.config['api_version'] >= (0, 11):
+    @classmethod
+    def max_usable_produce_magic(cls, api_version):
+        if api_version >= (0, 11):
             return 2
-        elif self.config['api_version'] >= (0, 10, 0):
+        elif api_version >= (0, 10, 0):
             return 1
         else:
             return 0
 
     def _estimate_size_in_bytes(self, key, value, headers=[]):
-        magic = self._max_usable_produce_magic()
+        magic = self.max_usable_produce_magic(self.config['api_version'])
         if magic == 2:
             return DefaultRecordBatchBuilder.estimate_size_in_bytes(
                 key, value, headers)
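The refactor turns the private helper into a classmethod, presumably so callers outside a producer instance can query which message format ("magic") a given broker API version supports. The mapping it encodes:

```python
from kafka import KafkaProducer

KafkaProducer.max_usable_produce_magic((0, 11))     # -> 2 (v2 record batches, required for idempotence)
KafkaProducer.max_usable_produce_magic((0, 10, 1))  # -> 1
KafkaProducer.max_usable_produce_magic((0, 9))      # -> 0
```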

kafka/producer/record_accumulator.py

Lines changed: 46 additions & 12 deletions
@@ -35,9 +35,9 @@ def get(self):
 
 
 class ProducerBatch(object):
-    def __init__(self, tp, records):
+    def __init__(self, tp, records, now=None):
         self.max_record_size = 0
-        now = time.time()
+        now = time.time() if now is None else now
         self.created = now
         self.drained = None
         self.attempts = 0
@@ -52,13 +52,18 @@ def __init__(self, tp, records):
     def record_count(self):
         return self.records.next_offset()
 
-    def try_append(self, timestamp_ms, key, value, headers):
+    @property
+    def producer_id(self):
+        return self.records.producer_id if self.records else None
+
+    def try_append(self, timestamp_ms, key, value, headers, now=None):
         metadata = self.records.append(timestamp_ms, key, value, headers)
         if metadata is None:
             return None
 
+        now = time.time() if now is None else now
         self.max_record_size = max(self.max_record_size, metadata.size)
-        self.last_append = time.time()
+        self.last_append = now
         future = FutureRecordMetadata(self.produce_future, metadata.offset,
                                       metadata.timestamp, metadata.crc,
                                       len(key) if key is not None else -1,
@@ -81,7 +86,7 @@ def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
                       log_start_offset, exception) # trace
             self.produce_future.failure(exception)
 
-    def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
+    def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full, now=None):
         """Expire batches if metadata is not available
 
         A batch whose metadata is not available should be expired if one
@@ -93,7 +98,7 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
         * the batch is in retry AND request timeout has elapsed after the
           backoff period ended.
         """
-        now = time.time()
+        now = time.time() if now is None else now
         since_append = now - self.last_append
         since_ready = now - (self.created + linger_ms / 1000.0)
         since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
@@ -121,6 +126,10 @@ def in_retry(self):
     def set_retry(self):
         self._retry = True
 
+    @property
+    def is_done(self):
+        return self.produce_future.is_done
+
     def __str__(self):
         return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
             self.topic_partition, self.records.next_offset())
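The `now=None` parameters threaded through ProducerBatch (and through RecordAccumulator below) let callers and tests inject a fixed clock instead of calling time.time() internally. A rough sketch of that pattern, illustrative only and not taken from the commit's test suite (constructor arguments for MemoryRecordsBuilder are assumed to be magic, compression type, and batch size):

```python
from kafka.producer.record_accumulator import ProducerBatch
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.structs import TopicPartition

fake_now = 1000000.0  # frozen clock for deterministic behaviour

records = MemoryRecordsBuilder(2, 0, 16384)   # v2 records, no compression, 16KB batch
batch = ProducerBatch(TopicPartition('my-topic', 0), records, now=fake_now)
batch.try_append(int(fake_now * 1000), None, b'value', [], now=fake_now)

# 60 seconds of simulated time later, with the batch still unsent, maybe_expire()
# should report it expired once request_timeout_ms has elapsed since the last append.
expired = batch.maybe_expire(request_timeout_ms=30000, retry_backoff_ms=100,
                             linger_ms=0, is_full=True, now=fake_now + 60)
```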
@@ -161,6 +170,7 @@ class RecordAccumulator(object):
         'compression_attrs': 0,
         'linger_ms': 0,
         'retry_backoff_ms': 100,
+        'transaction_state': None,
         'message_version': 0,
     }
 
@@ -171,6 +181,7 @@ def __init__(self, **configs):
                 self.config[key] = configs.pop(key)
 
         self._closed = False
+        self._transaction_state = self.config['transaction_state']
         self._flushes_in_progress = AtomicInteger()
         self._appends_in_progress = AtomicInteger()
         self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
@@ -233,6 +244,10 @@ def append(self, tp, timestamp_ms, key, value, headers):
                     batch_is_full = len(dq) > 1 or last.records.is_full()
                     return future, batch_is_full, False
 
+            if self._transaction_state and self.config['message_version'] < 2:
+                raise Errors.UnsupportedVersionError("Attempting to use idempotence with a broker which"
+                                                     " does not support the required message format (v2)."
+                                                     " The broker must be version 0.11 or later.")
             records = MemoryRecordsBuilder(
                 self.config['message_version'],
                 self.config['compression_attrs'],
@@ -310,9 +325,9 @@ def abort_expired_batches(self, request_timeout_ms, cluster):
 
         return expired_batches
 
-    def reenqueue(self, batch):
+    def reenqueue(self, batch, now=None):
         """Re-enqueue the given record batch in the accumulator to retry."""
-        now = time.time()
+        now = time.time() if now is None else now
         batch.attempts += 1
         batch.last_attempt = now
         batch.last_append = now
@@ -323,7 +338,7 @@ def reenqueue(self, batch):
         with self._tp_locks[batch.topic_partition]:
             dq.appendleft(batch)
 
-    def ready(self, cluster):
+    def ready(self, cluster, now=None):
         """
         Get a list of nodes whose partitions are ready to be sent, and the
         earliest time at which any non-sendable partition will be ready;
@@ -357,7 +372,7 @@
         ready_nodes = set()
         next_ready_check = 9999999.99
         unknown_leaders_exist = False
-        now = time.time()
+        now = time.time() if now is None else now
 
         # several threads are accessing self._batches -- to simplify
         # concurrent access, we iterate over a snapshot of partitions
@@ -412,7 +427,7 @@ def has_unsent(self):
             return True
         return False
 
-    def drain(self, cluster, nodes, max_size):
+    def drain(self, cluster, nodes, max_size, now=None):
         """
         Drain all the data for the given nodes and collate them into a list of
         batches that will fit within the specified size on a per-node basis.
@@ -430,7 +445,7 @@ def drain(self, cluster, nodes, max_size):
         if not nodes:
            return {}
 
-        now = time.time()
+        now = time.time() if now is None else now
         batches = {}
         for node_id in nodes:
             size = 0
@@ -463,7 +478,26 @@
                             # single request
                             break
                         else:
+                            producer_id_and_epoch = None
+                            if self._transaction_state:
+                                producer_id_and_epoch = self._transaction_state.producer_id_and_epoch
+                                if not producer_id_and_epoch.is_valid:
+                                    # we cannot send the batch until we have refreshed the PID
+                                    log.debug("Waiting to send ready batches because transaction producer id is not valid")
+                                    break
+
                             batch = dq.popleft()
+                            if producer_id_and_epoch and not batch.in_retry():
+                                # If the batch is in retry, then we should not change the pid and
+                                # sequence number, since this may introduce duplicates. In particular,
+                                # the previous attempt may actually have been accepted, and if we change
+                                # the pid and sequence here, this attempt will also be accepted, causing
+                                # a duplicate.
+                                sequence_number = self._transaction_state.sequence_number(batch.topic_partition)
+                                log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s",
+                                          node_id, batch.topic_partition, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch,
+                                          sequence_number)
+                                batch.records.set_producer_state(producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, sequence_number)
                             batch.records.close()
                             size += batch.records.size_in_bytes()
                             ready.append(batch)
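drain() relies on a TransactionState object whose implementation (kafka/producer/transaction_state.py, added by this commit) is not shown in this excerpt. Judging only from the calls above, it is assumed to expose roughly the shape below; this is an inferred sketch, and the real class may differ:

```python
# Illustrative sketch inferred from the usage in drain() above -- NOT the actual
# kafka/producer/transaction_state.py shipped by this commit.
import collections


class ProducerIdAndEpoch(object):
    def __init__(self, producer_id=-1, epoch=-1):
        self.producer_id = producer_id
        self.epoch = epoch

    @property
    def is_valid(self):
        # Only valid once an InitProducerId round-trip has assigned a real id (KIP-98)
        return self.producer_id > -1


class TransactionState(object):
    def __init__(self):
        self.producer_id_and_epoch = ProducerIdAndEpoch()
        self._sequence_numbers = collections.defaultdict(int)

    def sequence_number(self, tp):
        # Sequence assigned to the next batch drained for this partition
        return self._sequence_numbers[tp]

    def increment_sequence_number(self, tp, increment):
        # Advance the per-partition sequence after a batch of `increment` records is sent
        self._sequence_numbers[tp] += increment
```

The key behaviour captured in drain() is that a batch receives its producer id, epoch, and sequence number only the first time it is drained; a retried batch keeps its original values so the broker can deduplicate a previously accepted attempt.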
