Description
I tried upgrading one of our projects to 1.4.5, hoping to use the fix for consuming compacted topics in #1701. However, it consistently fails with KafkaTimeoutError almost immediately.
This only seems to happen when consuming the compacted section of a compacted topic (e.g. starting from the beginning of the topic). Consuming from the non-compacted end of a compacted topic seems to be fine. Consuming from the start or end of a non-compacted topic also seems to be fine, though much more slowly: throughput dropped from ~6k/s on 1.4.4 to ~2k/s on 1.4.5 in one test.
Exactly when it breaks is inconsistent: sometimes it fails before producing anything, sometimes it produces a small amount and then fails, sometimes it fails on the first commit, and sometimes it completes the first commit and fails afterwards.
I tried upgrading to the current master, hoping the extra lock that was added since 1.4.5 would fix the issue, but it doesn't seem to have made any difference.
I also tried setting api_version to 1.1.0 manually, since it was incorrectly auto-detecting 1.0.0, but that hasn't helped either.
The code I'm running is a little more complicated than a standard consumer/producer, but it worked fine on 1.4.4.
The consumer uses a consumer group but has auto-commit disabled. The producer uses snappy compression and acks='all'. Messages are consumed via the KafkaConsumer iterator interface. Consumed messages are produced with KafkaProducer.send() without waiting on the futures; the futures are kept in a buffer instead. When the buffer is full, the thread waits on one or more of the produce futures to clear space in it. Periodically, the thread calls KafkaConsumer.commit(offsets=x), where x holds the offsets derived from the produces that have been confirmed as completed (their futures cleared from the buffer).
Here are the logs from one example run, in which it produced a small number of messages but failed before committing. Although it produced some messages, the logs show that it never finished flushing any produce futures from the buffer and never committed any offsets.
[2019-03-22 03:46:46,861] kafka.consumer.subscription_state.INFO MainThread Updating subscribed topics to: ['topic.0']
[2019-03-22 03:46:46,875] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=bootstrap host=0.0.0.0:9092 <connecting> [IPv4 ('0.0.0.0', 9092)]>: connecting to 0.0.0.0:9092 [('0.0.0.0', 9092) IPv4]
[2019-03-22 03:46:46,876] kafka.conn.INFO MainThread <BrokerConnection node_id=bootstrap host=0.0.0.0:9092 <connecting> [IPv4 ('0.0.0.0', 9092)]>: connecting to 0.0.0.0:9092 [('0.0.0.0', 9092) IPv4]
[2019-03-22 03:46:46,876] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=bootstrap host=0.0.0.0:9092 <connecting> [IPv4 ('0.0.0.0', 9092)]>: Connection complete.
[2019-03-22 03:46:46,977] kafka.conn.INFO MainThread <BrokerConnection node_id=bootstrap host=0.0.0.0:9092 <connecting> [IPv4 ('0.0.0.0', 9092)]>: Connection complete.
[2019-03-22 03:46:47,079] kafka.cluster.INFO MainThread Group coordinator for group-0 is BrokerMetadata(nodeId=0, host='kafka-0', port=9092, rack=None)
[2019-03-22 03:46:47,079] kafka.coordinator.INFO MainThread Discovered coordinator 0 for group group-0
[2019-03-22 03:46:47,079] kafka.coordinator.INFO MainThread Starting new heartbeat thread
[2019-03-22 03:46:47,080] kafka.coordinator.consumer.INFO MainThread Revoking previously assigned partitions set() for group group-0
[2019-03-22 03:46:47,080] root.INFO MainThread Partitions revoked - committing outstanding offsets
[2019-03-22 03:46:47,081] kafka.conn.INFO MainThread <BrokerConnection node_id=0 host=kafka-0:9092 <connecting> [IPv4 ('0.0.0.0', 9092)]>: connecting to kafka-0:9092 [('0.0.0.0', 9092) IPv4]
[2019-03-22 03:46:47,182] kafka.conn.INFO MainThread <BrokerConnection node_id=0 host=kafka-0:9092 <connecting> [IPv4 ('0.0.0.0', 9092)]>: Connection complete.
[2019-03-22 03:46:47,182] kafka.conn.INFO MainThread <BrokerConnection node_id=bootstrap host=0.0.0.0:9092 <connected> [IPv4 ('0.0.0.0', 9092)]>: Closing connection.
[2019-03-22 03:46:47,284] kafka.coordinator.INFO MainThread (Re-)joining group group-0
[2019-03-22 03:46:47,287] kafka.coordinator.INFO MainThread Elected group leader -- performing partition assignments using range
[2019-03-22 03:46:47,292] kafka.coordinator.INFO MainThread Successfully joined group group-0 with generation 58
[2019-03-22 03:46:47,293] kafka.consumer.subscription_state.INFO MainThread Updated partition assignment: [TopicPartition(topic='topic.0', partition=0), TopicPartition(topic='topic.0', partition=1), TopicPartition(topic='topic.0', partition=2), TopicPartition(topic='topic.0', partition=3), TopicPartition(topic='topic.0', partition=4), TopicPartition(topic='topic.0', partition=5), TopicPartition(topic='topic.0', partition=6), TopicPartition(topic='topic.0', partition=7), TopicPartition(topic='topic.0', partition=8), TopicPartition(topic='topic.0', partition=9), TopicPartition(topic='topic.0', partition=10), TopicPartition(topic='topic.0', partition=11)]
[2019-03-22 03:46:47,293] kafka.coordinator.consumer.INFO MainThread Setting newly assigned partitions {TopicPartition(topic='topic.0', partition=0), TopicPartition(topic='topic.0', partition=2), TopicPartition(topic='topic.0', partition=9), TopicPartition(topic='topic.0', partition=11), TopicPartition(topic='topic.0', partition=5), TopicPartition(topic='topic.0', partition=7), TopicPartition(topic='topic.0', partition=1), TopicPartition(topic='topic.0', partition=8), TopicPartition(topic='topic.0', partition=3), TopicPartition(topic='topic.0', partition=10), TopicPartition(topic='topic.0', partition=4), TopicPartition(topic='topic.0', partition=6)} for group group-0
[2019-03-22 03:46:47,293] root.INFO MainThread Partitions assigned - resetting outstanding offsets
[2019-03-22 03:46:47,293] root.INFO MainThread Resetting offset manager
[2019-03-22 03:46:47,364] kafka.conn.INFO MainThread <BrokerConnection node_id=1 host=kafka-1:9092 <connecting> [IPv4 ('0.0.0.1', 9092)]>: connecting to kafka-1:9092 [('0.0.0.1', 9092) IPv4]
[2019-03-22 03:46:47,391] kafka.conn.INFO MainThread <BrokerConnection node_id=2 host=kafka-2:9092 <connecting> [IPv4 ('0.0.0.2', 9092)]>: connecting to kafka-2:9092 [('0.0.0.2', 9092) IPv4]
[2019-03-22 03:46:47,412] kafka.conn.INFO MainThread <BrokerConnection node_id=3 host=kafka-3:9092 <connecting> [IPv4 ('0.0.0.3', 9092)]>: connecting to kafka-3:9092 [('0.0.0.3', 9092) IPv4]
[2019-03-22 03:46:47,543] kafka.conn.INFO MainThread <BrokerConnection node_id=4 host=kafka-4:9092 <connecting> [IPv4 ('0.0.0.4', 9092)]>: connecting to kafka-4:9092 [('0.0.0.4', 9092) IPv4]
[2019-03-22 03:46:47,567] kafka.conn.INFO MainThread <BrokerConnection node_id=5 host=kafka-5:9092 <connecting> [IPv4 ('0.0.0.5', 9092)]>: connecting to kafka-5:9092 [('0.0.0.5', 9092) IPv4]
[2019-03-22 03:46:47,568] kafka.conn.INFO group-0-heartbeat <BrokerConnection node_id=1 host=kafka-1:9092 <connecting> [IPv4 ('0.0.0.1', 9092)]>: Connection complete.
[2019-03-22 03:46:47,568] kafka.conn.INFO group-0-heartbeat <BrokerConnection node_id=2 host=kafka-2:9092 <connecting> [IPv4 ('0.0.0.2', 9092)]>: Connection complete.
[2019-03-22 03:46:47,568] kafka.conn.INFO group-0-heartbeat <BrokerConnection node_id=3 host=kafka-3:9092 <connecting> [IPv4 ('0.0.0.3', 9092)]>: Connection complete.
[2019-03-22 03:46:47,568] kafka.conn.INFO group-0-heartbeat <BrokerConnection node_id=4 host=kafka-4:9092 <connecting> [IPv4 ('0.0.0.4', 9092)]>: Connection complete.
[2019-03-22 03:46:47,568] kafka.conn.INFO group-0-heartbeat <BrokerConnection node_id=5 host=kafka-5:9092 <connecting> [IPv4 ('0.0.0.5', 9092)]>: Connection complete.
[2019-03-22 03:46:48,608] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=3 host=kafka-3:9092 <connecting> [IPv4 ('0.0.0.3', 9092)]>: connecting to kafka-3:9092 [('0.0.0.3', 9092) IPv4]
[2019-03-22 03:46:48,609] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=3 host=kafka-3:9092 <connecting> [IPv4 ('0.0.0.3', 9092)]>: Connection complete.
[2019-03-22 03:46:48,610] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=bootstrap host=0.0.0.0:9092 <connected> [IPv4 ('0.0.0.0', 9092)]>: Closing connection.
[2019-03-22 03:46:48,695] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=1 host=kafka-1:9092 <connecting> [IPv4 ('0.0.0.1', 9092)]>: connecting to kafka-1:9092 [('0.0.0.1', 9092) IPv4]
[2019-03-22 03:46:48,702] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=1 host=kafka-1:9092 <connecting> [IPv4 ('0.0.0.1', 9092)]>: Connection complete.
[2019-03-22 03:47:50,042] kafka.client.WARNING MainThread Timeout to send to wakeup socket!
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 885, in wakeup
self._wake_w.sendall(b'x')
socket.timeout: timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/project", line 11, in <module>
load_entry_point('project', 'console_scripts', 'project')()
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1137, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.6/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/usr/src/app/project/__init__.py", line 961, in pipe
fut = producer.send(**args)
File "/usr/local/lib/python3.6/site-packages/kafka/producer/kafka.py", line 587, in send
self._sender.wakeup()
File "/usr/local/lib/python3.6/site-packages/kafka/producer/sender.py", line 315, in wakeup
self._client.wakeup()
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 888, in wakeup
raise Errors.KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
[2019-03-22 03:47:50,046] kafka.producer.kafka.INFO Dummy-3 Closing the Kafka producer with 0 secs timeout.
[2019-03-22 03:47:50,046] kafka.producer.kafka.INFO Dummy-3 Proceeding to force close the producer since pending requests could not be completed within timeout 0.
[2019-03-22 03:48:50,099] kafka.client.WARNING kafka-python-producer-1-network-thread Timeout to send to wakeup socket!
[2019-03-22 03:48:50,099] kafka.producer.sender.ERROR kafka-python-producer-1-network-thread Uncaught error in kafka producer I/O thread
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 885, in wakeup
self._wake_w.sendall(b'x')
socket.timeout: timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kafka/producer/sender.py", line 60, in run
self.run_once()
File "/usr/local/lib/python3.6/site-packages/kafka/producer/sender.py", line 147, in run_once
(self._client.send(node_id, request)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 531, in send
self.wakeup()
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 888, in wakeup
raise Errors.KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
[2019-03-22 03:48:50,100] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=19) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,102] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=19) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,104] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=19) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,105] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=19) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,105] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=7) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,107] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=7) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,109] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=7) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,109] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=7) with base offset None and error IllegalStateError: Producer is closed forcefully..
[2019-03-22 03:48:50,109] kafka.client.WARNING Dummy-3 Unable to send to wakeup socket!
[2019-03-22 03:48:50,110] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=3 host=kafka-3:9092 <connected> [IPv4 ('0.0.0.3', 9092)]>: Closing connection.
[2019-03-22 03:48:50,110] kafka.conn.INFO kafka-python-producer-1-network-thread <BrokerConnection node_id=1 host=kafka-1:9092 <connected> [IPv4 ('0.0.0.1', 9092)]>: Closing connection.
[2019-03-22 03:48:50,110] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Produced messages to topic-partition TopicPartition(topic='topic.1', partition=19) with base offset -1 and error Cancelled: <BrokerConnection node_id=1 host=kafka-1:9092 <disconnected> [IPv4 ('0.0.0.1', 9092)]>.
[2019-03-22 03:48:50,111] kafka.producer.record_accumulator.WARNING kafka-python-producer-1-network-thread Batch is already closed -- ignoring batch.done()
[2019-03-22 03:48:50,111] kafka.future.ERROR kafka-python-producer-1-network-thread Error processing errback
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kafka/future.py", line 79, in _call_backs
f(value)
File "/usr/local/lib/python3.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
self._complete_batch(batch, error, -1, None)
File "/usr/local/lib/python3.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
self._accumulator.deallocate(batch)
File "/usr/local/lib/python3.6/site-packages/kafka/producer/record_accumulator.py", line 506, in deallocate
self._incomplete.remove(batch)
File "/usr/local/lib/python3.6/site-packages/kafka/producer/record_accumulator.py", line 586, in remove
return self._incomplete.remove(batch)
KeyError: <kafka.producer.record_accumulator.ProducerBatch object at 0x7f1a616eae10>
[2019-03-22 03:48:50,122] kafka.producer.kafka.INFO Dummy-3 Kafka producer closed
[2019-03-22 03:48:50,128] kafka.coordinator.INFO Dummy-3 Stopping heartbeat thread
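The first traceback shows the failing call: KafkaClient.wakeup() in kafka/client_async.py writes a single byte with self._wake_w.sendall(b'x') and turns the resulting socket.timeout into KafkaTimeoutError. The standalone sketch that follows uses only the standard library (socket.socketpair(), not kafka-python's own code) to illustrate one way such a write can time out: sendall() on a socket with a timeout only raises socket.timeout when the socket stays unwritable for that long, which for a socketpair means nothing has drained the read end and the buffer is full. It does not show why the buffer filled in this particular case; the helper names are hypothetical.

```python
import socket


def writes_until_timeout(wake_w, timeout=0.1, max_writes=10_000_000):
    """Send one byte at a time, like a wakeup call, until sendall() times out.

    Returns how many single-byte writes succeeded before the timeout.
    """
    wake_w.settimeout(timeout)
    for writes in range(max_writes):
        try:
            wake_w.sendall(b'x')
        except socket.timeout:
            return writes
    raise RuntimeError("socket buffer never filled")


def drain(wake_r):
    """Read and discard everything currently buffered on the read end."""
    wake_r.setblocking(False)
    total = 0
    while True:
        try:
            chunk = wake_r.recv(4096)
        except BlockingIOError:
            return total   # nothing left to read
        if not chunk:
            return total   # peer closed
        total += len(chunk)


if __name__ == "__main__":
    wake_r, wake_w = socket.socketpair()
    try:
        stuck = writes_until_timeout(wake_w)
        print(f"sendall() timed out after {stuck} undrained wakeups")
        print(f"drained {drain(wake_r)} bytes")
        wake_w.sendall(b'x')   # succeeds again once the read end is drained
    finally:
        wake_r.close()
        wake_w.close()
```

Once the read end is drained the socket becomes writable again, so in this pattern wakeup writes only keep succeeding as long as something keeps reading from the other end.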