From d4549413221399b4674b10b12a0818d4777a8c20 Mon Sep 17 00:00:00 2001 From: Benjamin Edwards Date: Sun, 19 Apr 2020 07:40:55 -0700 Subject: [PATCH 1/4] Implement channel_send_wait --- Lib/test/test__xxsubinterpreters.py | 31 ++++++++++++ Modules/_xxsubinterpretersmodule.c | 77 ++++++++++++++++++++++++++--- 2 files changed, 102 insertions(+), 6 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 30f8f98acc9dd3..f7642e43299a69 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1337,6 +1337,37 @@ def test_run_string_arg_resolved(self): self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') + def test_send_wait_revc(self): + cid = interpreters.channel_create() + interp = interpreters.create() + def run(): + _run_output(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + import time + _interpreters.channel_send_wait({cid}, b"send") + #assert False + #while True: + # try: + # obj = _interpreters.channel_recv({cid}) + # break + # except _interpreters.ChannelEmptyError: + # time.sleep(0.1) + """)) + t = threading.Thread(target=run) + #import pdb; pdb.set_trace() + t.start() + #interpreters.channel_send_wait(cid, b"send", timeout=1) + #import pdb; pdb.set_trace() + while True: + try: + obj = interpreters.channel_recv(cid) + #import pdb; pdb.set_trace() + assert obj == b"send" + break + except interpreters.ChannelEmptyError: + time.sleep(1) + t.join() + # close def test_close_single_user(self): diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index fa35e14c554012..898f9717dff601 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -353,6 +353,8 @@ struct _channelitem; typedef struct _channelitem { _PyCrossInterpreterData *data; + /* The lock is owned by the sender. */ + PyThread_type_lock lock; struct _channelitem *next; } _channelitem; @@ -393,6 +395,9 @@ _channelitem_free_all(_channelitem *item) while (item != NULL) { _channelitem *last = item; item = item->next; + if (last->lock != NULL) { + PyThread_release_lock(last->lock); + } _channelitem_free(last); } } @@ -402,6 +407,9 @@ _channelitem_popped(_channelitem *item) { _PyCrossInterpreterData *data = item->data; item->data = NULL; + if (item->lock != NULL) { + PyThread_release_lock(item->lock); + } _channelitem_free(item); return data; } @@ -443,13 +451,18 @@ _channelqueue_free(_channelqueue *queue) } static int -_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data) +_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data, + PyThread_type_lock lock) { _channelitem *item = _channelitem_new(); if (item == NULL) { return -1; } item->data = data; + item->lock = lock; + if (lock != NULL) { + PyThread_acquire_lock(item->lock, WAIT_LOCK); + } queue->count += 1; if (queue->first == NULL) { @@ -761,7 +774,7 @@ _channel_free(_PyChannelState *chan) static int _channel_add(_PyChannelState *chan, int64_t interp, - _PyCrossInterpreterData *data) + _PyCrossInterpreterData *data, PyThread_type_lock lock) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -774,7 +787,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, goto done; } - if (_channelqueue_put(chan->queue, data) != 0) { + if (_channelqueue_put(chan->queue, data, lock) != 0) { goto done; } @@ -1285,7 +1298,8 @@ _channel_destroy(_channels *channels, int64_t id) } static int -_channel_send(_channels *channels, int64_t id, PyObject *obj) +_channel_send(_channels *channels, int64_t id, PyObject *obj, + PyThread_type_lock lock) { PyInterpreterState *interp = _get_current(); if (interp == NULL) { @@ -1319,7 +1333,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj) } // Add the data to the channel. - int res = _channel_add(chan, PyInterpreterState_GetID(interp), data); + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, lock); PyThread_release_lock(mutex); if (res != 0) { _PyCrossInterpreterData_Release(data); @@ -2337,7 +2351,7 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } - if (_channel_send(&_globals.channels, cid, obj) != 0) { + if (_channel_send(&_globals.channels, cid, obj, NULL) != 0) { return NULL; } Py_RETURN_NONE; @@ -2361,6 +2375,55 @@ channel_recv(PyObject *self, PyObject *args, PyObject *kwds) return _channel_recv(&_globals.channels, cid); } +static PyObject * +channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) +{ + static char *kwlist[] = {"cid", "obj", "timeout", NULL}; + int64_t cid; + PyObject *obj; + int64_t timeout = 0; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|l:channel_send_wait", + kwlist, channel_id_converter, + &cid, &obj)) { + return NULL; + } + + // Create the lock that will be released when the data is recieved. + PyThread_type_lock lock = PyThread_allocate_lock(); + if (_channel_send(&_globals.channels, cid, obj, lock) != 0) { + return NULL; + } + + long long microseconds; + if (timeout > 0) { + microseconds = timeout * 1000000; + } + else { + microseconds = -1; + } + PyLockStatus lock_rc; + Py_BEGIN_ALLOW_THREADS + lock_rc = PyThread_acquire_lock_timed(lock, microseconds, 0); + Py_END_ALLOW_THREADS + + PyThread_free_lock(lock); + if (lock_rc == PY_LOCK_ACQUIRED) { + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + +PyDoc_STRVAR(channel_send_wait_doc, + "channel_send_wait(cid, obj, timeout)\n\ +\n\ +Add the object's data to the channel's queue and wait until it's removed.\n\ +\n\ +If the timeout is set as:\n\ + * <= 0 then wait forever until the object is removed from the queue.\n\ + * > 0 then wait until the object is removed or for timeout seconds."); + PyDoc_STRVAR(channel_recv_doc, "channel_recv(cid) -> obj\n\ \n\ @@ -2481,6 +2544,8 @@ static PyMethodDef module_functions[] = { METH_NOARGS, channel_list_all_doc}, {"channel_send", (PyCFunction)(void(*)(void))channel_send, METH_VARARGS | METH_KEYWORDS, channel_send_doc}, + {"channel_send_wait", (PyCFunction)(void (*)(void))channel_send_wait, + METH_VARARGS | METH_KEYWORDS, channel_send_wait_doc}, {"channel_recv", (PyCFunction)(void(*)(void))channel_recv, METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, {"channel_close", (PyCFunction)(void(*)(void))channel_close, From 65d741ce8398332d0a8bec4c1b3bb677193c3f07 Mon Sep 17 00:00:00 2001 From: Benjamin Edwards Date: Sun, 19 Apr 2020 11:30:04 -0700 Subject: [PATCH 2/4] Implement channel-send-wait tests --- Lib/test/test__xxsubinterpreters.py | 161 ++++++++++++++++++++++++---- Modules/_xxsubinterpretersmodule.c | 98 +++++++++++++---- 2 files changed, 216 insertions(+), 43 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index f7642e43299a69..28ebaf057071c4 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1337,36 +1337,127 @@ def test_run_string_arg_resolved(self): self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') - def test_send_wait_revc(self): + def test_channel_send_wait_same_interpreter(self): + cid = interpreters.channel_create() + + received = interpreters.channel_send_wait(cid, b"send", timeout=0) + self.assertFalse(received) + + obj = interpreters.channel_recv(cid) + self.assertEqual(obj, b"send") + + def test_channel_send_wait_different_interpreters(self): cid = interpreters.channel_create() interp = interpreters.create() + + _run_output(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + import time + import math + + start = time.time() + rc = _interpreters.channel_send_wait({cid}, b"send", timeout=1) + end = time.time() + + assert not rc + assert math.floor(end-start) == 1 + """)) + + obj = interpreters.channel_recv(cid) + self.assertEqual(obj, b"send") + + def test_channel_send_wait_different_threads_and_interpreters(self): + cid = interpreters.channel_create() + interp = interpreters.create() + + thread_exc = None def run(): - _run_output(interp, dedent(f""" - import _xxsubinterpreters as _interpreters - import time - _interpreters.channel_send_wait({cid}, b"send") - #assert False - #while True: - # try: - # obj = _interpreters.channel_recv({cid}) - # break - # except _interpreters.ChannelEmptyError: - # time.sleep(0.1) - """)) + try: + out = _run_output(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + import time + + rc = _interpreters.channel_send_wait({cid}, b"send") + assert rc + """)) + except Exception as e: + nonlocal thread_exc + thread_exc = e t = threading.Thread(target=run) - #import pdb; pdb.set_trace() t.start() - #interpreters.channel_send_wait(cid, b"send", timeout=1) - #import pdb; pdb.set_trace() - while True: + time.sleep(0.5) + + obj = interpreters.channel_recv(cid) + self.assertEqual(obj, b"send") + t.join() + assert thread_exc is None, f"{thread_exc}" + + def test_channel_send_wait_no_timeout(self): + cid = interpreters.channel_create() + interp = interpreters.create() + + thread_exc = None + def run(): try: - obj = interpreters.channel_recv(cid) - #import pdb; pdb.set_trace() - assert obj == b"send" - break - except interpreters.ChannelEmptyError: - time.sleep(1) + out = _run_output(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + import time + + rc = _interpreters.channel_send_wait({cid}, b"send", timeout=10) + assert rc + """)) + except Exception as e: + nonlocal thread_exc + thread_exc = e + t = threading.Thread(target=run) + t.start() + time.sleep(0.5) + + obj = interpreters.channel_recv(cid) + self.assertEqual(obj, b"send") t.join() + assert thread_exc is None, f"{thread_exc}" + + def test_invalid_channel_send_wait(self): + does_not_exist_cid = 1000 + closed_cid = interpreters.channel_create() + interpreters.channel_close(closed_cid) + + with self.assertRaises(interpreters.ChannelNotFoundError): + interpreters.channel_send_wait(does_not_exist_cid, b"error") + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send_wait(closed_cid, b"error") + + #cid = interpreters.channel_create() + #interp = interpreters.create() + #def run(): + # _run_output(interp, dedent(f""" + # import _xxsubinterpreters as _interpreters + # import time + # _interpreters.channel_send_wait({cid}, b"send") + # #assert False + # #while True: + # # try: + # # obj = _interpreters.channel_recv({cid}) + # # break + # # except _interpreters.ChannelEmptyError: + # # time.sleep(0.1) + # """)) + #t = threading.Thread(target=run) + ##import pdb; pdb.set_trace() + #t.start() + ##interpreters.channel_send_wait(cid, b"send", timeout=1) + ##import pdb; pdb.set_trace() + #while True: + # try: + # obj = interpreters.channel_recv(cid) + # #import pdb; pdb.set_trace() + # assert obj == b"send" + # break + # except interpreters.ChannelEmptyError: + # time.sleep(1) + #t.join() # close @@ -1550,6 +1641,30 @@ def test_close_used_multiple_times_by_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) + def test_close_while_sender_waiting(self): + cid = interpreters.channel_create() + interp = interpreters.create() + + thread_exc = None + def run(): + try: + out = _run_output(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + + rc = _interpreters.channel_send_wait({cid}, b"send") + assert rc + """)) + except Exception as e: + nonlocal thread_exc + thread_exc = e + + t = threading.Thread(target=run) + t.start() + time.sleep(0.1) + interpreters.channel_close(cid, force=True) + t.join() + assert thread_exc is None, f"{thread_exc}" + class ChannelReleaseTests(TestBase): diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 898f9717dff601..583349b27d6cab 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -347,6 +347,13 @@ channel_exceptions_init(PyObject *ns) return 0; } +//typedef struct _channelitem_track { +// PyThread_type_lock lock; +// int lock_has_been_freed; +// PyThread_type_lock mutex; +// int released_by_channel_closing; +//} + /* the channel queue */ struct _channelitem; @@ -354,7 +361,7 @@ struct _channelitem; typedef struct _channelitem { _PyCrossInterpreterData *data; /* The lock is owned by the sender. */ - PyThread_type_lock lock; + void *lock; struct _channelitem *next; } _channelitem; @@ -389,6 +396,61 @@ _channelitem_free(_channelitem *item) PyMem_Free(item); } +static void * +_channelitem_lock_allocate(void) +{ + PyThread_type_lock *lock = calloc(sizeof(*lock), 0); + *lock = PyThread_allocate_lock(); + assert(*lock != NULL); + + return lock; +} + +static void +_channelitem_lock_acquire(void *channelitem_lock) +{ + PyThread_type_lock *lock = (PyThread_type_lock *)channelitem_lock; + + assert(lock != NULL && *lock != NULL); + PyThread_acquire_lock(*lock, WAIT_LOCK); +} + +static PyLockStatus +_channelitem_lock_acquire_timed(void * channelitem_lock, int timeout) +{ + PyThread_type_lock *lock = (PyThread_type_lock *)channelitem_lock; + + assert(lock != NULL && *lock != NULL); + PyLockStatus lock_rc; + Py_BEGIN_ALLOW_THREADS + lock_rc = PyThread_acquire_lock_timed(*lock, timeout, 0); + Py_END_ALLOW_THREADS + + PyThread_free_lock(*lock); + *lock = NULL; + + if (lock_rc == PY_LOCK_ACQUIRED) { + free(lock); + lock = NULL; + } + + return lock_rc; +} + +static void +_channelitem_lock_release(void *channelitem_lock) +{ + PyThread_type_lock *lock = (PyThread_type_lock *)channelitem_lock; + + assert(lock != NULL); + if (*lock != NULL) { + PyThread_release_lock(*lock); + } else { + free(lock); + lock = NULL; + } +} + static void _channelitem_free_all(_channelitem *item) { @@ -396,7 +458,7 @@ _channelitem_free_all(_channelitem *item) _channelitem *last = item; item = item->next; if (last->lock != NULL) { - PyThread_release_lock(last->lock); + _channelitem_lock_release(last->lock); } _channelitem_free(last); } @@ -408,7 +470,7 @@ _channelitem_popped(_channelitem *item) _PyCrossInterpreterData *data = item->data; item->data = NULL; if (item->lock != NULL) { - PyThread_release_lock(item->lock); + _channelitem_lock_release(item->lock); } _channelitem_free(item); return data; @@ -452,16 +514,16 @@ _channelqueue_free(_channelqueue *queue) static int _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data, - PyThread_type_lock lock) + void *channelitem_lock) { _channelitem *item = _channelitem_new(); if (item == NULL) { return -1; } item->data = data; - item->lock = lock; - if (lock != NULL) { - PyThread_acquire_lock(item->lock, WAIT_LOCK); + item->lock = channelitem_lock; + if (item->lock != NULL) { + _channelitem_lock_acquire(item->lock); } queue->count += 1; @@ -774,7 +836,7 @@ _channel_free(_PyChannelState *chan) static int _channel_add(_PyChannelState *chan, int64_t interp, - _PyCrossInterpreterData *data, PyThread_type_lock lock) + _PyCrossInterpreterData *data, void *channelitem_lock) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -787,7 +849,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, goto done; } - if (_channelqueue_put(chan->queue, data, lock) != 0) { + if (_channelqueue_put(chan->queue, data, channelitem_lock) != 0) { goto done; } @@ -1299,7 +1361,7 @@ _channel_destroy(_channels *channels, int64_t id) static int _channel_send(_channels *channels, int64_t id, PyObject *obj, - PyThread_type_lock lock) + void *channelitem_lock) { PyInterpreterState *interp = _get_current(); if (interp == NULL) { @@ -1333,7 +1395,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj, } // Add the data to the channel. - int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, lock); + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, channelitem_lock); PyThread_release_lock(mutex); if (res != 0) { _PyCrossInterpreterData_Release(data); @@ -2381,32 +2443,28 @@ channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) static char *kwlist[] = {"cid", "obj", "timeout", NULL}; int64_t cid; PyObject *obj; - int64_t timeout = 0; + int64_t timeout = -1; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|l:channel_send_wait", kwlist, channel_id_converter, - &cid, &obj)) { + &cid, &obj, &timeout)) { return NULL; } // Create the lock that will be released when the data is recieved. - PyThread_type_lock lock = PyThread_allocate_lock(); + void *lock = _channelitem_lock_allocate(); if (_channel_send(&_globals.channels, cid, obj, lock) != 0) { return NULL; } long long microseconds; - if (timeout > 0) { + if (timeout >= 0) { microseconds = timeout * 1000000; } else { microseconds = -1; } - PyLockStatus lock_rc; - Py_BEGIN_ALLOW_THREADS - lock_rc = PyThread_acquire_lock_timed(lock, microseconds, 0); - Py_END_ALLOW_THREADS + PyLockStatus lock_rc = _channelitem_lock_acquire_timed(lock, microseconds); - PyThread_free_lock(lock); if (lock_rc == PY_LOCK_ACQUIRED) { Py_RETURN_TRUE; } From 93235257c10b414becf5e8b5b1e92f40adaa2e47 Mon Sep 17 00:00:00 2001 From: Benjamin Edwards Date: Sun, 19 Apr 2020 12:35:06 -0700 Subject: [PATCH 3/4] Make thread safe --- Lib/test/test__xxsubinterpreters.py | 2 +- Modules/_xxsubinterpretersmodule.c | 175 ++++++++++++++++------------ 2 files changed, 99 insertions(+), 78 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 28ebaf057071c4..e23cd0f370fd3e 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1652,7 +1652,7 @@ def run(): import _xxsubinterpreters as _interpreters rc = _interpreters.channel_send_wait({cid}, b"send") - assert rc + assert not rc """)) except Exception as e: nonlocal thread_exc diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 583349b27d6cab..8b7f4b29bcb060 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -347,12 +347,88 @@ channel_exceptions_init(PyObject *ns) return 0; } -//typedef struct _channelitem_track { -// PyThread_type_lock lock; -// int lock_has_been_freed; -// PyThread_type_lock mutex; -// int released_by_channel_closing; -//} +typedef struct _channelitem_tracker { + PyThread_type_lock lock; + PyThread_type_lock mutex; + int lock_has_been_freed; + int received; +} _channelitem_tracker; + +static void * +_channelitem_tracker_new(void) +{ + _channelitem_tracker *tracker = calloc(sizeof(*tracker), 0); + tracker->lock = PyThread_allocate_lock(); + tracker->mutex = PyThread_allocate_lock(); + tracker->lock_has_been_freed = 0; + tracker->received = 0; + + return tracker; +} + +static void +_channelitem_tracker_added_to_queue(void *tracker) +{ + _channelitem_tracker *track = (_channelitem_tracker *)tracker; + + assert(track != NULL); + PyThread_acquire_lock(track->lock, WAIT_LOCK); +} + +static void +_channelitem_tracker_removed_from_queue(void *tracker, int received) +{ + _channelitem_tracker *track = (_channelitem_tracker *)tracker; + + assert(track != NULL); + PyThread_type_lock mutex = track->mutex; + PyThread_acquire_lock(mutex, WAIT_LOCK); + if (track->lock_has_been_freed == 0) { + PyThread_release_lock(track->lock); + track->received = received; + } else { + free(track); + track = NULL; + } + PyThread_release_lock(mutex); + + if (track == NULL) { + PyThread_free_lock(mutex); + } +} + +static PyLockStatus +_channelitem_tracker_wait_till_recieved(void *tracker, int timeout) +{ + _channelitem_tracker *track = (_channelitem_tracker *)tracker; + + assert(track != NULL); + PyLockStatus lock_rc; + Py_BEGIN_ALLOW_THREADS + lock_rc = PyThread_acquire_lock_timed(track->lock, timeout, 0); + Py_END_ALLOW_THREADS + + PyThread_type_lock mutex = track->mutex; + PyThread_acquire_lock(mutex, WAIT_LOCK); + PyThread_free_lock(track->lock); + track->lock_has_been_freed = 1; + + if (lock_rc == PY_LOCK_ACQUIRED) { + if (track->received == 0) { + lock_rc = PY_LOCK_FAILURE; + } + + free(track); + track = NULL; + } + PyThread_release_lock(mutex); + + if (track == NULL) { + PyThread_free_lock(mutex); + } + + return lock_rc; +} /* the channel queue */ @@ -361,7 +437,7 @@ struct _channelitem; typedef struct _channelitem { _PyCrossInterpreterData *data; /* The lock is owned by the sender. */ - void *lock; + void *tracker; struct _channelitem *next; } _channelitem; @@ -396,69 +472,14 @@ _channelitem_free(_channelitem *item) PyMem_Free(item); } -static void * -_channelitem_lock_allocate(void) -{ - PyThread_type_lock *lock = calloc(sizeof(*lock), 0); - *lock = PyThread_allocate_lock(); - assert(*lock != NULL); - - return lock; -} - -static void -_channelitem_lock_acquire(void *channelitem_lock) -{ - PyThread_type_lock *lock = (PyThread_type_lock *)channelitem_lock; - - assert(lock != NULL && *lock != NULL); - PyThread_acquire_lock(*lock, WAIT_LOCK); -} - -static PyLockStatus -_channelitem_lock_acquire_timed(void * channelitem_lock, int timeout) -{ - PyThread_type_lock *lock = (PyThread_type_lock *)channelitem_lock; - - assert(lock != NULL && *lock != NULL); - PyLockStatus lock_rc; - Py_BEGIN_ALLOW_THREADS - lock_rc = PyThread_acquire_lock_timed(*lock, timeout, 0); - Py_END_ALLOW_THREADS - - PyThread_free_lock(*lock); - *lock = NULL; - - if (lock_rc == PY_LOCK_ACQUIRED) { - free(lock); - lock = NULL; - } - - return lock_rc; -} - -static void -_channelitem_lock_release(void *channelitem_lock) -{ - PyThread_type_lock *lock = (PyThread_type_lock *)channelitem_lock; - - assert(lock != NULL); - if (*lock != NULL) { - PyThread_release_lock(*lock); - } else { - free(lock); - lock = NULL; - } -} - static void _channelitem_free_all(_channelitem *item) { while (item != NULL) { _channelitem *last = item; item = item->next; - if (last->lock != NULL) { - _channelitem_lock_release(last->lock); + if (last->tracker != NULL) { + _channelitem_tracker_removed_from_queue(last->tracker, 0); } _channelitem_free(last); } @@ -469,8 +490,8 @@ _channelitem_popped(_channelitem *item) { _PyCrossInterpreterData *data = item->data; item->data = NULL; - if (item->lock != NULL) { - _channelitem_lock_release(item->lock); + if (item->tracker != NULL) { + _channelitem_tracker_removed_from_queue(item->tracker, 1); } _channelitem_free(item); return data; @@ -514,16 +535,16 @@ _channelqueue_free(_channelqueue *queue) static int _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data, - void *channelitem_lock) + void *tracker) { _channelitem *item = _channelitem_new(); if (item == NULL) { return -1; } item->data = data; - item->lock = channelitem_lock; - if (item->lock != NULL) { - _channelitem_lock_acquire(item->lock); + item->tracker = tracker; + if (tracker != NULL) { + _channelitem_tracker_added_to_queue(item->tracker); } queue->count += 1; @@ -836,7 +857,7 @@ _channel_free(_PyChannelState *chan) static int _channel_add(_PyChannelState *chan, int64_t interp, - _PyCrossInterpreterData *data, void *channelitem_lock) + _PyCrossInterpreterData *data, void *tracker) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -849,7 +870,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, goto done; } - if (_channelqueue_put(chan->queue, data, channelitem_lock) != 0) { + if (_channelqueue_put(chan->queue, data, tracker) != 0) { goto done; } @@ -1361,7 +1382,7 @@ _channel_destroy(_channels *channels, int64_t id) static int _channel_send(_channels *channels, int64_t id, PyObject *obj, - void *channelitem_lock) + void *tracker) { PyInterpreterState *interp = _get_current(); if (interp == NULL) { @@ -1395,7 +1416,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj, } // Add the data to the channel. - int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, channelitem_lock); + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, tracker); PyThread_release_lock(mutex); if (res != 0) { _PyCrossInterpreterData_Release(data); @@ -2451,8 +2472,8 @@ channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) } // Create the lock that will be released when the data is recieved. - void *lock = _channelitem_lock_allocate(); - if (_channel_send(&_globals.channels, cid, obj, lock) != 0) { + void *tracker = _channelitem_tracker_new(); + if (_channel_send(&_globals.channels, cid, obj, tracker) != 0) { return NULL; } @@ -2463,7 +2484,7 @@ channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) else { microseconds = -1; } - PyLockStatus lock_rc = _channelitem_lock_acquire_timed(lock, microseconds); + PyLockStatus lock_rc = _channelitem_tracker_wait_till_recieved(tracker, microseconds); if (lock_rc == PY_LOCK_ACQUIRED) { Py_RETURN_TRUE; From 82f4f60b3f08d030d1f9fd47a7868628292e3b04 Mon Sep 17 00:00:00 2001 From: Benjamin Edwards Date: Sat, 25 Apr 2020 12:13:04 -0700 Subject: [PATCH 4/4] Add helpful comments --- Lib/test/test__xxsubinterpreters.py | 31 ----- Modules/_xxsubinterpretersmodule.c | 207 ++++++++++++++++++---------- 2 files changed, 137 insertions(+), 101 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index e23cd0f370fd3e..0bec9f9e3b3050 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1349,7 +1349,6 @@ def test_channel_send_wait_same_interpreter(self): def test_channel_send_wait_different_interpreters(self): cid = interpreters.channel_create() interp = interpreters.create() - _run_output(interp, dedent(f""" import _xxsubinterpreters as _interpreters import time @@ -1429,36 +1428,6 @@ def test_invalid_channel_send_wait(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_send_wait(closed_cid, b"error") - #cid = interpreters.channel_create() - #interp = interpreters.create() - #def run(): - # _run_output(interp, dedent(f""" - # import _xxsubinterpreters as _interpreters - # import time - # _interpreters.channel_send_wait({cid}, b"send") - # #assert False - # #while True: - # # try: - # # obj = _interpreters.channel_recv({cid}) - # # break - # # except _interpreters.ChannelEmptyError: - # # time.sleep(0.1) - # """)) - #t = threading.Thread(target=run) - ##import pdb; pdb.set_trace() - #t.start() - ##interpreters.channel_send_wait(cid, b"send", timeout=1) - ##import pdb; pdb.set_trace() - #while True: - # try: - # obj = interpreters.channel_recv(cid) - # #import pdb; pdb.set_trace() - # assert obj == b"send" - # break - # except interpreters.ChannelEmptyError: - # time.sleep(1) - #t.join() - # close def test_close_single_user(self): diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 8b7f4b29bcb060..5153cf27ea25d9 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -347,83 +347,151 @@ channel_exceptions_init(PyObject *ns) return 0; } -typedef struct _channelitem_tracker { +typedef struct _channelitem_wait_lock { PyThread_type_lock lock; PyThread_type_lock mutex; - int lock_has_been_freed; - int received; -} _channelitem_tracker; - + int lock_is_allocated; + int is_sent; + int is_recv; +} _channelitem_wait_lock; + +/** + * Allocate a new wait lock for a channel item. + * + * This function allocates memory that must be freed by calling + * _channelitem_wait_lock_recv and _channelitem_wait_lock_wait. + */ static void * -_channelitem_tracker_new(void) -{ - _channelitem_tracker *tracker = calloc(sizeof(*tracker), 0); - tracker->lock = PyThread_allocate_lock(); - tracker->mutex = PyThread_allocate_lock(); - tracker->lock_has_been_freed = 0; - tracker->received = 0; - - return tracker; -} - +_channelitem_wait_lock_new(void) +{ + _channelitem_wait_lock *wait_lock = calloc(1, sizeof(*wait_lock)); + wait_lock->lock = PyThread_allocate_lock(); + wait_lock->mutex = PyThread_allocate_lock(); + wait_lock->lock_is_allocated = 1; + wait_lock->is_sent = 0; + wait_lock->is_recv = 0; + + return wait_lock; +} + +/** + * To update the wait lock's state when the channel item is sent by an + * interpreter. + * This function must be called by the interpeter sending the message. + * + * NOTE: This must be called before _channelitem_wait_lock_wait or + * _channelitem_wait_lock_recv can be called. + */ static void -_channelitem_tracker_added_to_queue(void *tracker) -{ - _channelitem_tracker *track = (_channelitem_tracker *)tracker; - - assert(track != NULL); - PyThread_acquire_lock(track->lock, WAIT_LOCK); -} - +_channelitem_wait_lock_sent(_channelitem_wait_lock *wait_lock) +{ + assert(wait_lock != NULL); + assert(wait_lock->is_sent == 0); + PyThread_acquire_lock(wait_lock->lock, WAIT_LOCK); + wait_lock->is_sent = 1; +} + +/** + * To update the wait lock's state when the channel item is removed from the + * channel's queue. + * This function must be called by the interpeter receiving the message. + * + * If received is: + * - 1: then the channel item was received by an interpreter. + * - 0: then the channel item was removed from the queue, or the queue was + * closed, before it could be received. + * + * If this function is called after _channelitem_wait_lock_wait has timed out + * then it is responsible for freeing the wait lock. + * + * NOTE: This must be called after _channelitem_wait_lock_sent has been called. + */ static void -_channelitem_tracker_removed_from_queue(void *tracker, int received) +_channelitem_wait_lock_recv(_channelitem_wait_lock *wait_lock, int received) { - _channelitem_tracker *track = (_channelitem_tracker *)tracker; + assert(wait_lock != NULL); + assert(wait_lock->is_sent == 1); + PyThread_type_lock mutex = wait_lock->mutex; - assert(track != NULL); - PyThread_type_lock mutex = track->mutex; PyThread_acquire_lock(mutex, WAIT_LOCK); - if (track->lock_has_been_freed == 0) { - PyThread_release_lock(track->lock); - track->received = received; + /* + * If the lock is still allocated it means that _channelitem_wait_lock_wait + * has not timed out. + */ + if (wait_lock->lock_is_allocated == 1) { + PyThread_release_lock(wait_lock->lock); + wait_lock->is_recv = received; + } else { - free(track); - track = NULL; + free(wait_lock); + wait_lock = NULL; } PyThread_release_lock(mutex); - if (track == NULL) { + if (wait_lock == NULL) { PyThread_free_lock(mutex); } } +/** + * Will wait until the channel item with the wait_lock has been revieved or + * for timeout microseconds. + * This function must be called by the interpeter sending the message. + * + * If timeout is: + * - < 0: then wait until the channelitem has been received. + * - >= 0: then wait until the channelitem has been received or timeout + * microseconds, whichever comes first. + * + * If channel item is received before this function has timed out then it is + * responsible for freeing the wait lock. + * + * This function returns: + * - PY_LOCK_ACQUIRED: The channel item was received by an iterpreter before + * this function timed out. + * - PY_LOCK_FAILURE: The channel item was not received by an interpreter + * before this function timed out. + * + * NOTE: This must be called after _channelitem_wait_lock_sent has been called. + */ static PyLockStatus -_channelitem_tracker_wait_till_recieved(void *tracker, int timeout) +_channelitem_wait_lock_wait(_channelitem_wait_lock *wait_lock, int timeout) { - _channelitem_tracker *track = (_channelitem_tracker *)tracker; - - assert(track != NULL); + assert(wait_lock != NULL); + assert(wait_lock->is_sent == 1); PyLockStatus lock_rc; Py_BEGIN_ALLOW_THREADS - lock_rc = PyThread_acquire_lock_timed(track->lock, timeout, 0); + lock_rc = PyThread_acquire_lock_timed(wait_lock->lock, timeout, 0); Py_END_ALLOW_THREADS - PyThread_type_lock mutex = track->mutex; + if (lock_rc == PY_LOCK_INTR) { + lock_rc = PY_LOCK_FAILURE; + } + + PyThread_type_lock mutex = wait_lock->mutex; PyThread_acquire_lock(mutex, WAIT_LOCK); - PyThread_free_lock(track->lock); - track->lock_has_been_freed = 1; + PyThread_free_lock(wait_lock->lock); + wait_lock->lock_is_allocated = 0; + /* + * If lock_rc is PY_LOCK_ACQUIRED then _channelitem_wait_lock_recv must + * have already been called. + */ if (lock_rc == PY_LOCK_ACQUIRED) { - if (track->received == 0) { + if (wait_lock->is_recv == 0) { + /* + * The channel item was removed from the queue but not by an + * interpreter. + */ lock_rc = PY_LOCK_FAILURE; } - free(track); - track = NULL; + free(wait_lock); + wait_lock = NULL; } PyThread_release_lock(mutex); - if (track == NULL) { + if (wait_lock == NULL) { PyThread_free_lock(mutex); } @@ -437,7 +505,7 @@ struct _channelitem; typedef struct _channelitem { _PyCrossInterpreterData *data; /* The lock is owned by the sender. */ - void *tracker; + void *wait_lock; struct _channelitem *next; } _channelitem; @@ -478,8 +546,8 @@ _channelitem_free_all(_channelitem *item) while (item != NULL) { _channelitem *last = item; item = item->next; - if (last->tracker != NULL) { - _channelitem_tracker_removed_from_queue(last->tracker, 0); + if (last->wait_lock != NULL) { + _channelitem_wait_lock_recv(last->wait_lock, 0); } _channelitem_free(last); } @@ -490,8 +558,8 @@ _channelitem_popped(_channelitem *item) { _PyCrossInterpreterData *data = item->data; item->data = NULL; - if (item->tracker != NULL) { - _channelitem_tracker_removed_from_queue(item->tracker, 1); + if (item->wait_lock != NULL) { + _channelitem_wait_lock_recv(item->wait_lock, 1); } _channelitem_free(item); return data; @@ -535,16 +603,16 @@ _channelqueue_free(_channelqueue *queue) static int _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data, - void *tracker) + void *wait_lock) { _channelitem *item = _channelitem_new(); if (item == NULL) { return -1; } item->data = data; - item->tracker = tracker; - if (tracker != NULL) { - _channelitem_tracker_added_to_queue(item->tracker); + item->wait_lock = wait_lock; + if (wait_lock != NULL) { + _channelitem_wait_lock_sent(item->wait_lock); } queue->count += 1; @@ -857,7 +925,7 @@ _channel_free(_PyChannelState *chan) static int _channel_add(_PyChannelState *chan, int64_t interp, - _PyCrossInterpreterData *data, void *tracker) + _PyCrossInterpreterData *data, void *wait_lock) { int res = -1; PyThread_acquire_lock(chan->mutex, WAIT_LOCK); @@ -870,7 +938,7 @@ _channel_add(_PyChannelState *chan, int64_t interp, goto done; } - if (_channelqueue_put(chan->queue, data, tracker) != 0) { + if (_channelqueue_put(chan->queue, data, wait_lock) != 0) { goto done; } @@ -1382,7 +1450,7 @@ _channel_destroy(_channels *channels, int64_t id) static int _channel_send(_channels *channels, int64_t id, PyObject *obj, - void *tracker) + void *wait_lock) { PyInterpreterState *interp = _get_current(); if (interp == NULL) { @@ -1416,7 +1484,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj, } // Add the data to the channel. - int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, tracker); + int res = _channel_add(chan, PyInterpreterState_GetID(interp), data, wait_lock); PyThread_release_lock(mutex); if (res != 0) { _PyCrossInterpreterData_Release(data); @@ -2458,6 +2526,11 @@ channel_recv(PyObject *self, PyObject *args, PyObject *kwds) return _channel_recv(&_globals.channels, cid); } +PyDoc_STRVAR(channel_recv_doc, +"channel_recv(cid) -> obj\n\ +\n\ +Return a new object from the data at the from of the channel's queue."); + static PyObject * channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) { @@ -2471,9 +2544,8 @@ channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } - // Create the lock that will be released when the data is recieved. - void *tracker = _channelitem_tracker_new(); - if (_channel_send(&_globals.channels, cid, obj, tracker) != 0) { + void *wait_lock = _channelitem_wait_lock_new(); + if (_channel_send(&_globals.channels, cid, obj, wait_lock) != 0) { return NULL; } @@ -2484,7 +2556,7 @@ channel_send_wait(PyObject *self, PyObject *args, PyObject *kwds) else { microseconds = -1; } - PyLockStatus lock_rc = _channelitem_tracker_wait_till_recieved(tracker, microseconds); + PyLockStatus lock_rc = _channelitem_wait_lock_wait(wait_lock, microseconds); if (lock_rc == PY_LOCK_ACQUIRED) { Py_RETURN_TRUE; @@ -2500,13 +2572,8 @@ PyDoc_STRVAR(channel_send_wait_doc, Add the object's data to the channel's queue and wait until it's removed.\n\ \n\ If the timeout is set as:\n\ - * <= 0 then wait forever until the object is removed from the queue.\n\ - * > 0 then wait until the object is removed or for timeout seconds."); - -PyDoc_STRVAR(channel_recv_doc, -"channel_recv(cid) -> obj\n\ -\n\ -Return a new object from the data at the from of the channel's queue."); + * < 0 then wait forever until the object is removed from the queue.\n\ + * >= 0 then wait until the object is removed or for timeout seconds."); static PyObject * channel_close(PyObject *self, PyObject *args, PyObject *kwds)