diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py deleted file mode 100644 index 100801700..000000000 --- a/kafka/producer/buffer.py +++ /dev/null @@ -1,115 +0,0 @@ -from __future__ import absolute_import, division - -import collections -import io -import threading -import time - -from kafka.metrics.stats import Rate - -import kafka.errors as Errors - - -class SimpleBufferPool(object): - """A simple pool of BytesIO objects with a weak memory ceiling.""" - def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'): - """Create a new buffer pool. - - Arguments: - memory (int): maximum memory that this buffer pool can allocate - poolable_size (int): memory size per buffer to cache in the free - list rather than deallocating - """ - self._poolable_size = poolable_size - self._lock = threading.RLock() - - buffers = int(memory / poolable_size) if poolable_size else 0 - self._free = collections.deque([io.BytesIO() for _ in range(buffers)]) - - self._waiters = collections.deque() - self.wait_time = None - if metrics: - self.wait_time = metrics.sensor('bufferpool-wait-time') - self.wait_time.add(metrics.metric_name( - 'bufferpool-wait-ratio', metric_group_prefix, - 'The fraction of time an appender waits for space allocation.'), - Rate()) - - def allocate(self, size, max_time_to_block_ms): - """ - Allocate a buffer of the given size. This method blocks if there is not - enough memory and the buffer pool is configured with blocking mode. - - Arguments: - size (int): The buffer size to allocate in bytes [ignored] - max_time_to_block_ms (int): The maximum time in milliseconds to - block for buffer memory to be available - - Returns: - io.BytesIO - """ - with self._lock: - # check if we have a free buffer of the right size pooled - if self._free: - return self._free.popleft() - - elif self._poolable_size == 0: - return io.BytesIO() - - else: - # we are out of buffers and will have to block - buf = None - more_memory = threading.Condition(self._lock) - self._waiters.append(more_memory) - # loop over and over until we have a buffer or have reserved - # enough memory to allocate one - while buf is None: - start_wait = time.time() - more_memory.wait(max_time_to_block_ms / 1000.0) - end_wait = time.time() - if self.wait_time: - self.wait_time.record(end_wait - start_wait) - - if self._free: - buf = self._free.popleft() - else: - self._waiters.remove(more_memory) - raise Errors.KafkaTimeoutError( - "Failed to allocate memory within the configured" - " max blocking time") - - # remove the condition for this thread to let the next thread - # in line start getting memory - removed = self._waiters.popleft() - assert removed is more_memory, 'Wrong condition' - - # signal any additional waiters if there is more memory left - # over for them - if self._free and self._waiters: - self._waiters[0].notify() - - # unlock and return the buffer - return buf - - def deallocate(self, buf): - """ - Return buffers to the pool. If they are of the poolable size add them - to the free list, otherwise just mark the memory as free. - - Arguments: - buffer_ (io.BytesIO): The buffer to return - """ - with self._lock: - # BytesIO.truncate here makes the pool somewhat pointless - # but we stick with the BufferPool API until migrating to - # bytesarray / memoryview. The buffer we return must not - # expose any prior data on read(). - buf.truncate(0) - self._free.append(buf) - if self._waiters: - self._waiters[0].notify() - - def queued(self): - """The number of threads blocked waiting on memory.""" - with self._lock: - return len(self._waiters) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 8da14af1c..df86e907e 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -6,6 +6,7 @@ import socket import threading import time +import warnings import weakref from kafka.vendor import six @@ -72,11 +73,6 @@ class KafkaProducer(object): can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency. - The buffer_memory controls the total amount of memory available to the - producer for buffering. If records are sent faster than they can be - transmitted to the server then this buffer space will be exhausted. When - the buffer space is exhausted additional send calls will block. - The key_serializer and value_serializer instruct how to turn the key and value objects the user provides into bytes. @@ -166,12 +162,6 @@ class KafkaProducer(object): messages with the same key are assigned to the same partition. When a key is None, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible). - buffer_memory (int): The total bytes of memory the producer should use - to buffer records waiting to be sent to the server. If records are - sent faster than they can be delivered to the server the producer - will block up to max_block_ms, raising an exception on timeout. - In the current implementation, this setting is an approximation. - Default: 33554432 (32MB) connections_max_idle_ms: Close idle connections after the number of milliseconds specified by this config. The broker closes idle connections after connections.max.idle.ms, so this avoids hitting @@ -319,7 +309,6 @@ class KafkaProducer(object): 'batch_size': 16384, 'linger_ms': 0, 'partitioner': DefaultPartitioner(), - 'buffer_memory': 33554432, 'connections_max_idle_ms': 9 * 60 * 1000, 'max_block_ms': 60000, 'max_request_size': 1048576, @@ -361,6 +350,8 @@ class KafkaProducer(object): 'kafka_client': KafkaClient, } + DEPRECATED_CONFIGS = ('buffer_memory',) + _COMPRESSORS = { 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP), 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY), @@ -376,6 +367,11 @@ def __init__(self, **configs): if key in configs: self.config[key] = configs.pop(key) + for key in self.DEPRECATED_CONFIGS: + if key in configs: + configs.pop(key) + warnings.warn('Deprecated Producer config: %s' % (key,), DeprecationWarning) + # Only check for extra config keys in top-level class assert not configs, 'Unrecognized configs: %s' % (configs,) @@ -640,9 +636,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest tp = TopicPartition(topic, partition) log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp) result = self._accumulator.append(tp, timestamp_ms, - key_bytes, value_bytes, headers, - self.config['max_block_ms'], - estimated_size=message_size) + key_bytes, value_bytes, headers) future, batch_is_full, new_batch_created = result if batch_is_full or new_batch_created: log.debug("Waking up the sender since %s is either full or" @@ -697,11 +691,6 @@ def _ensure_valid_record_size(self, size): "The message is %d bytes when serialized which is larger than" " the maximum request size you have configured with the" " max_request_size configuration" % (size,)) - if size > self.config['buffer_memory']: - raise Errors.MessageSizeTooLargeError( - "The message is %d bytes when serialized which is larger than" - " the total memory buffer you have configured with the" - " buffer_memory configuration." % (size,)) def _wait_on_metadata(self, topic, max_wait): """ diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 4f08b8c08..6e7fa60f7 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -7,7 +7,6 @@ import time import kafka.errors as Errors -from kafka.producer.buffer import SimpleBufferPool from kafka.producer.future import FutureRecordMetadata, FutureProduceResult from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition @@ -36,7 +35,7 @@ def get(self): class ProducerBatch(object): - def __init__(self, tp, records, buffer): + def __init__(self, tp, records): self.max_record_size = 0 now = time.time() self.created = now @@ -48,7 +47,6 @@ def __init__(self, tp, records, buffer): self.topic_partition = tp self.produce_future = FutureProduceResult(tp) self._retry = False - self._buffer = buffer # We only save it, we don't write to it @property def record_count(self): @@ -123,9 +121,6 @@ def in_retry(self): def set_retry(self): self._retry = True - def buffer(self): - return self._buffer - def __str__(self): return 'ProducerBatch(topic_partition=%s, record_count=%d)' % ( self.topic_partition, self.records.next_offset()) @@ -145,12 +140,6 @@ class RecordAccumulator(object): A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). Default: 16384 - buffer_memory (int): The total bytes of memory the producer should use - to buffer records waiting to be sent to the server. If records are - sent faster than they can be delivered to the server the producer - will block up to max_block_ms, raising an exception on timeout. - In the current implementation, this setting is an approximation. - Default: 33554432 (32MB) compression_attrs (int): The compression type for all data generated by the producer. Valid values are gzip(1), snappy(2), lz4(3), or none(0). @@ -168,7 +157,6 @@ class RecordAccumulator(object): all retries in a short period of time. Default: 100 """ DEFAULT_CONFIG = { - 'buffer_memory': 33554432, 'batch_size': 16384, 'compression_attrs': 0, 'linger_ms': 0, @@ -189,18 +177,13 @@ def __init__(self, **configs): self._appends_in_progress = AtomicInteger() self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch] self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries - self._free = SimpleBufferPool(self.config['buffer_memory'], - self.config['batch_size'], - metrics=self.config['metrics'], - metric_group_prefix=self.config['metric_group_prefix']) self._incomplete = IncompleteProducerBatches() # The following variables should only be accessed by the sender thread, # so we don't need to protect them w/ locking. self.muted = set() self._drain_index = 0 - def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms, - estimated_size=0): + def append(self, tp, timestamp_ms, key, value, headers): """Add a record to the accumulator, return the append result. The append result will contain the future metadata, and flag for @@ -213,8 +196,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms, key (bytes): The key for the record value (bytes): The value for the record headers (List[Tuple[str, bytes]]): The header fields for the record - max_time_to_block_ms (int): The maximum time in milliseconds to - block for buffer memory to be available Returns: tuple: (future, batch_is_full, new_batch_created) @@ -240,9 +221,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms, batch_is_full = len(dq) > 1 or last.records.is_full() return future, batch_is_full, False - size = max(self.config['batch_size'], estimated_size) - log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace - buf = self._free.allocate(size, max_time_to_block_ms) with self._tp_locks[tp]: # Need to check if producer is closed again after grabbing the # dequeue lock. @@ -254,7 +232,6 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms, if future is not None: # Somebody else found us a batch, return the one we # waited for! Hopefully this doesn't happen often... - self._free.deallocate(buf) batch_is_full = len(dq) > 1 or last.records.is_full() return future, batch_is_full, False @@ -264,7 +241,7 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms, self.config['batch_size'] ) - batch = ProducerBatch(tp, records, buf) + batch = ProducerBatch(tp, records) future = batch.try_append(timestamp_ms, key, value, headers) if not future: raise Exception() @@ -384,7 +361,6 @@ def ready(self, cluster): unknown_leaders_exist = False now = time.time() - exhausted = bool(self._free.queued() > 0) # several threads are accessing self._batches -- to simplify # concurrent access, we iterate over a snapshot of partitions # and lock each partition separately as needed @@ -414,7 +390,7 @@ def ready(self, cluster): full = bool(len(dq) > 1 or batch.records.is_full()) expired = bool(waited_time >= time_to_wait) - sendable = (full or expired or exhausted or self._closed or + sendable = (full or expired or self._closed or self._flush_in_progress()) if sendable and not backing_off: @@ -506,7 +482,6 @@ def drain(self, cluster, nodes, max_size): def deallocate(self, batch): """Deallocate the record batch.""" self._incomplete.remove(batch) - self._free.deallocate(batch.buffer()) def _flush_in_progress(self): """Are there any threads currently waiting on a flush?""" diff --git a/test/test_producer.py b/test/test_producer.py index 3d1de06d3..069362f26 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,22 +7,9 @@ import pytest from kafka import KafkaConsumer, KafkaProducer, TopicPartition -from kafka.producer.buffer import SimpleBufferPool from test.testutil import env_kafka_version, random_string, maybe_skip_unsupported_compression -def test_buffer_pool(): - pool = SimpleBufferPool(1000, 1000) - - buf1 = pool.allocate(1000, 1000) - message = ''.join(map(str, range(100))) - buf1.write(message.encode('utf-8')) - pool.deallocate(buf1) - - buf2 = pool.allocate(1000, 1000) - assert buf2.read() == b'' - - @contextmanager def producer_factory(**kwargs): producer = KafkaProducer(**kwargs) diff --git a/test/test_sender.py b/test/test_sender.py index 3da1a9f42..1656bbfe9 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -42,10 +42,9 @@ def sender(client, accumulator, metrics, mocker): def test_produce_request(sender, mocker, api_version, produce_version): sender._client._api_versions = BROKER_API_VERSIONS[api_version] tp = TopicPartition('foo', 0) - buffer = io.BytesIO() records = MemoryRecordsBuilder( magic=1, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records, buffer) + batch = ProducerBatch(tp, records) records.close() produce_request = sender._produce_request(0, 0, 0, [batch]) assert isinstance(produce_request, ProduceRequest[produce_version])