From 01487607f3e95530f8a01fdf8ec5024db2833056 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 09:03:17 -0700 Subject: [PATCH 01/18] InitProducerId Request/Response --- kafka/protocol/init_producer_id.py | 46 ++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 kafka/protocol/init_producer_id.py diff --git a/kafka/protocol/init_producer_id.py b/kafka/protocol/init_producer_id.py new file mode 100644 index 000000000..8426fe00b --- /dev/null +++ b/kafka/protocol/init_producer_id.py @@ -0,0 +1,46 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.types import Int16, Int32, Int64, Schema, String + + +class InitProducerIdResponse_v0(Response): + API_KEY = 22 + API_VERSION = 0 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('producer_id', Int64), + ('producer_epoch', Int16), + ) + + +class InitProducerIdResponse_v1(Response): + API_KEY = 22 + API_VERSION = 1 + SCHEMA = InitProducerIdResponse_v0.SCHEMA + + +class InitProducerIdRequest_v0(Request): + API_KEY = 22 + API_VERSION = 0 + RESPONSE_TYPE = InitProducerIdResponse_v0 + SCHEMA = Schema( + ('transactional_id', String('utf-8')), + ('transaction_timeout_ms', Int32), + ) + + +class InitProducerIdRequest_v1(Request): + API_KEY = 22 + API_VERSION = 1 + RESPONSE_TYPE = InitProducerIdResponse_v1 + SCHEMA = InitProducerIdRequest_v0.SCHEMA + + +InitProducerIdRequest = [ + InitProducerIdRequest_v0, InitProducerIdRequest_v1, +] +InitProducerIdResponse = [ + InitProducerIdResponse_v0, InitProducerIdResponse_v1, +] From fd861cc76a4f302f357dab6b114ae5042f37de19 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 09:03:29 -0700 Subject: [PATCH 02/18] Producer TransactionState --- kafka/producer/transaction_state.py | 96 +++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 kafka/producer/transaction_state.py diff --git a/kafka/producer/transaction_state.py b/kafka/producer/transaction_state.py new file mode 100644 index 000000000..1c9999672 --- /dev/null +++ b/kafka/producer/transaction_state.py @@ -0,0 +1,96 @@ +from __future__ import absolute_import, division + +import collections +import threading +import time + +from kafka.errors import IllegalStateError + + +NO_PRODUCER_ID = -1 +NO_PRODUCER_EPOCH = -1 + + +class PidAndEpoch(object): + __slots__ = ('producer_id', 'epoch') + + def __init__(self, producer_id, epoch): + self.producer_id = producer_id + self.epoch = epoch + + @property + def is_valid(self): + return NO_PRODUCER_ID < self.producer_id + + def __str__(self): + return "PidAndEpoch(producer_id={}, epoch={})".format(self.producer_id, self.epoch) + +class TransactionState(object): + __slots__ = ('pid_and_epoch', '_sequence_numbers', '_lock') + + def __init__(self): + self.pid_and_epoch = PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH) + self._sequence_numbers = collections.defaultdict(lambda: 0) + self._lock = threading.Condition() + + def has_pid(self): + return self.pid_and_epoch.is_valid + + + def await_pid_and_epoch(self, max_wait_time_ms): + """ + A blocking call to get the pid and epoch for the producer. If the PID and epoch has not been set, this method + will block for at most maxWaitTimeMs. It is expected that this method be called from application thread + contexts (ie. through Producer.send). The PID it self will be retrieved in the background thread. + + Arguments: + max_wait_time_ms (numeric): The maximum time to block. + + Returns: + PidAndEpoch object. Callers must check the 'is_valid' property of the returned object to ensure that a + valid pid and epoch is actually returned. + """ + with self._lock: + start = time.time() + elapsed = 0 + while not self.has_pid() and elapsed < max_wait_time_ms: + self._lock.wait(max_wait_time_ms / 1000) + elapsed = time.time() - start + return self.pid_and_epoch + + def set_pid_and_epoch(self, producer_id, epoch): + """ + Set the pid and epoch atomically. This method will signal any callers blocked on the `pidAndEpoch` method + once the pid is set. This method will be called on the background thread when the broker responds with the pid. + """ + with self._lock: + self.pid_and_epoch = PidAndEpoch(producer_id, epoch) + if self.pid_and_epoch.is_valid: + self._lock.notify_all() + + def reset_producer_id(self): + """ + This method is used when the producer needs to reset it's internal state because of an irrecoverable exception + from the broker. + + We need to reset the producer id and associated state when we have sent a batch to the broker, but we either get + a non-retriable exception or we run out of retries, or the batch expired in the producer queue after it was already + sent to the broker. + + In all of these cases, we don't know whether batch was actually committed on the broker, and hence whether the + sequence number was actually updated. If we don't reset the producer state, we risk the chance that all future + messages will return an OutOfOrderSequenceException. + """ + with self._lock: + self.pid_and_epoch = PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH) + self._sequence_numbers.clear() + + def sequence_number(self, tp): + with self._lock: + return self._sequence_numbers[tp] + + def increment_sequence_number(self, tp, increment): + with self._lock: + if tp not in self._sequence_numbers: + raise IllegalStateError("Attempt to increment sequence number for a partition with no current sequence.") + self._sequence_numbers[tp] += increment From abdd00f3b08ef6a0b96109b202fa9c5e16da3c55 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 09:55:15 -0700 Subject: [PATCH 03/18] MemoryRecordsBuilder / DefaultRecordsBuilder support for setting producer state --- kafka/record/default_records.py | 9 +++++++++ kafka/record/memory_records.py | 27 +++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 0d69d72a2..855306bbd 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -448,6 +448,15 @@ def __init__( self._buffer = bytearray(self.HEADER_STRUCT.size) + def set_producer_state(self, producer_id, producer_epoch, base_sequence): + self._producer_id = producer_id + self._producer_epoch = producer_epoch + self._base_sequence = base_sequence + + @property + def producer_id(self): + return self._producer_id + def _get_attributes(self, include_compression_type=True): attrs = 0 if include_compression_type: diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index 72baea547..a803047ea 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -22,7 +22,7 @@ import struct -from kafka.errors import CorruptRecordException +from kafka.errors import CorruptRecordException, IllegalStateError, UnsupportedVersionError from kafka.record.abc import ABCRecords from kafka.record.legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder from kafka.record.default_records import DefaultRecordBatch, DefaultRecordBatchBuilder @@ -113,7 +113,7 @@ def next_batch(self, _min_slice=MIN_SLICE, class MemoryRecordsBuilder(object): __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed", - "_bytes_written") + "_magic", "_bytes_written", "_producer_id") def __init__(self, magic, compression_type, batch_size, offset=0): assert magic in [0, 1, 2], "Not supported magic" @@ -123,15 +123,18 @@ def __init__(self, magic, compression_type, batch_size, offset=0): magic=magic, compression_type=compression_type, is_transactional=False, producer_id=-1, producer_epoch=-1, base_sequence=-1, batch_size=batch_size) + self._producer_id = -1 else: self._builder = LegacyRecordBatchBuilder( magic=magic, compression_type=compression_type, batch_size=batch_size) + self._producer_id = None self._batch_size = batch_size self._buffer = None self._next_offset = offset self._closed = False + self._magic = magic self._bytes_written = 0 def skip(self, offsets_to_skip): @@ -155,6 +158,24 @@ def append(self, timestamp, key, value, headers=[]): self._next_offset += 1 return metadata + def set_producer_state(self, producer_id, producer_epoch, base_sequence): + if self._magic < 2: + raise UnsupportedVersionError('Producer State requires Message format v2+') + elif self._closed: + # Sequence numbers are assigned when the batch is closed while the accumulator is being drained. + # If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will + # be re queued. In this case, we should not attempt to set the state again, since changing the pid and sequence + # once a batch has been sent to the broker risks introducing duplicates. + raise IllegalStateError("Trying to set producer state of an already closed batch. This indicates a bug on the client.") + self._builder.set_producer_state(producer_id, producer_epoch, base_sequence) + self._producer_id = producer_id + + @property + def producer_id(self): + if self._magic < 2: + raise UnsupportedVersionError('Producer State requires Message format v2+') + return self._producer_id + def close(self): # This method may be called multiple times on the same batch # i.e., on retries @@ -164,6 +185,8 @@ def close(self): if not self._closed: self._bytes_written = self._builder.size() self._buffer = bytes(self._builder.build()) + if self._magic == 2: + self._producer_id = self._builder.producer_id self._builder = None self._closed = True From 7daf68867380b79258bcc3cc433e42624ef40174 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 09:56:26 -0700 Subject: [PATCH 04/18] KafkaProducer configs for enable_idempotence --- kafka/producer/kafka.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f0eb37a8f..8d10a35fb 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -19,6 +19,7 @@ from kafka.producer.future import FutureRecordMetadata, FutureProduceResult from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator from kafka.producer.sender import Sender +from kafka.producer.transaction_state import TransactionState from kafka.record.default_records import DefaultRecordBatchBuilder from kafka.record.legacy_records import LegacyRecordBatchBuilder from kafka.serializer import Serializer @@ -93,6 +94,19 @@ class KafkaProducer(object): value_serializer (callable): used to convert user-supplied message values to bytes. If not None, called as f(value), should return bytes. Default: None. + enable_idempotence (bool): When set to True, the producer will ensure + that exactly one copy of each message is written in the stream. + If False, producer retries due to broker failures, etc., may write + duplicates of the retried message in the stream. Default: False. + + Note that enabling idempotence requires + `max_in_flight_requests_per_connection` to be set to 1 and `retries` + cannot be zero. Additionally, `acks` must be set to 'all'. If these + values are left at their defaults, the producer will override the + defaults to be suitable. If the values are set to something + incompatible with the idempotent producer, a KafkaConfigurationError + will be raised. + acks (0, 1, 'all'): The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The @@ -303,6 +317,7 @@ class KafkaProducer(object): 'client_id': None, 'key_serializer': None, 'value_serializer': None, + 'enable_idempotence': False, 'acks': 1, 'bootstrap_topics_filter': set(), 'compression_type': None, @@ -365,6 +380,7 @@ class KafkaProducer(object): def __init__(self, **configs): log.debug("Starting the Kafka producer") # trace self.config = copy.copy(self.DEFAULT_CONFIG) + user_provided_configs = set(configs.keys()) for key in self.config: if key in configs: self.config[key] = configs.pop(key) @@ -428,6 +444,30 @@ def __init__(self, **configs): assert checker(), "Libraries for {} compression codec not found".format(ct) self.config['compression_attrs'] = compression_attrs + self._transaction_state = None + if self.config['enable_idempotence']: + self._transaction_state = TransactionState() + if 'retries' not in user_provided_configs: + log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.") + self.config['retries'] = 3 + elif self.config['retries'] == 0: + raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.") + + if 'max_in_flight_requests_per_connection' not in user_provided_configs: + log.info("Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.") + self.config['max_in_flight_requests_per_connection'] = 1 + elif self.config['max_in_flight_requests_per_connection'] != 1: + raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order" + " to use the idempotent producer." + " Otherwise we cannot guarantee idempotence.") + + if 'acks' not in user_provided_configs: + log.info("Overriding the default 'acks' config to 'all' since idempotence is enabled") + self.config['acks'] = -1 + elif self.config['acks'] != -1: + raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent" + " producer. Otherwise we cannot guarantee idempotence") + message_version = self._max_usable_produce_magic() self._accumulator = RecordAccumulator(message_version=message_version, **self.config) self._metadata = client.cluster From 85e4512df47d3f91a9b93fefe48a954134b251fd Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 10:17:41 -0700 Subject: [PATCH 05/18] Handle transaction state in RecordAccumulator --- kafka/producer/kafka.py | 5 ++++- kafka/producer/record_accumulator.py | 25 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 8d10a35fb..0fbc901ae 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -469,7 +469,10 @@ def __init__(self, **configs): " producer. Otherwise we cannot guarantee idempotence") message_version = self._max_usable_produce_magic() - self._accumulator = RecordAccumulator(message_version=message_version, **self.config) + self._accumulator = RecordAccumulator( + transaction_state=self._transaction_state, + message_version=message_version, + **self.config) self._metadata = client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) self._sender = Sender(client, self._metadata, diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index ba823500d..3fc8c467d 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -161,6 +161,7 @@ class RecordAccumulator(object): 'compression_attrs': 0, 'linger_ms': 0, 'retry_backoff_ms': 100, + 'transaction_state': None, 'message_version': 0, } @@ -171,6 +172,7 @@ def __init__(self, **configs): self.config[key] = configs.pop(key) self._closed = False + self._transaction_state = self.config['transaction_state'] self._flushes_in_progress = AtomicInteger() self._appends_in_progress = AtomicInteger() self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch] @@ -233,6 +235,10 @@ def append(self, tp, timestamp_ms, key, value, headers): batch_is_full = len(dq) > 1 or last.records.is_full() return future, batch_is_full, False + if self._transaction_state and self.config['message_version'] < 2: + raise Errors.UnsupportedVersionError("Attempting to use idempotence with a broker which" + " does not support the required message format (v2)." + " The broker must be version 0.11 or later.") records = MemoryRecordsBuilder( self.config['message_version'], self.config['compression_attrs'], @@ -463,7 +469,26 @@ def drain(self, cluster, nodes, max_size): # single request break else: + pid_and_epoch = None + if self._transaction_state: + pid_and_epoch = self._transaction_state.pid_and_epoch + if not pid_and_epoch.is_valid: + # we cannot send the batch until we have refreshed the PID + log.debug("Waiting to send ready batches because transaction producer id is not valid") + break + batch = dq.popleft() + if pid_and_epoch and not batch.in_retry(): + # If the batch is in retry, then we should not change the pid and + # sequence number, since this may introduce duplicates. In particular, + # the previous attempt may actually have been accepted, and if we change + # the pid and sequence here, this attempt will also be accepted, causing + # a duplicate. + sequence_number = self._transaction_state.sequence_number(batch.topic_partition) + log.debug("Dest: %s : producer_idd: %s epoch: %s Assigning sequence for %s: %s", + node_id, pid_and_epoch.producer_id, pid_and_epoch.epoch, + batch.topic_partition, sequence_number) + batch.records.set_producer_state(pid_and_epoch.producer_id, pid_and_epoch.epoch, sequence_number) batch.records.close() size += batch.records.size_in_bytes() ready.append(batch) From 71bdf1f37e2f392ca10a3b4b8424b44dee9612d4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 14:30:27 -0700 Subject: [PATCH 06/18] KafkaClient: send_and_receive, await_ready, connection_failed --- kafka/client_async.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 19508b242..30258b7bd 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -27,7 +27,7 @@ from kafka.metrics.stats.rate import TimeUnit from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.metadata import MetadataRequest -from kafka.util import Dict, WeakMethod, ensure_valid_topic_name +from kafka.util import Dict, WeakMethod, ensure_valid_topic_name, timeout_ms_fn # Although this looks unused, it actually monkey-patches socket.socketpair() # and should be left in as long as we're using socket.socketpair() in this file from kafka.vendor import socketpair # noqa: F401 @@ -400,6 +400,11 @@ def maybe_connect(self, node_id, wakeup=True): return True return False + def connection_failed(self, node_id): + if node_id not in self._conns: + return False + return self._conns[node_id].connect_failed() + def _should_recycle_connection(self, conn): # Never recycle unless disconnected if not conn.disconnected(): @@ -1157,6 +1162,39 @@ def bootstrap_connected(self): else: return False + def await_ready(self, node_id, timeout_ms=30000): + """ + Invokes `poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll` + invocations until the connection to `node` is ready, the timeoutMs expires or the connection fails. + + It returns `true` if the call completes normally or `false` if the timeoutMs expires. If the connection fails, + an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive + connection timeoutMs, it is possible for this method to raise an `IOException` for a previous connection which + has recently disconnected. + + This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with + care. + """ + inner_timeout_ms = timeout_ms_fn(timeout_ms, None) + self.poll(timeout_ms=0) + if self.is_ready(node_id): + return True + + while not self.is_ready(node_id) and inner_timeout_ms() > 0: + if self.connection_failed(node_id): + raise Errors.KafkaConnectionError("Connection to %s failed." % (node_id,)) + self.maybe_connect(node_id) + self.poll(timeout_ms=inner_timeout_ms()) + return self.is_ready(node_id) + + def send_and_receive(self, node_id, request): + future = self.send(node_id, request) + self.poll(future=future) + assert future.is_done + if future.failed(): + raise future.exception + return future.value + # OrderedDict requires python2.7+ try: From a0e2b5b931d7a118ccc198790e7b1cc73ee5032b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 14:34:12 -0700 Subject: [PATCH 07/18] Implement transaction_state in Sender --- kafka/producer/kafka.py | 1 + kafka/producer/record_accumulator.py | 4 + kafka/producer/sender.py | 125 ++++++++++++++++++++++----- 3 files changed, 110 insertions(+), 20 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0fbc901ae..db370edd0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -478,6 +478,7 @@ def __init__(self, **configs): self._sender = Sender(client, self._metadata, self._accumulator, metrics=self._metrics, + transaction_state=self._transaction_state, guarantee_message_order=guarantee_message_order, **self.config) self._sender.daemon = True diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 3fc8c467d..0964ae358 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -52,6 +52,10 @@ def __init__(self, tp, records): def record_count(self): return self.records.next_offset() + @property + def producer_id(self): + return self.records.producer_id if self.records else None + def try_append(self, timestamp_ms, key, value, headers): metadata = self.records.append(timestamp_ms, key, value, headers) if metadata is None: diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 20af28d07..a51d3c594 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -11,6 +11,7 @@ from kafka import errors as Errors from kafka.metrics.measurable import AnonMeasurable from kafka.metrics.stats import Avg, Max, Rate +from kafka.protocol.init_producer_id import InitProducerIdRequest from kafka.protocol.produce import ProduceRequest from kafka.structs import TopicPartition from kafka.version import __version__ @@ -29,8 +30,12 @@ class Sender(threading.Thread): 'acks': 1, 'retries': 0, 'request_timeout_ms': 30000, + 'retry_backoff_ms': 100, 'metrics': None, 'guarantee_message_order': False, + 'transaction_state': None, + 'transactional_id': None, + 'transaction_timeout_ms': 60000, 'client_id': 'kafka-python-' + __version__, } @@ -52,6 +57,7 @@ def __init__(self, client, metadata, accumulator, **configs): self._sensors = SenderMetrics(self.config['metrics'], self._client, self._metadata) else: self._sensors = None + self._transaction_state = self.config['transaction_state'] def run(self): """The main run loop for the sender thread.""" @@ -95,6 +101,8 @@ def run_once(self): while self._topics_to_add: self._client.add_topic(self._topics_to_add.pop()) + self._maybe_wait_for_producer_id() + # get the list of partitions with data ready to send result = self._accumulator.ready(self._metadata) ready_nodes, next_ready_check_delay, unknown_leaders_exist = result @@ -128,6 +136,13 @@ def run_once(self): expired_batches = self._accumulator.abort_expired_batches( self.config['request_timeout_ms'], self._metadata) + # Reset the PID if an expired batch has previously been sent to the broker. + # See the documentation of `TransactionState.reset_producer_id` to understand why + # we need to reset the producer id here. + if self._transaction_state and any([batch.in_retry() for batch in expired_batches]): + self._transaction_state.reset_producer_id() + return + if self._sensors: for expired_batch in expired_batches: self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count) @@ -185,6 +200,44 @@ def add_topic(self, topic): self._topics_to_add.add(topic) self.wakeup() + def _maybe_wait_for_producer_id(self): + log.debug("_maybe_wait_for_producer_id") + if not self._transaction_state: + log.debug("_maybe_wait_for_producer_id: no transaction_state...") + return + + while not self._transaction_state.has_pid(): + try: + node_id = self._client.least_loaded_node() + if node_id is None or not self._client.await_ready(node_id): + log.debug("Could not find an available broker to send InitProducerIdRequest to." + + " Will back off and try again.") + time.sleep(self._client.least_loaded_node_refresh_ms() / 1000) + continue + version = self._client.api_version(InitProducerIdRequest, max_version=1) + request = InitProducerIdRequest[version]( + transactional_id=self.config['transactional_id'], + transaction_timeout_ms=self.config['transaction_timeout_ms'], + ) + response = self._client.send_and_receive(node_id, request) + error_type = Errors.for_code(response.error_code) + if error_type is Errors.NoError: + self._transaction_state.set_pid_and_epoch(response.producer_id, response.producer_epoch) + return + elif getattr(error_type, 'retriable', False): + log.debug("Retriable error from InitProducerId response: %s", error_type.__name__) + if getattr(error_type, 'invalid_metadata', False): + self._metadata.request_update() + else: + log.error("Received a non-retriable error from InitProducerId response: %s", error_type.__name__) + break + except Errors.KafkaConnectionError: + log.debug("Broker %s disconnected while awaiting InitProducerId response", node_id) + except Errors.RequestTimedOutError: + log.debug("InitProducerId request to node %s timed out", node_id) + time.sleep(self.config['retry_backoff_ms'] / 1000) + log.debug("_maybe_wait_for_producer_id: ok: %s", self._transaction_state.pid_and_epoch) + def _failed_produce(self, batches, node_id, error): log.error("Error sending produce request to node %d: %s", node_id, error) # trace for batch in batches: @@ -235,28 +288,60 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star if error is Errors.NoError: error = None - if error is not None and self._can_retry(batch, error): - # retry - log.warning("Got error produce response on topic-partition %s," - " retrying (%d attempts left). Error: %s", - batch.topic_partition, - self.config['retries'] - batch.attempts - 1, - error) - self._accumulator.reenqueue(batch) - if self._sensors: - self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) - else: - if error is Errors.TopicAuthorizationFailedError: - error = error(batch.topic_partition.topic) + if error is not None: + if self._can_retry(batch, error): + # retry + log.warning("Got error produce response on topic-partition %s," + " retrying (%d attempts left). Error: %s", + batch.topic_partition, + self.config['retries'] - batch.attempts - 1, + error) + + # If idempotence is enabled only retry the request if the current PID is the same as the pid of the batch. + if not self._transaction_state or self._transaction_state.pid_and_epoch.producer_id == batch.producer_id: + log.debug("Retrying batch to topic-partition %s. Sequence number: %s", + batch.topic_partition, + self._transaction_state.sequence_number(batch.topic_partition) if self._transaction_state else None) + self._accumulator.reenqueue(batch) + if self._sensors: + self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) + else: + self._transaction_state.reset_producer_id() + log.warning("Attempted to retry sending a batch but the producer id changed from %s to %s. This batch will be dropped" % ( + batch.producer_id, self._transaction_state.pid_and_epoch.producer_id)) + batch.done(base_offset, timestamp_ms, error, log_start_offset) + if self._sensors: + self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) + else: + if error is Errors.OutOfOrderSequenceNumberError and batch.producer_id == self._transaction_state.pid_and_epoch.producer_id: + log.error("The broker received an out of order sequence number error for produer_id %s, topic-partition %s" + " at offset %s. This indicates data loss on the broker, and should be investigated.", + batch.producer_id, batch.topic_partition, base_offset) + + if error is Errors.TopicAuthorizationFailedError: + error = error(batch.topic_partition.topic) + + # tell the user the result of their request + batch.done(base_offset, timestamp_ms, error, log_start_offset) + if self._sensors: + self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) + + if error is Errors.UnknownTopicOrPartitionError: + log.warning("Received unknown topic or partition error in produce request on partition %s." + " The topic/partition may not exist or the user may not have Describe access to it", + batch.topic_partition) + + if getattr(error, 'invalid_metadata', False): + self._metadata.request_update() - # tell the user the result of their request - batch.done(base_offset, timestamp_ms, error, log_start_offset) - self._accumulator.deallocate(batch) - if error is not None and self._sensors: - self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) + else: + batch.done(base_offset, timestamp_ms, error, log_start_offset) + self._accumulator.deallocate(batch) - if getattr(error, 'invalid_metadata', False): - self._metadata.request_update() + if self._transaction_state and self._transaction_state.pid_and_epoch.producer_id == batch.producer_id: + self._transaction_state.increment_sequence_number(batch.topic_partition, batch.record_count) + log.debug("Incremented sequence number for topic-partition %s to %s", batch.topic_partition, + self._transaction_state.sequence_number(batch.topic_partition)) # Unmute the completed partition. if self.config['guarantee_message_order']: From cbf6fc5b711b84872ea0cf08e356ca41c266661f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 14:48:45 -0700 Subject: [PATCH 08/18] pid => producer_id --- kafka/producer/record_accumulator.py | 12 ++++++------ kafka/producer/sender.py | 12 ++++++------ kafka/producer/transaction_state.py | 24 ++++++++++++------------ 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 0964ae358..fb9643e2e 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -473,16 +473,16 @@ def drain(self, cluster, nodes, max_size): # single request break else: - pid_and_epoch = None + producer_id_and_epoch = None if self._transaction_state: - pid_and_epoch = self._transaction_state.pid_and_epoch - if not pid_and_epoch.is_valid: + producer_id_and_epoch = self._transaction_state.producer_id_and_epoch + if not producer_id_and_epoch.is_valid: # we cannot send the batch until we have refreshed the PID log.debug("Waiting to send ready batches because transaction producer id is not valid") break batch = dq.popleft() - if pid_and_epoch and not batch.in_retry(): + if producer_id_and_epoch and not batch.in_retry(): # If the batch is in retry, then we should not change the pid and # sequence number, since this may introduce duplicates. In particular, # the previous attempt may actually have been accepted, and if we change @@ -490,9 +490,9 @@ def drain(self, cluster, nodes, max_size): # a duplicate. sequence_number = self._transaction_state.sequence_number(batch.topic_partition) log.debug("Dest: %s : producer_idd: %s epoch: %s Assigning sequence for %s: %s", - node_id, pid_and_epoch.producer_id, pid_and_epoch.epoch, + node_id, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, batch.topic_partition, sequence_number) - batch.records.set_producer_state(pid_and_epoch.producer_id, pid_and_epoch.epoch, sequence_number) + batch.records.set_producer_state(producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, sequence_number) batch.records.close() size += batch.records.size_in_bytes() ready.append(batch) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index a51d3c594..faec643fc 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -222,7 +222,7 @@ def _maybe_wait_for_producer_id(self): response = self._client.send_and_receive(node_id, request) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - self._transaction_state.set_pid_and_epoch(response.producer_id, response.producer_epoch) + self._transaction_state.set_producer_id_and_epoch(response.producer_id, response.producer_epoch) return elif getattr(error_type, 'retriable', False): log.debug("Retriable error from InitProducerId response: %s", error_type.__name__) @@ -236,7 +236,7 @@ def _maybe_wait_for_producer_id(self): except Errors.RequestTimedOutError: log.debug("InitProducerId request to node %s timed out", node_id) time.sleep(self.config['retry_backoff_ms'] / 1000) - log.debug("_maybe_wait_for_producer_id: ok: %s", self._transaction_state.pid_and_epoch) + log.debug("_maybe_wait_for_producer_id: ok: %s", self._transaction_state.producer_id_and_epoch) def _failed_produce(self, batches, node_id, error): log.error("Error sending produce request to node %d: %s", node_id, error) # trace @@ -298,7 +298,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star error) # If idempotence is enabled only retry the request if the current PID is the same as the pid of the batch. - if not self._transaction_state or self._transaction_state.pid_and_epoch.producer_id == batch.producer_id: + if not self._transaction_state or self._transaction_state.producer_id_and_epoch.producer_id == batch.producer_id: log.debug("Retrying batch to topic-partition %s. Sequence number: %s", batch.topic_partition, self._transaction_state.sequence_number(batch.topic_partition) if self._transaction_state else None) @@ -308,12 +308,12 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star else: self._transaction_state.reset_producer_id() log.warning("Attempted to retry sending a batch but the producer id changed from %s to %s. This batch will be dropped" % ( - batch.producer_id, self._transaction_state.pid_and_epoch.producer_id)) + batch.producer_id, self._transaction_state.producer_id_and_epoch.producer_id)) batch.done(base_offset, timestamp_ms, error, log_start_offset) if self._sensors: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) else: - if error is Errors.OutOfOrderSequenceNumberError and batch.producer_id == self._transaction_state.pid_and_epoch.producer_id: + if error is Errors.OutOfOrderSequenceNumberError and batch.producer_id == self._transaction_state.producer_id_and_epoch.producer_id: log.error("The broker received an out of order sequence number error for produer_id %s, topic-partition %s" " at offset %s. This indicates data loss on the broker, and should be investigated.", batch.producer_id, batch.topic_partition, base_offset) @@ -338,7 +338,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star batch.done(base_offset, timestamp_ms, error, log_start_offset) self._accumulator.deallocate(batch) - if self._transaction_state and self._transaction_state.pid_and_epoch.producer_id == batch.producer_id: + if self._transaction_state and self._transaction_state.producer_id_and_epoch.producer_id == batch.producer_id: self._transaction_state.increment_sequence_number(batch.topic_partition, batch.record_count) log.debug("Incremented sequence number for topic-partition %s to %s", batch.topic_partition, self._transaction_state.sequence_number(batch.topic_partition)) diff --git a/kafka/producer/transaction_state.py b/kafka/producer/transaction_state.py index 1c9999672..05cdc5766 100644 --- a/kafka/producer/transaction_state.py +++ b/kafka/producer/transaction_state.py @@ -11,7 +11,7 @@ NO_PRODUCER_EPOCH = -1 -class PidAndEpoch(object): +class ProducerIdAndEpoch(object): __slots__ = ('producer_id', 'epoch') def __init__(self, producer_id, epoch): @@ -23,21 +23,21 @@ def is_valid(self): return NO_PRODUCER_ID < self.producer_id def __str__(self): - return "PidAndEpoch(producer_id={}, epoch={})".format(self.producer_id, self.epoch) + return "ProducerIdAndEpoch(producer_id={}, epoch={})".format(self.producer_id, self.epoch) class TransactionState(object): - __slots__ = ('pid_and_epoch', '_sequence_numbers', '_lock') + __slots__ = ('producer_id_and_epoch', '_sequence_numbers', '_lock') def __init__(self): - self.pid_and_epoch = PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH) + self.producer_id_and_epoch = ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH) self._sequence_numbers = collections.defaultdict(lambda: 0) self._lock = threading.Condition() def has_pid(self): - return self.pid_and_epoch.is_valid + return self.producer_id_and_epoch.is_valid - def await_pid_and_epoch(self, max_wait_time_ms): + def await_producer_id_and_epoch(self, max_wait_time_ms): """ A blocking call to get the pid and epoch for the producer. If the PID and epoch has not been set, this method will block for at most maxWaitTimeMs. It is expected that this method be called from application thread @@ -47,7 +47,7 @@ def await_pid_and_epoch(self, max_wait_time_ms): max_wait_time_ms (numeric): The maximum time to block. Returns: - PidAndEpoch object. Callers must check the 'is_valid' property of the returned object to ensure that a + ProducerIdAndEpoch object. Callers must check the 'is_valid' property of the returned object to ensure that a valid pid and epoch is actually returned. """ with self._lock: @@ -56,16 +56,16 @@ def await_pid_and_epoch(self, max_wait_time_ms): while not self.has_pid() and elapsed < max_wait_time_ms: self._lock.wait(max_wait_time_ms / 1000) elapsed = time.time() - start - return self.pid_and_epoch + return self.producer_id_and_epoch - def set_pid_and_epoch(self, producer_id, epoch): + def set_producer_id_and_epoch(self, producer_id, epoch): """ Set the pid and epoch atomically. This method will signal any callers blocked on the `pidAndEpoch` method once the pid is set. This method will be called on the background thread when the broker responds with the pid. """ with self._lock: - self.pid_and_epoch = PidAndEpoch(producer_id, epoch) - if self.pid_and_epoch.is_valid: + self.producer_id_and_epoch = ProducerIdAndEpoch(producer_id, epoch) + if self.producer_id_and_epoch.is_valid: self._lock.notify_all() def reset_producer_id(self): @@ -82,7 +82,7 @@ def reset_producer_id(self): messages will return an OutOfOrderSequenceException. """ with self._lock: - self.pid_and_epoch = PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH) + self.producer_id_and_epoch = ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH) self._sequence_numbers.clear() def sequence_number(self, tp): From 83ab1b545af107eab35b9f20fc493a6c9653dee0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 16:11:01 -0700 Subject: [PATCH 09/18] fixup spacing in Sender; use kwargs for batch.done --- kafka/producer/sender.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index faec643fc..c370d87c4 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -309,7 +309,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star self._transaction_state.reset_producer_id() log.warning("Attempted to retry sending a batch but the producer id changed from %s to %s. This batch will be dropped" % ( batch.producer_id, self._transaction_state.producer_id_and_epoch.producer_id)) - batch.done(base_offset, timestamp_ms, error, log_start_offset) + batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, exception=error, log_start_offset=log_start_offset) if self._sensors: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) else: @@ -322,7 +322,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star error = error(batch.topic_partition.topic) # tell the user the result of their request - batch.done(base_offset, timestamp_ms, error, log_start_offset) + batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, exception=error, log_start_offset=log_start_offset) if self._sensors: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) @@ -335,13 +335,13 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star self._metadata.request_update() else: - batch.done(base_offset, timestamp_ms, error, log_start_offset) - self._accumulator.deallocate(batch) + batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, log_start_offset=log_start_offset) + self._accumulator.deallocate(batch) - if self._transaction_state and self._transaction_state.producer_id_and_epoch.producer_id == batch.producer_id: - self._transaction_state.increment_sequence_number(batch.topic_partition, batch.record_count) - log.debug("Incremented sequence number for topic-partition %s to %s", batch.topic_partition, - self._transaction_state.sequence_number(batch.topic_partition)) + if self._transaction_state and self._transaction_state.producer_id_and_epoch.producer_id == batch.producer_id: + self._transaction_state.increment_sequence_number(batch.topic_partition, batch.record_count) + log.debug("Incremented sequence number for topic-partition %s to %s", batch.topic_partition, + self._transaction_state.sequence_number(batch.topic_partition)) # Unmute the completed partition. if self.config['guarantee_message_order']: From 55ed258bea02e4a83dce877447daa16d5fee7664 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 16:38:12 -0700 Subject: [PATCH 10/18] KafkaProducer class method max_usable_produce_magic --- kafka/producer/kafka.py | 11 ++++++----- test/test_sender.py | 3 +++ 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index db370edd0..320a1657f 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -468,7 +468,7 @@ def __init__(self, **configs): raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent" " producer. Otherwise we cannot guarantee idempotence") - message_version = self._max_usable_produce_magic() + message_version = self.max_usable_produce_magic(self.config['api_version']) self._accumulator = RecordAccumulator( transaction_state=self._transaction_state, message_version=message_version, @@ -592,16 +592,17 @@ def partitions_for(self, topic): max_wait = self.config['max_block_ms'] / 1000 return self._wait_on_metadata(topic, max_wait) - def _max_usable_produce_magic(self): - if self.config['api_version'] >= (0, 11): + @classmethod + def max_usable_produce_magic(cls, api_version): + if api_version >= (0, 11): return 2 - elif self.config['api_version'] >= (0, 10, 0): + elif api_version >= (0, 10, 0): return 1 else: return 0 def _estimate_size_in_bytes(self, key, value, headers=[]): - magic = self._max_usable_produce_magic() + magic = self.max_usable_produce_magic(self.config['api_version']) if magic == 2: return DefaultRecordBatchBuilder.estimate_size_in_bytes( key, value, headers) diff --git a/test/test_sender.py b/test/test_sender.py index b037d2b48..eedc43d25 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -6,6 +6,7 @@ from kafka.client_async import KafkaClient from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS +from kafka.producer.kafka import KafkaProducer from kafka.protocol.produce import ProduceRequest from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch from kafka.producer.sender import Sender @@ -24,6 +25,7 @@ def sender(client, accumulator, metrics, mocker): @pytest.mark.parametrize(("api_version", "produce_version"), [ + ((2, 1), 7), ((0, 10, 0), 2), ((0, 9), 1), ((0, 8, 0), 0) @@ -31,6 +33,7 @@ def sender(client, accumulator, metrics, mocker): def test_produce_request(sender, mocker, api_version, produce_version): sender._client._api_versions = BROKER_API_VERSIONS[api_version] tp = TopicPartition('foo', 0) + magic = KafkaProducer.max_usable_produce_magic(api_version) records = MemoryRecordsBuilder( magic=1, compression_type=0, batch_size=100000) batch = ProducerBatch(tp, records) From 4eaa1a02866152fb95fcad0c7b75346065ed4fff Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 16:38:26 -0700 Subject: [PATCH 11/18] test ProducerBatch in test_record_accumulator --- test/test_record_accumulator.py | 61 +++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 test/test_record_accumulator.py diff --git a/test/test_record_accumulator.py b/test/test_record_accumulator.py new file mode 100644 index 000000000..4817fb56d --- /dev/null +++ b/test/test_record_accumulator.py @@ -0,0 +1,61 @@ +# pylint: skip-file +from __future__ import absolute_import + +import pytest +import io + +from kafka.producer.future import FutureRecordMetadata, RecordMetadata +from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch +from kafka.record.memory_records import MemoryRecordsBuilder +from kafka.structs import TopicPartition + + +def test_producer_batch_producer_id(): + tp = TopicPartition('foo', 0) + records = MemoryRecordsBuilder( + magic=2, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records, io.BytesIO()) + assert batch.producer_id == -1 + batch.records.set_producer_state(123, 456, 789) + assert batch.producer_id == 123 + records.close() + assert batch.producer_id == 123 + +@pytest.mark.parametrize("magic", [0, 1, 2]) +def test_producer_batch_try_append(magic): + tp = TopicPartition('foo', 0) + records = MemoryRecordsBuilder( + magic=magic, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records, io.BytesIO()) + assert batch.record_count == 0 + future = batch.try_append(0, b'key', b'value', []) + assert isinstance(future, FutureRecordMetadata) + assert not future.is_done + batch.done(base_offset=123, timestamp_ms=456, log_start_offset=0) + assert future.is_done + # record-level checksum only provided in v0/v1 formats; payload includes magic-byte + if magic == 0: + checksum = 592888119 + elif magic == 1: + checksum = 213653215 + else: + checksum = None + + expected_metadata = RecordMetadata( + topic=tp[0], partition=tp[1], topic_partition=tp, + offset=123, timestamp=456, log_start_offset=0, + checksum=checksum, + serialized_key_size=3, serialized_value_size=5, serialized_header_size=-1) + assert future.value == expected_metadata + +def test_producer_batch_retry(): + tp = TopicPartition('foo', 0) + records = MemoryRecordsBuilder( + magic=2, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records, io.BytesIO()) + assert not batch.in_retry() + batch.set_retry() + assert batch.in_retry() + +def test_producer_batch_maybe_expire(): + pass From 0d981eefec6c7440b90a9214a4cf763091d2de3e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 20:53:14 -0700 Subject: [PATCH 12/18] Add optional now kwarg to RecordAccumulator methods --- kafka/producer/record_accumulator.py | 33 ++++++++++++++++------------ 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index fb9643e2e..f0248490d 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -35,9 +35,9 @@ def get(self): class ProducerBatch(object): - def __init__(self, tp, records): + def __init__(self, tp, records, now=None): self.max_record_size = 0 - now = time.time() + now = time.time() if now is None else now self.created = now self.drained = None self.attempts = 0 @@ -56,13 +56,14 @@ def record_count(self): def producer_id(self): return self.records.producer_id if self.records else None - def try_append(self, timestamp_ms, key, value, headers): + def try_append(self, timestamp_ms, key, value, headers, now=None): metadata = self.records.append(timestamp_ms, key, value, headers) if metadata is None: return None + now = time.time() if now is None else now self.max_record_size = max(self.max_record_size, metadata.size) - self.last_append = time.time() + self.last_append = now future = FutureRecordMetadata(self.produce_future, metadata.offset, metadata.timestamp, metadata.crc, len(key) if key is not None else -1, @@ -85,7 +86,7 @@ def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_of log_start_offset, exception) # trace self.produce_future.failure(exception) - def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full): + def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full, now=None): """Expire batches if metadata is not available A batch whose metadata is not available should be expired if one @@ -97,7 +98,7 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full) * the batch is in retry AND request timeout has elapsed after the backoff period ended. """ - now = time.time() + now = time.time() if now is None else now since_append = now - self.last_append since_ready = now - (self.created + linger_ms / 1000.0) since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0) @@ -125,6 +126,10 @@ def in_retry(self): def set_retry(self): self._retry = True + @property + def is_done(self): + return self.produce_future.is_done + def __str__(self): return 'ProducerBatch(topic_partition=%s, record_count=%d)' % ( self.topic_partition, self.records.next_offset()) @@ -320,9 +325,9 @@ def abort_expired_batches(self, request_timeout_ms, cluster): return expired_batches - def reenqueue(self, batch): + def reenqueue(self, batch, now=None): """Re-enqueue the given record batch in the accumulator to retry.""" - now = time.time() + now = time.time() if now is None else now batch.attempts += 1 batch.last_attempt = now batch.last_append = now @@ -333,7 +338,7 @@ def reenqueue(self, batch): with self._tp_locks[batch.topic_partition]: dq.appendleft(batch) - def ready(self, cluster): + def ready(self, cluster, now=None): """ Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable partition will be ready; @@ -367,7 +372,7 @@ def ready(self, cluster): ready_nodes = set() next_ready_check = 9999999.99 unknown_leaders_exist = False - now = time.time() + now = time.time() if now is None else now # several threads are accessing self._batches -- to simplify # concurrent access, we iterate over a snapshot of partitions @@ -422,7 +427,7 @@ def has_unsent(self): return True return False - def drain(self, cluster, nodes, max_size): + def drain(self, cluster, nodes, max_size, now=None): """ Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified size on a per-node basis. @@ -440,7 +445,7 @@ def drain(self, cluster, nodes, max_size): if not nodes: return {} - now = time.time() + now = time.time() if now is None else now batches = {} for node_id in nodes: size = 0 @@ -489,8 +494,8 @@ def drain(self, cluster, nodes, max_size): # the pid and sequence here, this attempt will also be accepted, causing # a duplicate. sequence_number = self._transaction_state.sequence_number(batch.topic_partition) - log.debug("Dest: %s : producer_idd: %s epoch: %s Assigning sequence for %s: %s", - node_id, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, + log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s", + node_id, batch.topic_partition, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, batch.topic_partition, sequence_number) batch.records.set_producer_state(producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, sequence_number) batch.records.close() From 5bab3f253d5826841338c7ed21e207ddb959bd8e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 26 Mar 2025 20:53:42 -0700 Subject: [PATCH 13/18] test_producer_batch_maybe_expire --- test/test_record_accumulator.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/test/test_record_accumulator.py b/test/test_record_accumulator.py index 4817fb56d..3ae596369 100644 --- a/test/test_record_accumulator.py +++ b/test/test_record_accumulator.py @@ -4,6 +4,7 @@ import pytest import io +from kafka.errors import KafkaTimeoutError from kafka.producer.future import FutureRecordMetadata, RecordMetadata from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch from kafka.record.memory_records import MemoryRecordsBuilder @@ -58,4 +59,17 @@ def test_producer_batch_retry(): assert batch.in_retry() def test_producer_batch_maybe_expire(): - pass + tp = TopicPartition('foo', 0) + records = MemoryRecordsBuilder( + magic=2, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records, io.BytesIO(), now=1) + future = batch.try_append(0, b'key', b'value', [], now=2) + request_timeout_ms = 5000 + retry_backoff_ms = 200 + linger_ms = 1000 + is_full = True + batch.maybe_expire(request_timeout_ms, retry_backoff_ms, linger_ms, is_full, now=20) + assert batch.is_done + assert future.is_done + assert future.failed() + assert isinstance(future.exception, KafkaTimeoutError) From 8ae67a7f81259a67e60eb6245042475748f1b6e1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 27 Mar 2025 00:31:36 -0400 Subject: [PATCH 14/18] Remove maybe_wait_for_producer_id test debug statements; PID => producer_id --- kafka/producer/sender.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index c370d87c4..b66c6fda0 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -136,7 +136,7 @@ def run_once(self): expired_batches = self._accumulator.abort_expired_batches( self.config['request_timeout_ms'], self._metadata) - # Reset the PID if an expired batch has previously been sent to the broker. + # Reset the producer_id if an expired batch has previously been sent to the broker. # See the documentation of `TransactionState.reset_producer_id` to understand why # we need to reset the producer id here. if self._transaction_state and any([batch.in_retry() for batch in expired_batches]): @@ -201,9 +201,7 @@ def add_topic(self, topic): self.wakeup() def _maybe_wait_for_producer_id(self): - log.debug("_maybe_wait_for_producer_id") if not self._transaction_state: - log.debug("_maybe_wait_for_producer_id: no transaction_state...") return while not self._transaction_state.has_pid(): @@ -236,7 +234,6 @@ def _maybe_wait_for_producer_id(self): except Errors.RequestTimedOutError: log.debug("InitProducerId request to node %s timed out", node_id) time.sleep(self.config['retry_backoff_ms'] / 1000) - log.debug("_maybe_wait_for_producer_id: ok: %s", self._transaction_state.producer_id_and_epoch) def _failed_produce(self, batches, node_id, error): log.error("Error sending produce request to node %d: %s", node_id, error) # trace From f53966c9ebe43dca0bef3f28cae7e491c62dc5c9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 27 Mar 2025 00:33:41 -0400 Subject: [PATCH 15/18] Fix producer tests max_usable_produce_magic --- test/test_producer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_producer.py b/test/test_producer.py index 069362f26..303832b9f 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -100,7 +100,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): retries=5, max_block_ms=30000, compression_type=compression) as producer: - magic = producer._max_usable_produce_magic() + magic = producer.max_usable_produce_magic(producer.config['api_version']) # record headers are supported in 0.11.0 if env_kafka_version() < (0, 11, 0): From a6e59e82bc4fd0bdabe9a8e1329f01eb8a0e958d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 14:48:08 -0700 Subject: [PATCH 16/18] fix merge conflict in test_record_accumulator --- test/test_record_accumulator.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/test_record_accumulator.py b/test/test_record_accumulator.py index 3ae596369..babff5617 100644 --- a/test/test_record_accumulator.py +++ b/test/test_record_accumulator.py @@ -15,7 +15,7 @@ def test_producer_batch_producer_id(): tp = TopicPartition('foo', 0) records = MemoryRecordsBuilder( magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records, io.BytesIO()) + batch = ProducerBatch(tp, records) assert batch.producer_id == -1 batch.records.set_producer_state(123, 456, 789) assert batch.producer_id == 123 @@ -27,7 +27,7 @@ def test_producer_batch_try_append(magic): tp = TopicPartition('foo', 0) records = MemoryRecordsBuilder( magic=magic, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records, io.BytesIO()) + batch = ProducerBatch(tp, records) assert batch.record_count == 0 future = batch.try_append(0, b'key', b'value', []) assert isinstance(future, FutureRecordMetadata) @@ -53,7 +53,7 @@ def test_producer_batch_retry(): tp = TopicPartition('foo', 0) records = MemoryRecordsBuilder( magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records, io.BytesIO()) + batch = ProducerBatch(tp, records) assert not batch.in_retry() batch.set_retry() assert batch.in_retry() @@ -62,7 +62,7 @@ def test_producer_batch_maybe_expire(): tp = TopicPartition('foo', 0) records = MemoryRecordsBuilder( magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records, io.BytesIO(), now=1) + batch = ProducerBatch(tp, records, now=1) future = batch.try_append(0, b'key', b'value', [], now=2) request_timeout_ms = 5000 retry_backoff_ms = 200 From 621d33f13e0666e1f5f6ddb95dafe9f74083aab5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Apr 2025 15:19:24 -0700 Subject: [PATCH 17/18] sender _fail_batch() --- kafka/producer/sender.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index b66c6fda0..24b84a9b1 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -271,6 +271,17 @@ def _handle_produce_response(self, node_id, send_time, batches, response): for batch in batches: self._complete_batch(batch, None, -1) + def _fail_batch(batch, *args, **kwargs): + if self._transaction_state and self._transaction_state.producer_id_and_epoch.producer_id == batch.producer_id: + # Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees + # about the previously committed message. Note that this will discard the producer id and sequence + # numbers for all existing partitions. + self._transaction_state.reset_producer_id() + batch.done(*args, **kwargs) + self._accumulator.deallocate(batch) + if self._sensors: + self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) + def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None): """Complete or retry the given batch of records. @@ -303,12 +314,9 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star if self._sensors: self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) else: - self._transaction_state.reset_producer_id() log.warning("Attempted to retry sending a batch but the producer id changed from %s to %s. This batch will be dropped" % ( batch.producer_id, self._transaction_state.producer_id_and_epoch.producer_id)) - batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, exception=error, log_start_offset=log_start_offset) - if self._sensors: - self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) + self._fail_batch(batch, base_offset=base_offset, timestamp_ms=timestamp_ms, exception=error, log_start_offset=log_start_offset) else: if error is Errors.OutOfOrderSequenceNumberError and batch.producer_id == self._transaction_state.producer_id_and_epoch.producer_id: log.error("The broker received an out of order sequence number error for produer_id %s, topic-partition %s" @@ -319,9 +327,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star error = error(batch.topic_partition.topic) # tell the user the result of their request - batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, exception=error, log_start_offset=log_start_offset) - if self._sensors: - self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) + self._fail_batch(batch, base_offset=base_offset, timestamp_ms=timestamp_ms, exception=error, log_start_offset=log_start_offset) if error is Errors.UnknownTopicOrPartitionError: log.warning("Received unknown topic or partition error in produce request on partition %s." From 53c195945b432cd6cef92fe68518993699ceac6d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 6 Apr 2025 08:27:33 -0700 Subject: [PATCH 18/18] logging fixup --- kafka/producer/record_accumulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index f0248490d..60fa0a323 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -496,7 +496,7 @@ def drain(self, cluster, nodes, max_size, now=None): sequence_number = self._transaction_state.sequence_number(batch.topic_partition) log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s", node_id, batch.topic_partition, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, - batch.topic_partition, sequence_number) + sequence_number) batch.records.set_producer_state(producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, sequence_number) batch.records.close() size += batch.records.size_in_bytes()