-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
bpo-40390: Implement channel_send_wait for subinterpreters #19715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -1337,6 +1337,97 @@ def test_run_string_arg_resolved(self): | |||||
self.assertEqual(obj, b'spam') | ||||||
self.assertEqual(out.strip(), 'send') | ||||||
|
||||||
def test_channel_send_wait_same_interpreter(self): | ||||||
cid = interpreters.channel_create() | ||||||
|
||||||
received = interpreters.channel_send_wait(cid, b"send", timeout=0) | ||||||
self.assertFalse(received) | ||||||
|
||||||
obj = interpreters.channel_recv(cid) | ||||||
self.assertEqual(obj, b"send") | ||||||
|
||||||
def test_channel_send_wait_different_interpreters(self): | ||||||
cid = interpreters.channel_create() | ||||||
interp = interpreters.create() | ||||||
_run_output(interp, dedent(f""" | ||||||
import _xxsubinterpreters as _interpreters | ||||||
import time | ||||||
import math | ||||||
|
||||||
start = time.time() | ||||||
rc = _interpreters.channel_send_wait({cid}, b"send", timeout=1) | ||||||
end = time.time() | ||||||
|
||||||
assert not rc | ||||||
assert math.floor(end-start) == 1 | ||||||
""")) | ||||||
|
||||||
obj = interpreters.channel_recv(cid) | ||||||
self.assertEqual(obj, b"send") | ||||||
|
||||||
def test_channel_send_wait_different_threads_and_interpreters(self): | ||||||
cid = interpreters.channel_create() | ||||||
interp = interpreters.create() | ||||||
|
||||||
thread_exc = None | ||||||
def run(): | ||||||
try: | ||||||
out = _run_output(interp, dedent(f""" | ||||||
import _xxsubinterpreters as _interpreters | ||||||
import time | ||||||
|
||||||
rc = _interpreters.channel_send_wait({cid}, b"send") | ||||||
assert rc | ||||||
""")) | ||||||
except Exception as e: | ||||||
nonlocal thread_exc | ||||||
thread_exc = e | ||||||
t = threading.Thread(target=run) | ||||||
t.start() | ||||||
time.sleep(0.5) | ||||||
|
||||||
obj = interpreters.channel_recv(cid) | ||||||
self.assertEqual(obj, b"send") | ||||||
t.join() | ||||||
assert thread_exc is None, f"{thread_exc}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
def test_channel_send_wait_no_timeout(self): | ||||||
cid = interpreters.channel_create() | ||||||
interp = interpreters.create() | ||||||
|
||||||
thread_exc = None | ||||||
def run(): | ||||||
try: | ||||||
out = _run_output(interp, dedent(f""" | ||||||
import _xxsubinterpreters as _interpreters | ||||||
import time | ||||||
|
||||||
rc = _interpreters.channel_send_wait({cid}, b"send", timeout=10) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test function says "no_timeout". Did you mean "with_timeout"? "timeout_not_hit"? |
||||||
assert rc | ||||||
""")) | ||||||
except Exception as e: | ||||||
nonlocal thread_exc | ||||||
thread_exc = e | ||||||
t = threading.Thread(target=run) | ||||||
t.start() | ||||||
time.sleep(0.5) | ||||||
|
||||||
obj = interpreters.channel_recv(cid) | ||||||
self.assertEqual(obj, b"send") | ||||||
t.join() | ||||||
assert thread_exc is None, f"{thread_exc}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
def test_invalid_channel_send_wait(self): | ||||||
does_not_exist_cid = 1000 | ||||||
closed_cid = interpreters.channel_create() | ||||||
interpreters.channel_close(closed_cid) | ||||||
|
||||||
with self.assertRaises(interpreters.ChannelNotFoundError): | ||||||
interpreters.channel_send_wait(does_not_exist_cid, b"error") | ||||||
|
||||||
with self.assertRaises(interpreters.ChannelClosedError): | ||||||
interpreters.channel_send_wait(closed_cid, b"error") | ||||||
|
||||||
# close | ||||||
|
||||||
def test_close_single_user(self): | ||||||
|
@@ -1519,6 +1610,30 @@ def test_close_used_multiple_times_by_single_user(self): | |||||
with self.assertRaises(interpreters.ChannelClosedError): | ||||||
interpreters.channel_recv(cid) | ||||||
|
||||||
def test_close_while_sender_waiting(self): | ||||||
cid = interpreters.channel_create() | ||||||
interp = interpreters.create() | ||||||
|
||||||
thread_exc = None | ||||||
def run(): | ||||||
try: | ||||||
out = _run_output(interp, dedent(f""" | ||||||
import _xxsubinterpreters as _interpreters | ||||||
|
||||||
rc = _interpreters.channel_send_wait({cid}, b"send") | ||||||
assert not rc | ||||||
""")) | ||||||
except Exception as e: | ||||||
nonlocal thread_exc | ||||||
thread_exc = e | ||||||
|
||||||
t = threading.Thread(target=run) | ||||||
t.start() | ||||||
time.sleep(0.1) | ||||||
interpreters.channel_close(cid, force=True) | ||||||
t.join() | ||||||
assert thread_exc is None, f"{thread_exc}" | ||||||
|
||||||
|
||||||
class ChannelReleaseTests(TestBase): | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be worth making it clear (e.g. in a comment) that we expect this to time out (since it is happening in the same thread where we expect
recv()
to be called later).