diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8e9b69a8f08b42..7d62019996ac83 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -46,6 +46,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import atexit +import contextlib import os from concurrent.futures import _base import queue @@ -68,19 +69,41 @@ class _ThreadWakeup: def __init__(self): self._closed = False self._reader, self._writer = mp.Pipe(duplex=False) + # bpo-39995: Event used to ensure that the pipe is not closed while + # sending or receiving bytes. Initialize event as True. + self._close_event = threading.Event() + self._close_event.set() def close(self): - if not self._closed: - self._closed = True - self._writer.close() - self._reader.close() + if self._closed: + return + + self._close_event.wait() + + self._closed = True + self._writer.close() + self._reader.close() + + @contextlib.contextmanager + def _block_close(self): + self._close_event.clear() + # If close() is called from a different thread, + # it blocks until the close event is set again. + try: + yield + finally: + self._close_event.set() def wakeup(self): - if not self._closed: + with self._block_close(): + if self._closed: + return self._writer.send_bytes(b"") def clear(self): - if not self._closed: + with self._block_close(): + if self._closed: + return while self._reader.poll(): self._reader.recv_bytes() diff --git a/Misc/NEWS.d/next/Library/2020-04-28-17-57-47.bpo-39995.4cGpi8.rst b/Misc/NEWS.d/next/Library/2020-04-28-17-57-47.bpo-39995.4cGpi8.rst new file mode 100644 index 00000000000000..4e4a87939e2edc --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-04-28-17-57-47.bpo-39995.4cGpi8.rst @@ -0,0 +1,3 @@ +Fix a race condition in concurrent.futures._ThreadWakeup: it now uses an +internal threading event to ensure that the pipe is not closed while sending or +receiving bytes. Patch by Kyle Stanley and Victor Stinner.