From 126412278f0b2eda9dd255a7c2f6104fb732e30d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 28 Dec 2019 14:01:35 -0800 Subject: [PATCH 1/2] Do not block on sender thread join after timeout in producer.close() --- kafka/producer/kafka.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 67b9e1971..a63f6a716 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -483,14 +483,10 @@ def close(self, timeout=None): self._sender.join(timeout) if self._sender is not None and self._sender.is_alive(): - log.info("Proceeding to force close the producer since pending" " requests could not be completed within timeout %s.", timeout) self._sender.force_close() - # Only join the sender thread when not calling from callback. - if not invoked_from_callback: - self._sender.join() self._metrics.close() try: From 67bf2e6582fd9c5c7f62bf680f12a5574f78ad95 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 28 Dec 2019 15:23:21 -0800 Subject: [PATCH 2/2] Retain blocking on sender thread for del(producer) --- kafka/producer/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index a63f6a716..d3a531ecc 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -444,7 +444,7 @@ def _unregister_cleanup(self): self._cleanup = None def __del__(self): - self.close(timeout=0) + self.close() def close(self, timeout=None): """Close this producer.