
Drop unused kafka.producer.buffer / SimpleBufferPool #2580

Merged 3 commits on Apr 4, 2025
115 changes: 0 additions & 115 deletions kafka/producer/buffer.py

This file was deleted.
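For context: the deleted SimpleBufferPool handed out reusable io.BytesIO buffers under a fixed memory ceiling, and callers blocked in allocate() when the pool ran dry. A rough sketch of the interface being removed, inferred from the deleted test in test/test_producer.py further down (the real class also tracked waiters and producer metrics; the blocking logic is omitted here):

    import collections
    import io
    import threading

    class SimpleBufferPool(object):
        # Sketch of the removed class: a pool of reusable BytesIO buffers.
        def __init__(self, memory, poolable_size):
            self._poolable_size = poolable_size
            self._lock = threading.RLock()
            # Pre-size the free list to memory / poolable_size buffers.
            buffers = int(memory / poolable_size) if poolable_size else 0
            self._free = collections.deque([io.BytesIO() for _ in range(buffers)])

        def allocate(self, size, max_time_to_block_ms):
            # The real pool blocked up to max_time_to_block_ms when exhausted;
            # this sketch simply pops a free buffer or creates a new one.
            with self._lock:
                if self._free:
                    return self._free.popleft()
                return io.BytesIO()

        def deallocate(self, buf):
            # Reset the buffer so the next allocate() sees it empty.
            with self._lock:
                buf.seek(0)
                buf.truncate()
                self._free.append(buf)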

29 changes: 9 additions & 20 deletions kafka/producer/kafka.py
@@ -6,6 +6,7 @@
 import socket
 import threading
 import time
+import warnings
 import weakref
 
 from kafka.vendor import six
@@ -72,11 +73,6 @@ class KafkaProducer(object):
     can lead to fewer, more efficient requests when not under maximal load at
     the cost of a small amount of latency.
 
-    The buffer_memory controls the total amount of memory available to the
-    producer for buffering. If records are sent faster than they can be
-    transmitted to the server then this buffer space will be exhausted. When
-    the buffer space is exhausted additional send calls will block.
-
     The key_serializer and value_serializer instruct how to turn the key and
     value objects the user provides into bytes.
 
@@ -166,12 +162,6 @@ class KafkaProducer(object):
             messages with the same key are assigned to the same partition.
             When a key is None, the message is delivered to a random partition
             (filtered to partitions with available leaders only, if possible).
-        buffer_memory (int): The total bytes of memory the producer should use
-            to buffer records waiting to be sent to the server. If records are
-            sent faster than they can be delivered to the server the producer
-            will block up to max_block_ms, raising an exception on timeout.
-            In the current implementation, this setting is an approximation.
-            Default: 33554432 (32MB)
         connections_max_idle_ms: Close idle connections after the number of
             milliseconds specified by this config. The broker closes idle
             connections after connections.max.idle.ms, so this avoids hitting
@@ -319,7 +309,6 @@ class KafkaProducer(object):
         'batch_size': 16384,
         'linger_ms': 0,
         'partitioner': DefaultPartitioner(),
-        'buffer_memory': 33554432,
         'connections_max_idle_ms': 9 * 60 * 1000,
         'max_block_ms': 60000,
         'max_request_size': 1048576,
@@ -361,6 +350,8 @@ class KafkaProducer(object):
         'kafka_client': KafkaClient,
     }
 
+    DEPRECATED_CONFIGS = ('buffer_memory',)
+
     _COMPRESSORS = {
         'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
         'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
@@ -376,6 +367,11 @@ def __init__(self, **configs):
             if key in configs:
                 self.config[key] = configs.pop(key)
 
+        for key in self.DEPRECATED_CONFIGS:
+            if key in configs:
+                configs.pop(key)
+                warnings.warn('Deprecated Producer config: %s' % (key,), DeprecationWarning)
+
         # Only check for extra config keys in top-level class
         assert not configs, 'Unrecognized configs: %s' % (configs,)
 
@@ -640,9 +636,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
         tp = TopicPartition(topic, partition)
         log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
         result = self._accumulator.append(tp, timestamp_ms,
-                                          key_bytes, value_bytes, headers,
-                                          self.config['max_block_ms'],
-                                          estimated_size=message_size)
+                                          key_bytes, value_bytes, headers)
         future, batch_is_full, new_batch_created = result
         if batch_is_full or new_batch_created:
             log.debug("Waking up the sender since %s is either full or"
@@ -697,11 +691,6 @@ def _ensure_valid_record_size(self, size):
                 "The message is %d bytes when serialized which is larger than"
                 " the maximum request size you have configured with the"
                 " max_request_size configuration" % (size,))
-        if size > self.config['buffer_memory']:
-            raise Errors.MessageSizeTooLargeError(
-                "The message is %d bytes when serialized which is larger than"
-                " the total memory buffer you have configured with the"
-                " buffer_memory configuration." % (size,))
 
     def _wait_on_metadata(self, topic, max_wait):
         """
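Net effect of the changes above: buffer_memory is no longer a functional setting. It is popped from configs with a DeprecationWarning rather than sizing a pool, and send() no longer passes a blocking timeout or size estimate to the accumulator. A minimal caller-side sketch of the new behavior (the broker address and the api_version pin, used to skip version probing, are illustrative assumptions, not part of this diff):

    import warnings

    from kafka import KafkaProducer

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')  # DeprecationWarning is hidden by default
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 api_version=(2, 0, 0),
                                 buffer_memory=33554432)  # accepted, but now a no-op
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
    producer.close()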
33 changes: 4 additions & 29 deletions kafka/producer/record_accumulator.py
@@ -7,7 +7,6 @@
 import time
 
 import kafka.errors as Errors
-from kafka.producer.buffer import SimpleBufferPool
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
 from kafka.record.memory_records import MemoryRecordsBuilder
 from kafka.structs import TopicPartition
@@ -36,7 +35,7 @@ def get(self):
 
 
 class ProducerBatch(object):
-    def __init__(self, tp, records, buffer):
+    def __init__(self, tp, records):
         self.max_record_size = 0
         now = time.time()
         self.created = now
@@ -48,7 +47,6 @@ def __init__(self, tp, records):
         self.topic_partition = tp
         self.produce_future = FutureProduceResult(tp)
         self._retry = False
-        self._buffer = buffer # We only save it, we don't write to it
 
     @property
     def record_count(self):
@@ -123,9 +121,6 @@ def in_retry(self):
     def set_retry(self):
         self._retry = True
 
-    def buffer(self):
-        return self._buffer
-
     def __str__(self):
         return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
             self.topic_partition, self.records.next_offset())
@@ -145,12 +140,6 @@ class RecordAccumulator(object):
             A small batch size will make batching less common and may reduce
             throughput (a batch size of zero will disable batching entirely).
             Default: 16384
-        buffer_memory (int): The total bytes of memory the producer should use
-            to buffer records waiting to be sent to the server. If records are
-            sent faster than they can be delivered to the server the producer
-            will block up to max_block_ms, raising an exception on timeout.
-            In the current implementation, this setting is an approximation.
-            Default: 33554432 (32MB)
         compression_attrs (int): The compression type for all data generated by
             the producer. Valid values are gzip(1), snappy(2), lz4(3), or
             none(0).
@@ -168,7 +157,6 @@ class RecordAccumulator(object):
             all retries in a short period of time. Default: 100
     """
     DEFAULT_CONFIG = {
-        'buffer_memory': 33554432,
         'batch_size': 16384,
         'compression_attrs': 0,
         'linger_ms': 0,
@@ -189,18 +177,13 @@ def __init__(self, **configs):
         self._appends_in_progress = AtomicInteger()
         self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
         self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
-        self._free = SimpleBufferPool(self.config['buffer_memory'],
-                                      self.config['batch_size'],
-                                      metrics=self.config['metrics'],
-                                      metric_group_prefix=self.config['metric_group_prefix'])
         self._incomplete = IncompleteProducerBatches()
         # The following variables should only be accessed by the sender thread,
         # so we don't need to protect them w/ locking.
         self.muted = set()
         self._drain_index = 0
 
-    def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
-               estimated_size=0):
+    def append(self, tp, timestamp_ms, key, value, headers):
         """Add a record to the accumulator, return the append result.
 
         The append result will contain the future metadata, and flag for
@@ -213,8 +196,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
             key (bytes): The key for the record
             value (bytes): The value for the record
             headers (List[Tuple[str, bytes]]): The header fields for the record
-            max_time_to_block_ms (int): The maximum time in milliseconds to
-                block for buffer memory to be available
 
         Returns:
             tuple: (future, batch_is_full, new_batch_created)
@@ -240,9 +221,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                 batch_is_full = len(dq) > 1 or last.records.is_full()
                 return future, batch_is_full, False
 
-        size = max(self.config['batch_size'], estimated_size)
-        log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
-        buf = self._free.allocate(size, max_time_to_block_ms)
         with self._tp_locks[tp]:
             # Need to check if producer is closed again after grabbing the
             # dequeue lock.
@@ -254,7 +232,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
             if future is not None:
                 # Somebody else found us a batch, return the one we
                 # waited for! Hopefully this doesn't happen often...
-                self._free.deallocate(buf)
                 batch_is_full = len(dq) > 1 or last.records.is_full()
                 return future, batch_is_full, False
 
@@ -264,7 +241,7 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                 self.config['batch_size']
             )
 
-            batch = ProducerBatch(tp, records, buf)
+            batch = ProducerBatch(tp, records)
             future = batch.try_append(timestamp_ms, key, value, headers)
             if not future:
                 raise Exception()
@@ -384,7 +361,6 @@ def ready(self, cluster):
         unknown_leaders_exist = False
         now = time.time()
 
-        exhausted = bool(self._free.queued() > 0)
         # several threads are accessing self._batches -- to simplify
         # concurrent access, we iterate over a snapshot of partitions
         # and lock each partition separately as needed
@@ -414,7 +390,7 @@ def ready(self, cluster):
                 full = bool(len(dq) > 1 or batch.records.is_full())
                 expired = bool(waited_time >= time_to_wait)
 
-                sendable = (full or expired or exhausted or self._closed or
+                sendable = (full or expired or self._closed or
                             self._flush_in_progress())
 
                 if sendable and not backing_off:
@@ -506,7 +482,6 @@ def drain(self, cluster, nodes, max_size):
     def deallocate(self, batch):
         """Deallocate the record batch."""
         self._incomplete.remove(batch)
-        self._free.deallocate(batch.buffer())
 
     def _flush_in_progress(self):
         """Are there any threads currently waiting on a flush?"""
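With the pool gone, a batch's bytes live entirely inside its MemoryRecordsBuilder, so ProducerBatch needs only the topic-partition and the builder. A minimal sketch of the new construction path, mirroring the updated test_sender.py below (key, value, and timestamp values are illustrative):

    import time

    from kafka.producer.record_accumulator import ProducerBatch
    from kafka.record.memory_records import MemoryRecordsBuilder
    from kafka.structs import TopicPartition

    tp = TopicPartition('foo', 0)
    records = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=16384)
    batch = ProducerBatch(tp, records)  # no pooled-buffer argument anymore
    future = batch.try_append(int(time.time() * 1000), b'key', b'value', [])
    assert future is not None  # fresh builder, so the append fits
    records.close()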
13 changes: 0 additions & 13 deletions test/test_producer.py
@@ -7,22 +7,9 @@
 import pytest
 
 from kafka import KafkaConsumer, KafkaProducer, TopicPartition
-from kafka.producer.buffer import SimpleBufferPool
 from test.testutil import env_kafka_version, random_string, maybe_skip_unsupported_compression
 
 
-def test_buffer_pool():
-    pool = SimpleBufferPool(1000, 1000)
-
-    buf1 = pool.allocate(1000, 1000)
-    message = ''.join(map(str, range(100)))
-    buf1.write(message.encode('utf-8'))
-    pool.deallocate(buf1)
-
-    buf2 = pool.allocate(1000, 1000)
-    assert buf2.read() == b''
-
-
 @contextmanager
 def producer_factory(**kwargs):
     producer = KafkaProducer(**kwargs)
3 changes: 1 addition & 2 deletions test/test_sender.py
@@ -42,10 +42,9 @@ def sender(client, accumulator, metrics, mocker):
 def test_produce_request(sender, mocker, api_version, produce_version):
     sender._client._api_versions = BROKER_API_VERSIONS[api_version]
     tp = TopicPartition('foo', 0)
-    buffer = io.BytesIO()
     records = MemoryRecordsBuilder(
         magic=1, compression_type=0, batch_size=100000)
-    batch = ProducerBatch(tp, records, buffer)
+    batch = ProducerBatch(tp, records)
     records.close()
     produce_request = sender._produce_request(0, 0, 0, [batch])
     assert isinstance(produce_request, ProduceRequest[produce_version])