From 5d28a13c54174bf96bb826571553eaffac6fe6e0 Mon Sep 17 00:00:00 2001
From: tomMoral
Date: Wed, 29 Apr 2020 10:44:05 +0200
Subject: [PATCH 1/2] CLN remove some locks in ProcessPoolExecutor

---
 Lib/concurrent/futures/process.py | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index a76e2c9cf231ae..95d19873c7434b 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -158,10 +158,8 @@ def __init__(self, work_id, fn, args, kwargs):
 
 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
-                 thread_wakeup):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
-        self.shutdown_lock = shutdown_lock
         self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)
 
@@ -170,8 +168,7 @@ def _on_queue_feeder_error(self, e, obj):
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
-            with self.shutdown_lock:
-                self.thread_wakeup.wakeup()
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this
             # case, the executor_manager_thread fails all work_items
             # with BrokenProcessPool
@@ -390,8 +387,7 @@ def wait_result_broken_or_wakeup(self):
         elif wakeup_reader in ready:
             is_broken = False
 
-        with self.shutdown_lock:
-            self.thread_wakeup.clear()
+        self.thread_wakeup.clear()
 
         return result_item, is_broken, cause
 
@@ -643,7 +639,6 @@ def __init__(self, max_workers=None, mp_context=None,
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
             pending_work_items=self._pending_work_items,
-            shutdown_lock=self._shutdown_lock,
             thread_wakeup=self._executor_manager_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed

From 06c5cb4da818814fab8bd2bdeb4fd23ee82f5d1d Mon Sep 17 00:00:00 2001
From: tomMoral
Date: Wed, 29 Apr 2020 11:17:46 +0200
Subject: [PATCH 2/2] ENH also protect thread_wakeup on shutdown

---
 Lib/concurrent/futures/process.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 95d19873c7434b..bc5c3eced06ef5 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -89,18 +89,25 @@ def _python_exit():
     global _global_shutdown
     _global_shutdown = True
     items = list(_threads_wakeups.items())
-    for _, thread_wakeup in items:
-        # call not protected by ProcessPoolExecutor._shutdown_lock
-        thread_wakeup.wakeup()
+    for _, (shutdown_lock, thread_wakeup) in items:
+        with shutdown_lock:
+            thread_wakeup.wakeup()
     for t, _ in items:
         t.join()
 
+
 # Register for `_python_exit()` to be called just before joining all
 # non-daemon threads. This is used instead of `atexit.register()` for
 # compatibility with subinterpreters, which no longer support daemon threads.
 # See bpo-39812 for context.
 threading._register_atexit(_python_exit)
 
+
+# With the fork context, _thread_wakeups is propagated to children.
+# Clear it after fork to avoid some situation that can cause some
+# freeze when joining the workers.
+mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear())
+
 # Controls how many more calls than processes will be queued in the call queue.
 # A smaller number will mean that processes spend more time idle waiting for
 # work while a larger number will make Future.cancel() succeed less frequently
@@ -653,7 +660,8 @@ def _start_executor_manager_thread(self):
         self._executor_manager_thread = _ExecutorManagerThread(self)
         self._executor_manager_thread.start()
         _threads_wakeups[self._executor_manager_thread] = \
-            self._executor_manager_thread_wakeup
+            (self._shutdown_lock,
+             self._executor_manager_thread_wakeup)
 
     def _adjust_process_count(self):
         # if there's an idle process, we don't need to spawn a new one.
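
Note on the locking scheme (not part of the patch): the per-executor `_shutdown_lock`
matters at interpreter exit because `_python_exit()` may call `thread_wakeup.wakeup()`
at the same moment the executor's shutdown path closes the wakeup pipe, and writing to
a closed pipe raises OSError. The sketch below is a minimal model of that discipline,
not the stdlib implementation; the `_Wakeup` class, `exit_hook`, and `executor_shutdown`
names are hypothetical stand-ins.

    import threading
    from multiprocessing import Pipe

    class _Wakeup:
        """Hypothetical stand-in for the executor's wakeup pipe (not the stdlib class)."""
        def __init__(self):
            self._reader, self._writer = Pipe(duplex=False)
            self._closed = False

        def wakeup(self):
            # Unsynchronized, this check-then-send can race with close():
            # another thread may close the pipe between the check and the
            # send, and send_bytes() on a closed Connection raises OSError.
            if not self._closed:
                self._writer.send_bytes(b"")

        def close(self):
            if not self._closed:
                self._closed = True
                self._writer.close()
                self._reader.close()

    # One lock per executor, as in the patch: the exit hook and the shutdown
    # path both take it, so wakeup() can never hit a pipe that was just closed.
    shutdown_lock = threading.Lock()
    wakeup = _Wakeup()

    def exit_hook():
        # analogue of the patched _python_exit()
        with shutdown_lock:
            wakeup.wakeup()

    def executor_shutdown():
        # analogue of the path that closes the wakeup pipe during shutdown
        with shutdown_lock:
            wakeup.close()

Under this scheme only the call sites that can actually overlap with the close need to
hold the lock, which is what motivates dropping it elsewhere in patch 1.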
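
Note on the `register_after_fork` line (illustration only):
`multiprocessing.util.register_after_fork(obj, func)` records a callback that a child
started through multiprocessing with a fork-based start method runs as `func(obj)`
right after the fork, before the target function starts; a plain `os.fork()` does not
trigger it. A self-contained sketch, with a made-up `_registry` dict standing in for
`_threads_wakeups`:

    import multiprocessing as mp
    import multiprocessing.util  # make mp.util available in a standalone script

    # Hypothetical module-level registry standing in for _threads_wakeups;
    # its entries only make sense in the parent process.
    _registry = {"manager-thread": object()}

    # With the fork start method the child inherits _registry as-is, so
    # arrange for it to be cleared in the child right after the fork.
    mp.util.register_after_fork(_registry, lambda reg: reg.clear())

    def _child():
        print("registry seen by child:", _registry)   # expected: {}

    if __name__ == "__main__":
        ctx = mp.get_context("fork")     # the fork start method is POSIX-only
        p = ctx.Process(target=_child)
        p.start()
        p.join()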