From 301d21836cab4e98635b999f9e86c930844442c1 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 5 Mar 2024 17:05:09 -0700 Subject: [PATCH 01/11] Expand options for when an interpreter is destroyed. --- Lib/test/support/interpreters/queues.py | 125 ++++++++++-- Lib/test/test_interpreters/test_queues.py | 216 ++++++++++++++++++-- Modules/_xxinterpqueuesmodule.c | 227 +++++++++++++++------- 3 files changed, 473 insertions(+), 95 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 5849a1cc15e447..706dea7be33af5 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -12,12 +12,17 @@ ) __all__ = [ + 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', 'create', 'list_all', 'Queue', 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', + 'ItemInterpreterDestroyed', ] +_NOT_SET = object() + + class QueueEmpty(QueueError, queue.Empty): """Raised from get_nowait() when the queue is empty. @@ -32,26 +37,80 @@ class QueueFull(QueueError, queue.Full): """ +class ItemInterpreterDestroyed(QueueError): + """Raised from get() and get_nowait().""" + + _SHARED_ONLY = 0 _PICKLED = 1 -def create(maxsize=0, *, syncobj=False): + +class UnboundItem: + """Represents a Queue item no longer bound to an interpreter. + + An item is unbound when the interpreter that added it to the queue + is destroyed. + """ + + __slots__ = () + + def __new__(cls): + return UNBOUND + + def __repr__(self): + return f'interpreters.queues.UNBOUND' + + +UNBOUND = object.__new__(UnboundItem) +UNBOUND_ERROR = object() +UNBOUND_REMOVE = object() + + +def _serialize_unbound(unbound): + if unbound is UNBOUND_REMOVE: + unbound = 'remove' + elif unbound is UNBOUND_ERROR: + unbound = 'error' + elif unbound is UNBOUND: + unbound = 'replace' + else: + raise NotImplementedError(f'unsupported unbound replacement {unbound !r}') + return unbound + + +def _resolve_unbound(unbound): + if unbound == 'remove': + raise RuntimeError('"remove" not possible here') + elif unbound == 'error': + raise ItemInterpreterDestroyed("item's original interpreter destroyed") + elif unbound == 'replace': + return UNBOUND + else: + raise NotImplementedError(f'{unbound!r} unsupported') + + +def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. "syncobj" sets the default for Queue.put() and Queue.put_nowait(). + + "unbounditems" likewise sets the default. See Queue.pop() for + supported values. The default value is UNBOUND, which replaces + the unbound item. """ fmt = _SHARED_ONLY if syncobj else _PICKLED - qid = _queues.create(maxsize, fmt) - return Queue(qid, _fmt=fmt) + unbound = _serialize_unbound(unbounditems) + qid = _queues.create(maxsize, fmt, unbound) + return Queue(qid, _fmt=fmt, _unbound=unbound) def list_all(): """Return a list of all open queues.""" - return [Queue(qid, _fmt=fmt) - for qid, fmt in _queues.list_all()] + return [Queue(qid, _fmt=fmt, _unbound=unbound) + for qid, fmt, unbound in _queues.list_all()] _known_queues = weakref.WeakValueDictionary() @@ -59,20 +118,26 @@ def list_all(): class Queue: """A cross-interpreter queue.""" - def __new__(cls, id, /, *, _fmt=None): + def __new__(cls, id, /, *, _fmt=None, _unbound=None): # There is only one instance for any given ID. if isinstance(id, int): id = int(id) else: raise TypeError(f'id must be an int, got {id!r}') if _fmt is None: - _fmt, = _queues.get_queue_defaults(id) + if _unbound is None: + _fmt, _unbound = _queues.get_queue_defaults(id) + else: + _fmt, _ = _queues.get_queue_defaults(id) + elif _unbound is None: + _, _unbound = _queues.get_queue_defaults(id) try: self = _known_queues[id] except KeyError: self = super().__new__(cls) self._id = id self._fmt = _fmt + self._unbound = _unbound _known_queues[id] = self _queues.bind(id) return self @@ -124,6 +189,7 @@ def qsize(self): def put(self, obj, timeout=None, *, syncobj=None, + unbound=None, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. @@ -152,11 +218,32 @@ def put(self, obj, timeout=None, *, actually is. That's a slightly different and stronger promise than just (initial) equality, which is all "syncobj=False" can promise. + + "unbound" controls the behavior of Queue.get() for the given + object if the current interpreter (calling put()) is later + destroyed. + + If "unbound" is None (the default) then it uses the + queue's default, set with create_queue(). + + If "unbound" is UNBOUND_ERROR then get() will raise an + ItemInterpreterDestroyed exception if the original interpreter + has been destroyed. + + If "unbound" is UNBOUND_REMOVE then the item will be removed + from the queue as soon as the original interpreter is destroyed. + + If "unbound" is UNBOUND then it is returned by get() in place + of the unbound item. """ if syncobj is None: fmt = self._fmt else: fmt = _SHARED_ONLY if syncobj else _PICKLED + if unbound is None: + unbound = self._unbound + else: + unbound = _serialize_unbound(unbound) if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -166,7 +253,7 @@ def put(self, obj, timeout=None, *, obj = pickle.dumps(obj) while True: try: - _queues.put(self._id, obj, fmt) + _queues.put(self._id, obj, fmt, unbound) except QueueFull as exc: if timeout is not None and time.time() >= end: raise # re-raise @@ -174,14 +261,18 @@ def put(self, obj, timeout=None, *, else: break - def put_nowait(self, obj, *, syncobj=None): + def put_nowait(self, obj, *, syncobj=None, unbound=None): if syncobj is None: fmt = self._fmt else: fmt = _SHARED_ONLY if syncobj else _PICKLED + if unbound is None: + unbound = self._unbound + else: + unbound = _serialize_unbound(unbound) if fmt is _PICKLED: obj = pickle.dumps(obj) - _queues.put(self._id, obj, fmt) + _queues.put(self._id, obj, fmt, unbound) def get(self, timeout=None, *, _delay=10 / 1000, # 10 milliseconds @@ -189,6 +280,10 @@ def get(self, timeout=None, *, """Return the next object from the queue. This blocks while the queue is empty. + + If the next item's original interpreter has been destroyed + then the "next object" is determined by the value of the + "unbound" argument to put(). """ if timeout is not None: timeout = int(timeout) @@ -197,13 +292,16 @@ def get(self, timeout=None, *, end = time.time() + timeout while True: try: - obj, fmt = _queues.get(self._id) + obj, fmt, unbound = _queues.get(self._id) except QueueEmpty as exc: if timeout is not None and time.time() >= end: raise # re-raise time.sleep(_delay) else: break + if unbound is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unbound) if fmt == _PICKLED: obj = pickle.loads(obj) else: @@ -217,9 +315,12 @@ def get_nowait(self): is the same as get(). """ try: - obj, fmt = _queues.get(self._id) + obj, fmt, unbound = _queues.get(self._id) except QueueEmpty as exc: raise # re-raise + if unbound is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unbound) if fmt == _PICKLED: obj = pickle.loads(obj) else: diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index d16d294b82d044..9a9716bc21e362 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -19,7 +19,7 @@ def get_num_queues(): class TestBase(_TestBase): def tearDown(self): - for qid, _ in _queues.list_all(): + for qid, _, _ in _queues.list_all(): try: _queues.destroy(qid) except Exception: @@ -40,7 +40,7 @@ def test_highlevel_reloaded(self): importlib.reload(queues) def test_create_destroy(self): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, 'replace') _queues.destroy(qid) self.assertEqual(get_num_queues(), 0) with self.assertRaises(queues.QueueNotFoundError): @@ -54,7 +54,7 @@ def test_not_destroyed(self): '-c', dedent(f""" import {_queues.__name__} as _queues - _queues.create(2, 0) + _queues.create(2, 0, 'replace') """), ) self.assertEqual(stdout, '') @@ -65,13 +65,13 @@ def test_not_destroyed(self): def test_bind_release(self): with self.subTest('typical'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, 'replace') _queues.bind(qid) _queues.release(qid) self.assertEqual(get_num_queues(), 0) with self.subTest('bind too much'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, 'replace') _queues.bind(qid) _queues.bind(qid) _queues.release(qid) @@ -79,7 +79,7 @@ def test_bind_release(self): self.assertEqual(get_num_queues(), 0) with self.subTest('nested'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, 'replace') _queues.bind(qid) _queues.bind(qid) _queues.release(qid) @@ -87,7 +87,7 @@ def test_bind_release(self): self.assertEqual(get_num_queues(), 0) with self.subTest('release without binding'): - qid = _queues.create(2, 0) + qid = _queues.create(2, 0, 'replace') with self.assertRaises(queues.QueueError): _queues.release(qid) @@ -427,26 +427,206 @@ def test_put_get_different_interpreters(self): self.assertNotEqual(id(obj2), int(out)) def test_put_cleared_with_subinterpreter(self): - interp = interpreters.create() - queue = queues.create() - - out = _run_output( - interp, - dedent(f""" + def common(queue, unbound=None, presize=0): + if not unbound: + extraargs = '' + elif unbound is queues.UNBOUND: + extraargs = ', unbound=queues.UNBOUND' + elif unbound is queues.UNBOUND_ERROR: + extraargs = ', unbound=queues.UNBOUND_ERROR' + elif unbound is queues.UNBOUND_REMOVE: + extraargs = ', unbound=queues.UNBOUND_REMOVE' + else: + raise NotImplementedError(repr(unbound)) + interp = interpreters.create() + + _run_output(interp, dedent(f""" from test.support.interpreters import queues queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' - queue.put(obj1, syncobj=True) - queue.put(obj2, syncobj=True) + queue.put(obj1, syncobj=True{extraargs}) + queue.put(obj2, syncobj=True{extraargs}) """)) - self.assertEqual(queue.qsize(), 2) + self.assertEqual(queue.qsize(), presize + 2) + + if presize == 0: + obj1 = queue.get() + self.assertEqual(obj1, b'spam') + self.assertEqual(queue.qsize(), presize + 1) + + return interp + + with self.subTest('default'): # UNBOUND + queue = queues.create() + interp = common(queue) + del interp + obj1 = queue.get() + self.assertIs(obj1, queues.UNBOUND) + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + with self.subTest('UNBOUND'): + queue = queues.create() + interp = common(queue, queues.UNBOUND) + del interp + obj1 = queue.get() + self.assertIs(obj1, queues.UNBOUND) + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + with self.subTest('UNBOUND_ERROR'): + queue = queues.create() + interp = common(queue, queues.UNBOUND_ERROR) + + del interp + self.assertEqual(queue.qsize(), 1) + with self.assertRaises(queues.ItemInterpreterDestroyed): + queue.get() + + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + with self.subTest('UNBOUND_REMOVE'): + queue = queues.create() + + interp = common(queue, queues.UNBOUND_REMOVE) + del interp + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + queue.put(b'ham', unbound=queues.UNBOUND_REMOVE) + self.assertEqual(queue.qsize(), 1) + interp = common(queue, queues.UNBOUND_REMOVE, 1) + self.assertEqual(queue.qsize(), 3) + queue.put(42, unbound=queues.UNBOUND_REMOVE) + self.assertEqual(queue.qsize(), 4) + del interp + self.assertEqual(queue.qsize(), 2) + obj1 = queue.get() + obj2 = queue.get() + self.assertEqual(obj1, b'ham') + self.assertEqual(obj2, 42) + self.assertEqual(queue.qsize(), 0) + with self.assertRaises(queues.QueueEmpty): + queue.get_nowait() + + def test_put_cleared_with_subinterpreter_mixed(self): + queue = queues.create() + interp = interpreters.create() + _run_output(interp, dedent(f""" + from test.support.interpreters import queues + queue = queues.Queue({queue.id}) + queue.put(1, syncobj=True, unbound=queues.UNBOUND) + queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR) + queue.put(3, syncobj=True) + queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(5, syncobj=True, unbound=queues.UNBOUND) + """)) + self.assertEqual(queue.qsize(), 5) + + del interp + self.assertEqual(queue.qsize(), 4) obj1 = queue.get() - self.assertEqual(obj1, b'spam') + self.assertIs(obj1, queues.UNBOUND) + self.assertEqual(queue.qsize(), 3) + + with self.assertRaises(queues.ItemInterpreterDestroyed): + queue.get() + self.assertEqual(queue.qsize(), 2) + + obj2 = queue.get() + self.assertIs(obj2, queues.UNBOUND) self.assertEqual(queue.qsize(), 1) - del interp + obj3 = queue.get() + self.assertIs(obj3, queues.UNBOUND) + self.assertEqual(queue.qsize(), 0) + + def test_put_cleared_with_subinterpreter_multiple(self): + queue = queues.create() + interp1 = interpreters.create() + interp2 = interpreters.create() + + queue.put(1, syncobj=True) + _run_output(interp1, dedent(f""" + from test.support.interpreters import queues + queue = queues.Queue({queue.id}) + obj1 = queue.get() + queue.put(2, syncobj=True, unbound=queues.UNBOUND) + queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE) + """)) + _run_output(interp2, dedent(f""" + from test.support.interpreters import queues + queue = queues.Queue({queue.id}) + obj2 = queue.get() + obj1 = queue.get() + """)) + self.assertEqual(queue.qsize(), 0) + queue.put(3) + _run_output(interp1, dedent(""" + queue.put(4, syncobj=True, unbound=queues.UNBOUND) + # interp closed here + queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(6, syncobj=True, unbound=queues.UNBOUND) + """)) + _run_output(interp2, dedent(""" + queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR) + # interp closed here + queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR) + queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(8, syncobj=True, unbound=queues.UNBOUND) + """)) + _run_output(interp1, dedent(""" + queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE) + queue.put(10, syncobj=True, unbound=queues.UNBOUND) + """)) + self.assertEqual(queue.qsize(), 10) + + obj3 = queue.get() + self.assertEqual(obj3, 3) + self.assertEqual(queue.qsize(), 9) + + obj4 = queue.get() + self.assertEqual(obj4, 4) + self.assertEqual(queue.qsize(), 8) + + del interp1 + self.assertEqual(queue.qsize(), 6) + + # obj5 was removed + + obj6 = queue.get() + self.assertIs(obj6, queues.UNBOUND) + self.assertEqual(queue.qsize(), 5) + + obj7 = queue.get() + self.assertEqual(obj7, 7) + self.assertEqual(queue.qsize(), 4) + + del interp2 + self.assertEqual(queue.qsize(), 3) + + # obj1 + with self.assertRaises(queues.ItemInterpreterDestroyed): + queue.get() + self.assertEqual(queue.qsize(), 2) + + # obj2 was removed + + obj8 = queue.get() + self.assertIs(obj8, queues.UNBOUND) + self.assertEqual(queue.qsize(), 1) + + # obj9 was removed + + obj10 = queue.get() + self.assertIs(obj10, queues.UNBOUND) self.assertEqual(queue.qsize(), 0) def test_put_get_different_threads(self): diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index cb8b9e4a661d5a..ddcf2ef187f1cf 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -389,6 +389,63 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) } +/* unbound items ************************************************************/ + +#define UNBOUND_REMOVE 1 +#define UNBOUND_ERROR 2 +#define UNBOUND_REPLACE_DEFAULT 3 + +// It would also be possible to add UNBOUND_REPLACE where the replacement +// value is user-provided. There would be some limitations there, though. +// Another possibility would be something like UNBOUND_COPY, where the +// object is released but the underlying data is copied (with the "raw" +// allocator) and used when the item is popped off the queue. + +static int +encode_unbound(const char *op, int *p_unbound) +{ + int unbound = 0; + if (strcmp(op, "remove") == 0) { + unbound = UNBOUND_REMOVE; + } + else if (strcmp(op, "error") == 0) { + unbound = UNBOUND_ERROR; + } + else if (strcmp(op, "replace") == 0) { + unbound = UNBOUND_REPLACE_DEFAULT; + } + else { + PyErr_Format(PyExc_ValueError, + "unsupported unbound op %s", op); + return -1; + } + assert(unbound > 0); + *p_unbound = unbound; + return 0; +} + +static const char * +decode_unbound(int encoded) +{ + const char *op = NULL; + switch (encoded) { + case UNBOUND_REMOVE: + op = "remove"; + break; + case UNBOUND_ERROR: + op = "error"; + break; + case UNBOUND_REPLACE_DEFAULT: + op = "replace"; + break; + default: + Py_FatalError("unsupported unbound op"); + } + + return op; +} + + /* the basic queue **********************************************************/ struct _queueitem; @@ -396,40 +453,48 @@ struct _queueitem; typedef struct _queueitem { _PyCrossInterpreterData *data; int fmt; + int unbound; struct _queueitem *next; } _queueitem; static void _queueitem_init(_queueitem *item, - _PyCrossInterpreterData *data, int fmt) + _PyCrossInterpreterData *data, int fmt, int unbound) { *item = (_queueitem){ .data = data, .fmt = fmt, + .unbound = unbound, }; } +static void +_queueitem_clear_data(_queueitem *item) +{ + if (item->data == NULL) { + return; + } + // It was allocated in queue_put(). + (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); + item->data = NULL; +} + static void _queueitem_clear(_queueitem *item) { item->next = NULL; - - if (item->data != NULL) { - // It was allocated in queue_put(). - (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); - item->data = NULL; - } + _queueitem_clear_data(item); } static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data, int fmt) +_queueitem_new(_PyCrossInterpreterData *data, int fmt, int unbound) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, data, fmt); + _queueitem_init(item, data, fmt, unbound); return item; } @@ -452,10 +517,11 @@ _queueitem_free_all(_queueitem *item) static void _queueitem_popped(_queueitem *item, - _PyCrossInterpreterData **p_data, int *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unbound) { *p_data = item->data; *p_fmt = item->fmt; + *p_unbound = item->unbound; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; _queueitem_free(item); @@ -474,11 +540,14 @@ typedef struct _queue { _queueitem *first; _queueitem *last; } items; - int fmt; + struct { + int fmt; + int unbound; + } defaults; } _queue; static int -_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) +_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unbound) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { @@ -490,7 +559,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) .items = { .maxsize = maxsize, }, - .fmt = fmt, + .defaults = { + .fmt = fmt, + .unbound = unbound, + }, }; return 0; } @@ -571,7 +643,7 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) +_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt, int unbound) { int err = _queue_lock(queue); if (err < 0) { @@ -587,7 +659,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(data, fmt); + _queueitem *item = _queueitem_new(data, fmt, unbound); if (item == NULL) { _queue_unlock(queue); return -1; @@ -608,7 +680,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) static int _queue_next(_queue *queue, - _PyCrossInterpreterData **p_data, int *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unbound) { int err = _queue_lock(queue); if (err < 0) { @@ -627,7 +699,7 @@ _queue_next(_queue *queue, } queue->items.count -= 1; - _queueitem_popped(item, p_data, p_fmt); + _queueitem_popped(item, p_data, p_fmt, p_unbound); _queue_unlock(queue); return 0; @@ -692,19 +764,25 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid) while (next != NULL) { _queueitem *item = next; next = item->next; - if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) { - if (prev == NULL) { - queue->items.first = item->next; - } - else { - prev->next = item->next; + if (item->data != NULL + && _PyCrossInterpreterData_INTERPID(item->data) == interpid) + { + if (item->unbound == UNBOUND_REMOVE) { + if (prev == NULL) { + queue->items.first = item->next; + } + else { + prev->next = item->next; + } + _queueitem_free(item); + queue->items.count -= 1; + continue; } - _queueitem_free(item); - queue->items.count -= 1; - } - else { - prev = item; + + // We completely throw away the cross-interpreter data. + _queueitem_clear_data(item); } + prev = item; } _queue_unlock(queue); @@ -966,18 +1044,19 @@ _queues_decref(_queues *queues, int64_t qid) return res; } -struct queue_id_and_fmt { +struct queue_id_and_info { int64_t id; int fmt; + const char *unboundop; }; -static struct queue_id_and_fmt * -_queues_list_all(_queues *queues, int64_t *count) +static struct queue_id_and_info * +_queues_list_all(_queues *queues, int64_t *p_count) { - struct queue_id_and_fmt *qids = NULL; + struct queue_id_and_info *qids = NULL; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); - struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt, - (Py_ssize_t)(queues->count)); + struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info, + (Py_ssize_t)(queues->count)); if (ids == NULL) { goto done; } @@ -985,9 +1064,10 @@ _queues_list_all(_queues *queues, int64_t *count) for (int64_t i=0; ref != NULL; ref = ref->next, i++) { ids[i].id = ref->qid; assert(ref->queue != NULL); - ids[i].fmt = ref->queue->fmt; + ids[i].fmt = ref->queue->defaults.fmt; + ids[i].unboundop = decode_unbound(ref->queue->defaults.unbound); } - *count = queues->count; + *p_count = queues->count; qids = ids; done: @@ -1021,13 +1101,13 @@ _queue_free(_queue *queue) // Create a new queue. static int64_t -queue_create(_queues *queues, Py_ssize_t maxsize, int fmt) +queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unbound) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } - int err = _queue_init(queue, maxsize, fmt); + int err = _queue_init(queue, maxsize, fmt, unbound); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; @@ -1056,7 +1136,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) +queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unbound) { // Look up the queue. _queue *queue = NULL; @@ -1079,7 +1159,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) } // Add the data to the queue. - int res = _queue_add(queue, data, fmt); + int res = _queue_add(queue, data, fmt, unbound); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: @@ -1094,7 +1174,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int -queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) +queue_get(_queues *queues, int64_t qid, + PyObject **res, int *p_fmt, int *p_unbound) { int err; *res = NULL; @@ -1110,7 +1191,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; - err = _queue_next(queue, &data, p_fmt); + err = _queue_next(queue, &data, p_fmt, p_unbound); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1397,15 +1478,22 @@ qidarg_converter(PyObject *arg, void *ptr) static PyObject * queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"maxsize", "fmt", NULL}; + static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL}; Py_ssize_t maxsize; int fmt; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist, - &maxsize, &fmt)) { + const char *unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "nis:create", kwlist, + &maxsize, &fmt, &unboundop)) + { + return NULL; + } + + int unbound; + if (encode_unbound(unboundop, &unbound) < 0) { return NULL; } - int64_t qid = queue_create(&_globals.queues, maxsize, fmt); + int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unbound); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; @@ -1463,9 +1551,9 @@ static PyObject * queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; - struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count); + struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count); if (qids == NULL) { - if (count == 0) { + if (!PyErr_Occurred() && count == 0) { return PyList_New(0); } return NULL; @@ -1474,9 +1562,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) if (ids == NULL) { goto finally; } - struct queue_id_and_fmt *cur = qids; + struct queue_id_and_info *cur = qids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt); + PyObject *item = Py_BuildValue("Lis", cur->id, cur->fmt, + cur->unboundop); if (item == NULL) { Py_SETREF(ids, NULL); break; @@ -1498,18 +1587,26 @@ Each corresponding default format is also included."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"qid", "obj", "fmt", NULL}; + static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL}; qidarg_converter_data qidarg; PyObject *obj; int fmt; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist, - qidarg_converter, &qidarg, &obj, &fmt)) { + const char *unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Ois:put", kwlist, + qidarg_converter, &qidarg, &obj, &fmt, + &unboundop)) + { return NULL; } int64_t qid = qidarg.id; + int unbound; + if (encode_unbound(unboundop, &unbound) < 0) { + return NULL; + } + /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, fmt); + int err = queue_put(&_globals.queues, qid, obj, fmt, unbound); // This is the only place that raises QueueFull. if (handle_queue_error(err, self, qid)) { return NULL; @@ -1536,13 +1633,18 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) PyObject *obj = NULL; int fmt = 0; - int err = queue_get(&_globals.queues, qid, &obj, &fmt); + int unbound = 0; + int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unbound); // This is the only place that raises QueueEmpty. if (handle_queue_error(err, self, qid)) { return NULL; } - PyObject *res = Py_BuildValue("Oi", obj, fmt); + if (obj == NULL) { + const char *unboundop = decode_unbound(unbound); + return Py_BuildValue("Ois", Py_None, fmt, unboundop); + } + PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None); Py_DECREF(obj); return res; } @@ -1656,17 +1758,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds) if (handle_queue_error(err, self, qid)) { return NULL; } - int fmt = queue->fmt; + int fmt = queue->defaults.fmt; + const char *unboundop = decode_unbound(queue->defaults.unbound); _queue_unmark_waiter(queue, _globals.queues.mutex); - PyObject *fmt_obj = PyLong_FromLong(fmt); - if (fmt_obj == NULL) { - return NULL; - } - // For now queues only have one default. - PyObject *res = PyTuple_Pack(1, fmt_obj); - Py_DECREF(fmt_obj); - return res; + PyObject *defaults = Py_BuildValue("is", fmt, unboundop); + return defaults; } PyDoc_STRVAR(queuesmod_get_queue_defaults_doc, From cc5354d015f6a9017ff3331b753c6130e3b787ba Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 6 Mar 2024 13:41:06 -0700 Subject: [PATCH 02/11] Do not convert the unbound op between str and int. --- Lib/test/support/interpreters/queues.py | 76 ++++++++------ Lib/test/test_interpreters/test_queues.py | 15 +-- Modules/_xxinterpqueuesmodule.c | 122 ++++++++-------------- 3 files changed, 98 insertions(+), 115 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 706dea7be33af5..d9f2844a452c14 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -65,28 +65,37 @@ def __repr__(self): UNBOUND_ERROR = object() UNBOUND_REMOVE = object() +_UNBOUND_CONSTANT_TO_FLAG = { + UNBOUND_REMOVE: 1, + UNBOUND_ERROR: 2, + UNBOUND: 3, +} +_UNBOUND_FLAG_TO_CONSTANT = {v: k + for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} def _serialize_unbound(unbound): - if unbound is UNBOUND_REMOVE: - unbound = 'remove' - elif unbound is UNBOUND_ERROR: - unbound = 'error' - elif unbound is UNBOUND: - unbound = 'replace' - else: - raise NotImplementedError(f'unsupported unbound replacement {unbound !r}') - return unbound - - -def _resolve_unbound(unbound): - if unbound == 'remove': - raise RuntimeError('"remove" not possible here') - elif unbound == 'error': + op = unbound + try: + flag = _UNBOUND_CONSTANT_TO_FLAG[op] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {op!r}') + return flag, + + +def _resolve_unbound(flag): + try: + op = _UNBOUND_FLAG_TO_CONSTANT[flag] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') + if op is UNBOUND_REMOVE: + # "remove" not possible here + raise NotImplementedError + elif op is UNBOUND_ERROR: raise ItemInterpreterDestroyed("item's original interpreter destroyed") - elif unbound == 'replace': + elif op is UNBOUND: return UNBOUND else: - raise NotImplementedError(f'{unbound!r} unsupported') + raise NotImplementedError(repr(op)) def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): @@ -103,7 +112,8 @@ def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): """ fmt = _SHARED_ONLY if syncobj else _PICKLED unbound = _serialize_unbound(unbounditems) - qid = _queues.create(maxsize, fmt, unbound) + unboundop, = unbound + qid = _queues.create(maxsize, fmt, unboundop) return Queue(qid, _fmt=fmt, _unbound=unbound) @@ -126,11 +136,13 @@ def __new__(cls, id, /, *, _fmt=None, _unbound=None): raise TypeError(f'id must be an int, got {id!r}') if _fmt is None: if _unbound is None: - _fmt, _unbound = _queues.get_queue_defaults(id) + _fmt, op = _queues.get_queue_defaults(id) + _unbound = (op,) else: _fmt, _ = _queues.get_queue_defaults(id) elif _unbound is None: - _, _unbound = _queues.get_queue_defaults(id) + _, op = _queues.get_queue_defaults(id) + _unbound = (op,) try: self = _known_queues[id] except KeyError: @@ -241,9 +253,9 @@ def put(self, obj, timeout=None, *, else: fmt = _SHARED_ONLY if syncobj else _PICKLED if unbound is None: - unbound = self._unbound + unboundop, = self._unbound else: - unbound = _serialize_unbound(unbound) + unboundop, = _serialize_unbound(unbound) if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -253,7 +265,7 @@ def put(self, obj, timeout=None, *, obj = pickle.dumps(obj) while True: try: - _queues.put(self._id, obj, fmt, unbound) + _queues.put(self._id, obj, fmt, unboundop) except QueueFull as exc: if timeout is not None and time.time() >= end: raise # re-raise @@ -267,12 +279,12 @@ def put_nowait(self, obj, *, syncobj=None, unbound=None): else: fmt = _SHARED_ONLY if syncobj else _PICKLED if unbound is None: - unbound = self._unbound + unboundop, = self._unbound else: - unbound = _serialize_unbound(unbound) + unboundop, = _serialize_unbound(unbound) if fmt is _PICKLED: obj = pickle.dumps(obj) - _queues.put(self._id, obj, fmt, unbound) + _queues.put(self._id, obj, fmt, unboundop) def get(self, timeout=None, *, _delay=10 / 1000, # 10 milliseconds @@ -292,16 +304,16 @@ def get(self, timeout=None, *, end = time.time() + timeout while True: try: - obj, fmt, unbound = _queues.get(self._id) + obj, fmt, unboundop = _queues.get(self._id) except QueueEmpty as exc: if timeout is not None and time.time() >= end: raise # re-raise time.sleep(_delay) else: break - if unbound is not None: + if unboundop is not None: assert obj is None, repr(obj) - return _resolve_unbound(unbound) + return _resolve_unbound(unboundop) if fmt == _PICKLED: obj = pickle.loads(obj) else: @@ -315,12 +327,12 @@ def get_nowait(self): is the same as get(). """ try: - obj, fmt, unbound = _queues.get(self._id) + obj, fmt, unboundop = _queues.get(self._id) except QueueEmpty as exc: raise # re-raise - if unbound is not None: + if unboundop is not None: assert obj is None, repr(obj) - return _resolve_unbound(unbound) + return _resolve_unbound(unboundop) if fmt == _PICKLED: obj = pickle.loads(obj) else: diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 9a9716bc21e362..7b4ad920007fdb 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -13,6 +13,9 @@ from .utils import _run_output, TestBase as _TestBase +REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND] + + def get_num_queues(): return len(_queues.list_all()) @@ -40,7 +43,7 @@ def test_highlevel_reloaded(self): importlib.reload(queues) def test_create_destroy(self): - qid = _queues.create(2, 0, 'replace') + qid = _queues.create(2, 0, REPLACE) _queues.destroy(qid) self.assertEqual(get_num_queues(), 0) with self.assertRaises(queues.QueueNotFoundError): @@ -54,7 +57,7 @@ def test_not_destroyed(self): '-c', dedent(f""" import {_queues.__name__} as _queues - _queues.create(2, 0, 'replace') + _queues.create(2, 0, {REPLACE}) """), ) self.assertEqual(stdout, '') @@ -65,13 +68,13 @@ def test_not_destroyed(self): def test_bind_release(self): with self.subTest('typical'): - qid = _queues.create(2, 0, 'replace') + qid = _queues.create(2, 0, REPLACE) _queues.bind(qid) _queues.release(qid) self.assertEqual(get_num_queues(), 0) with self.subTest('bind too much'): - qid = _queues.create(2, 0, 'replace') + qid = _queues.create(2, 0, REPLACE) _queues.bind(qid) _queues.bind(qid) _queues.release(qid) @@ -79,7 +82,7 @@ def test_bind_release(self): self.assertEqual(get_num_queues(), 0) with self.subTest('nested'): - qid = _queues.create(2, 0, 'replace') + qid = _queues.create(2, 0, REPLACE) _queues.bind(qid) _queues.bind(qid) _queues.release(qid) @@ -87,7 +90,7 @@ def test_bind_release(self): self.assertEqual(get_num_queues(), 0) with self.subTest('release without binding'): - qid = _queues.create(2, 0, 'replace') + qid = _queues.create(2, 0, REPLACE) with self.assertRaises(queues.QueueError): _queues.release(qid) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index ddcf2ef187f1cf..47b42e1d8f95d4 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -402,47 +402,16 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) // allocator) and used when the item is popped off the queue. static int -encode_unbound(const char *op, int *p_unbound) +check_unbound(int unboundop) { - int unbound = 0; - if (strcmp(op, "remove") == 0) { - unbound = UNBOUND_REMOVE; - } - else if (strcmp(op, "error") == 0) { - unbound = UNBOUND_ERROR; - } - else if (strcmp(op, "replace") == 0) { - unbound = UNBOUND_REPLACE_DEFAULT; - } - else { - PyErr_Format(PyExc_ValueError, - "unsupported unbound op %s", op); - return -1; - } - assert(unbound > 0); - *p_unbound = unbound; - return 0; -} - -static const char * -decode_unbound(int encoded) -{ - const char *op = NULL; - switch (encoded) { + switch (unboundop) { case UNBOUND_REMOVE: - op = "remove"; - break; case UNBOUND_ERROR: - op = "error"; - break; case UNBOUND_REPLACE_DEFAULT: - op = "replace"; - break; + return 1; default: - Py_FatalError("unsupported unbound op"); + return 0; } - - return op; } @@ -453,18 +422,18 @@ struct _queueitem; typedef struct _queueitem { _PyCrossInterpreterData *data; int fmt; - int unbound; + int unboundop; struct _queueitem *next; } _queueitem; static void _queueitem_init(_queueitem *item, - _PyCrossInterpreterData *data, int fmt, int unbound) + _PyCrossInterpreterData *data, int fmt, int unboundop) { *item = (_queueitem){ .data = data, .fmt = fmt, - .unbound = unbound, + .unboundop = unboundop, }; } @@ -487,14 +456,14 @@ _queueitem_clear(_queueitem *item) } static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data, int fmt, int unbound) +_queueitem_new(_PyCrossInterpreterData *data, int fmt, int unboundop) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, data, fmt, unbound); + _queueitem_init(item, data, fmt, unboundop); return item; } @@ -517,11 +486,11 @@ _queueitem_free_all(_queueitem *item) static void _queueitem_popped(_queueitem *item, - _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unbound) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { *p_data = item->data; *p_fmt = item->fmt; - *p_unbound = item->unbound; + *p_unboundop = item->unboundop; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; _queueitem_free(item); @@ -542,12 +511,12 @@ typedef struct _queue { } items; struct { int fmt; - int unbound; + int unboundop; } defaults; } _queue; static int -_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unbound) +_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop) { PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { @@ -561,7 +530,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unbound) }, .defaults = { .fmt = fmt, - .unbound = unbound, + .unboundop = unboundop, }, }; return 0; @@ -643,7 +612,7 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt, int unbound) +_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt, int unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -659,7 +628,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt, int unbound) return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(data, fmt, unbound); + _queueitem *item = _queueitem_new(data, fmt, unboundop); if (item == NULL) { _queue_unlock(queue); return -1; @@ -680,7 +649,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt, int unbound) static int _queue_next(_queue *queue, - _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unbound) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -699,7 +668,7 @@ _queue_next(_queue *queue, } queue->items.count -= 1; - _queueitem_popped(item, p_data, p_fmt, p_unbound); + _queueitem_popped(item, p_data, p_fmt, p_unboundop); _queue_unlock(queue); return 0; @@ -767,7 +736,7 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid) if (item->data != NULL && _PyCrossInterpreterData_INTERPID(item->data) == interpid) { - if (item->unbound == UNBOUND_REMOVE) { + if (item->unboundop == UNBOUND_REMOVE) { if (prev == NULL) { queue->items.first = item->next; } @@ -1047,7 +1016,7 @@ _queues_decref(_queues *queues, int64_t qid) struct queue_id_and_info { int64_t id; int fmt; - const char *unboundop; + int unboundop; }; static struct queue_id_and_info * @@ -1065,7 +1034,7 @@ _queues_list_all(_queues *queues, int64_t *p_count) ids[i].id = ref->qid; assert(ref->queue != NULL); ids[i].fmt = ref->queue->defaults.fmt; - ids[i].unboundop = decode_unbound(ref->queue->defaults.unbound); + ids[i].unboundop = ref->queue->defaults.unboundop; } *p_count = queues->count; @@ -1101,13 +1070,13 @@ _queue_free(_queue *queue) // Create a new queue. static int64_t -queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unbound) +queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } - int err = _queue_init(queue, maxsize, fmt, unbound); + int err = _queue_init(queue, maxsize, fmt, unboundop); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; @@ -1136,7 +1105,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unbound) +queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) { // Look up the queue. _queue *queue = NULL; @@ -1159,7 +1128,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unbound) } // Add the data to the queue. - int res = _queue_add(queue, data, fmt, unbound); + int res = _queue_add(queue, data, fmt, unboundop); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: @@ -1175,7 +1144,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unbound) // XXX Support a "wait" mutex? static int queue_get(_queues *queues, int64_t qid, - PyObject **res, int *p_fmt, int *p_unbound) + PyObject **res, int *p_fmt, int *p_unboundop) { int err; *res = NULL; @@ -1191,7 +1160,7 @@ queue_get(_queues *queues, int64_t qid, // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; - err = _queue_next(queue, &data, p_fmt, p_unbound); + err = _queue_next(queue, &data, p_fmt, p_unboundop); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1481,19 +1450,19 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL}; Py_ssize_t maxsize; int fmt; - const char *unboundop; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "nis:create", kwlist, + int unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist, &maxsize, &fmt, &unboundop)) { return NULL; } - - int unbound; - if (encode_unbound(unboundop, &unbound) < 0) { + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); return NULL; } - int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unbound); + int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; @@ -1564,7 +1533,7 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) } struct queue_id_and_info *cur = qids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *item = Py_BuildValue("Lis", cur->id, cur->fmt, + PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt, cur->unboundop); if (item == NULL) { Py_SETREF(ids, NULL); @@ -1591,22 +1560,22 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) qidarg_converter_data qidarg; PyObject *obj; int fmt; - const char *unboundop; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Ois:put", kwlist, + int unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist, qidarg_converter, &qidarg, &obj, &fmt, &unboundop)) { return NULL; } int64_t qid = qidarg.id; - - int unbound; - if (encode_unbound(unboundop, &unbound) < 0) { + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); return NULL; } /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, fmt, unbound); + int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop); // This is the only place that raises QueueFull. if (handle_queue_error(err, self, qid)) { return NULL; @@ -1633,16 +1602,15 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) PyObject *obj = NULL; int fmt = 0; - int unbound = 0; - int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unbound); + int unboundop = 0; + int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop); // This is the only place that raises QueueEmpty. if (handle_queue_error(err, self, qid)) { return NULL; } if (obj == NULL) { - const char *unboundop = decode_unbound(unbound); - return Py_BuildValue("Ois", Py_None, fmt, unboundop); + return Py_BuildValue("Oii", Py_None, fmt, unboundop); } PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None); Py_DECREF(obj); @@ -1759,10 +1727,10 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } int fmt = queue->defaults.fmt; - const char *unboundop = decode_unbound(queue->defaults.unbound); + int unboundop = queue->defaults.unboundop; _queue_unmark_waiter(queue, _globals.queues.mutex); - PyObject *defaults = Py_BuildValue("is", fmt, unboundop); + PyObject *defaults = Py_BuildValue("ii", fmt, unboundop); return defaults; } From ebd495019c521ddc4687b1e684216d7a74f1f847 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 6 Mar 2024 13:52:45 -0700 Subject: [PATCH 03/11] UNBOUND_REPLACE_DEFAULT -> UNBOUND_REPLACE --- Modules/_xxinterpqueuesmodule.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index 47b42e1d8f95d4..4b4e91c6b03de1 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -393,7 +393,7 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) #define UNBOUND_REMOVE 1 #define UNBOUND_ERROR 2 -#define UNBOUND_REPLACE_DEFAULT 3 +#define UNBOUND_REPLACE 3 // It would also be possible to add UNBOUND_REPLACE where the replacement // value is user-provided. There would be some limitations there, though. @@ -407,7 +407,7 @@ check_unbound(int unboundop) switch (unboundop) { case UNBOUND_REMOVE: case UNBOUND_ERROR: - case UNBOUND_REPLACE_DEFAULT: + case UNBOUND_REPLACE: return 1; default: return 0; From 594428b65f09e96d19101412b457412c7d00b0eb Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 6 Mar 2024 13:54:19 -0700 Subject: [PATCH 04/11] Add some asserts. --- Modules/_xxinterpqueuesmodule.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index 4b4e91c6b03de1..5e0c925e5a4238 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -430,6 +430,7 @@ static void _queueitem_init(_queueitem *item, _PyCrossInterpreterData *data, int fmt, int unboundop) { + assert(check_unbound(unboundop)); *item = (_queueitem){ .data = data, .fmt = fmt, @@ -518,6 +519,7 @@ typedef struct _queue { static int _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop) { + assert(check_unbound(unboundop)); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_QUEUE_ALLOC; From b13cb4cfa88bb6a87ffc30f5b43fa1f3f539ebfb Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Wed, 6 Mar 2024 15:38:48 -0700 Subject: [PATCH 05/11] Keep track of each item's interpreter ID. --- Modules/_xxinterpqueuesmodule.c | 103 +++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 23 deletions(-) diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c index 5e0c925e5a4238..cdc7e636707c06 100644 --- a/Modules/_xxinterpqueuesmodule.c +++ b/Modules/_xxinterpqueuesmodule.c @@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags) return res; } +static inline int64_t +_get_interpid(_PyCrossInterpreterData *data) +{ + int64_t interpid; + if (data != NULL) { + interpid = _PyCrossInterpreterData_INTERPID(data); + assert(!PyErr_Occurred()); + } + else { + interpid = PyInterpreterState_GetID(PyInterpreterState_Get()); + } + return interpid; +} static PyInterpreterState * _get_current_interp(void) @@ -420,6 +433,9 @@ check_unbound(int unboundop) struct _queueitem; typedef struct _queueitem { + /* The interpreter that added the item to the queue. + The actual bound interpid is found in item->data. */ + int64_t interpid; _PyCrossInterpreterData *data; int fmt; int unboundop; @@ -428,10 +444,20 @@ typedef struct _queueitem { static void _queueitem_init(_queueitem *item, - _PyCrossInterpreterData *data, int fmt, int unboundop) + int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { + if (interpid < 0) { + interpid = _get_interpid(data); + } + else { + assert(data == NULL + || _PyCrossInterpreterData_INTERPID(data) < 0 + || interpid == _PyCrossInterpreterData_INTERPID(data)); + } assert(check_unbound(unboundop)); *item = (_queueitem){ + .interpid = interpid, .data = data, .fmt = fmt, .unboundop = unboundop, @@ -456,15 +482,17 @@ _queueitem_clear(_queueitem *item) _queueitem_clear_data(item); } + static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data, int fmt, int unboundop) +_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, data, fmt, unboundop); + _queueitem_init(item, interpid, data, fmt, unboundop); return item; } @@ -497,6 +525,34 @@ _queueitem_popped(_queueitem *item, _queueitem_free(item); } +static int +_queueitem_clear_interpreter(_queueitem *item) +{ + assert(item->interpid >= 0); + if (item->data == NULL) { + // Its interpreter was already cleared (or it was never bound). + // For UNBOUND_REMOVE it should have been freed at that time. + assert(item->unboundop != UNBOUND_REMOVE); + return 0; + } + assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid); + + switch (item->unboundop) { + case UNBOUND_REMOVE: + // The caller must free/clear it. + return 1; + case UNBOUND_ERROR: + case UNBOUND_REPLACE: + // We won't need the cross-interpreter data later + // so we completely throw it away. + _queueitem_clear_data(item); + return 0; + default: + Py_FatalError("not reachable"); + return -1; + } +} + /* the queue */ @@ -614,7 +670,8 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt, int unboundop) +_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -630,7 +687,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt, int unboundop) return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(data, fmt, unboundop); + _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop); if (item == NULL) { _queue_unlock(queue); return -1; @@ -735,25 +792,22 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid) while (next != NULL) { _queueitem *item = next; next = item->next; - if (item->data != NULL - && _PyCrossInterpreterData_INTERPID(item->data) == interpid) - { - if (item->unboundop == UNBOUND_REMOVE) { - if (prev == NULL) { - queue->items.first = item->next; - } - else { - prev->next = item->next; - } - _queueitem_free(item); - queue->items.count -= 1; - continue; + int remove = (item->interpid == interpid) + ? _queueitem_clear_interpreter(item) + : 0; + if (remove) { + _queueitem_free(item); + if (prev == NULL) { + queue->items.first = next; } - - // We completely throw away the cross-interpreter data. - _queueitem_clear_data(item); + else { + prev->next = next; + } + queue->items.count -= 1; + } + else { + prev = item; } - prev = item; } _queue_unlock(queue); @@ -1128,9 +1182,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) GLOBAL_FREE(data); return -1; } + assert(_PyCrossInterpreterData_INTERPID(data) == \ + PyInterpreterState_GetID(PyInterpreterState_Get())); // Add the data to the queue. - int res = _queue_add(queue, data, fmt, unboundop); + int64_t interpid = -1; // _queueitem_init() will set it. + int res = _queue_add(queue, interpid, data, fmt, unboundop); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: From be0db76e9de6788c7e4acc89bc05a565ea775942 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 12 Jul 2024 10:58:12 -0600 Subject: [PATCH 06/11] Add some extra info to the put() docstring. --- Lib/test/support/interpreters/queues.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 2b54cb05a7545e..b7f7070aee001c 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -236,14 +236,19 @@ def put(self, obj, timeout=None, *, destroyed. If "unbound" is None (the default) then it uses the - queue's default, set with create_queue(). + queue's default, set with create_queue(), + which is usually UNBOUND. If "unbound" is UNBOUND_ERROR then get() will raise an ItemInterpreterDestroyed exception if the original interpreter - has been destroyed. + has been destroyed. This does not otherwise affect the queue; + the next call to put() will work like normal, returning the next + item in the queue. If "unbound" is UNBOUND_REMOVE then the item will be removed from the queue as soon as the original interpreter is destroyed. + Be aware that this will introduce an imbalance between put() + and get() calls. If "unbound" is UNBOUND then it is returned by get() in place of the unbound item. From 8dcb8ff51877b0011eb33097eee381c0756b3a1f Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 12 Jul 2024 11:23:50 -0600 Subject: [PATCH 07/11] Clarify a comment. --- Modules/_interpqueuesmodule.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index a71b0806ac6ed3..c734c8ad37e80e 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -434,7 +434,9 @@ struct _queueitem; typedef struct _queueitem { /* The interpreter that added the item to the queue. - The actual bound interpid is found in item->data. */ + The actual bound interpid is found in item->data. + This is necessary because item->data might be NULL, + meaning the interpreter has been destroyed. */ int64_t interpid; _PyCrossInterpreterData *data; int fmt; @@ -482,7 +484,6 @@ _queueitem_clear(_queueitem *item) _queueitem_clear_data(item); } - static _queueitem * _queueitem_new(int64_t interpid, _PyCrossInterpreterData *data, int fmt, int unboundop) From 4053c1c25681faead4b5d5ad9148710e499dee90 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 12 Jul 2024 12:32:23 -0600 Subject: [PATCH 08/11] Drop an unused variable. --- Lib/test/support/interpreters/queues.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index b7f7070aee001c..71aebc4b3e0336 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -20,9 +20,6 @@ ] -_NOT_SET = object() - - class QueueEmpty(QueueError, queue.Empty): """Raised from get_nowait() when the queue is empty. From d64d298ae1d0ea4412b94ef1f5f201a7d36fcdce Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 12 Jul 2024 12:42:45 -0600 Subject: [PATCH 09/11] Fix some typos. --- Lib/test/support/interpreters/queues.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 71aebc4b3e0336..d77c949154cd1a 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -103,7 +103,7 @@ def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): "syncobj" sets the default for Queue.put() and Queue.put_nowait(). - "unbounditems" likewise sets the default. See Queue.pop() for + "unbounditems" likewise sets the default. See Queue.put() for supported values. The default value is UNBOUND, which replaces the unbound item. """ @@ -206,7 +206,7 @@ def put(self, obj, timeout=None, *, This blocks while the queue is full. If "syncobj" is None (the default) then it uses the - queue's default, set with create_queue().. + queue's default, set with create_queue(). If "syncobj" is false then all objects are supported, at the expense of worse performance. From f820c7052c81e896dfff5588098ea591773543a3 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 12 Jul 2024 13:30:26 -0600 Subject: [PATCH 10/11] Fix a docstring. --- Modules/_interpqueuesmodule.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index c734c8ad37e80e..8e82789198792f 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -1544,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(queuesmod_create_doc, -"create(maxsize, fmt) -> qid\n\ +"create(maxsize, fmt, unboundop) -> qid\n\ \n\ Create a new cross-interpreter queue and return its unique generated ID.\n\ It is a new reference as though bind() had been called on the queue.\n\ From 94f4f567e03e3b733ec9526566ec6ecae26f7524 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 12 Jul 2024 13:30:44 -0600 Subject: [PATCH 11/11] Fix list_all(). --- Lib/test/support/interpreters/queues.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index d77c949154cd1a..402ceffd1bb21c 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -116,8 +116,8 @@ def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): def list_all(): """Return a list of all open queues.""" - return [Queue(qid, _fmt=fmt, _unbound=unbound) - for qid, fmt, unbound in _queues.list_all()] + return [Queue(qid, _fmt=fmt, _unbound=(unboundop,)) + for qid, fmt, unboundop in _queues.list_all()] _known_queues = weakref.WeakValueDictionary()