diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index a76e2c9cf231ae..bc5c3eced06ef5 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -89,18 +89,25 @@ def _python_exit(): global _global_shutdown _global_shutdown = True items = list(_threads_wakeups.items()) - for _, thread_wakeup in items: - # call not protected by ProcessPoolExecutor._shutdown_lock - thread_wakeup.wakeup() + for _, (shutdown_lock, thread_wakeup) in items: + with shutdown_lock: + thread_wakeup.wakeup() for t, _ in items: t.join() + # Register for `_python_exit()` to be called just before joining all # non-daemon threads. This is used instead of `atexit.register()` for # compatibility with subinterpreters, which no longer support daemon threads. # See bpo-39812 for context. threading._register_atexit(_python_exit) + +# With the fork context, _thread_wakeups is propagated to children. +# Clear it after fork to avoid some situation that can cause some +# freeze when joining the workers. +mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear()) + # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for # work while a larger number will make Future.cancel() succeed less frequently @@ -158,10 +165,8 @@ def __init__(self, work_id, fn, args, kwargs): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, - thread_wakeup): + def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): self.pending_work_items = pending_work_items - self.shutdown_lock = shutdown_lock self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) @@ -170,8 +175,7 @@ def _on_queue_feeder_error(self, e, obj): tb = traceback.format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - with self.shutdown_lock: - self.thread_wakeup.wakeup() + self.thread_wakeup.wakeup() # work_item can be None if another process terminated. In this # case, the executor_manager_thread fails all work_items # with BrokenProcessPool @@ -390,8 +394,7 @@ def wait_result_broken_or_wakeup(self): elif wakeup_reader in ready: is_broken = False - with self.shutdown_lock: - self.thread_wakeup.clear() + self.thread_wakeup.clear() return result_item, is_broken, cause @@ -643,7 +646,6 @@ def __init__(self, max_workers=None, mp_context=None, self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, pending_work_items=self._pending_work_items, - shutdown_lock=self._shutdown_lock, thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed @@ -658,7 +660,8 @@ def _start_executor_manager_thread(self): self._executor_manager_thread = _ExecutorManagerThread(self) self._executor_manager_thread.start() _threads_wakeups[self._executor_manager_thread] = \ - self._executor_manager_thread_wakeup + (self._shutdown_lock, + self._executor_manager_thread_wakeup) def _adjust_process_count(self): # if there's an idle process, we don't need to spawn a new one.