Skip to content

Commit 15e010d

Browse files
hanxiaojina-bot
andauthored
feat(flow): block() now waits for specific threading event (#3790)
* feat(flow): block() now waits for specific threading event * style: fix overload and cli autocomplete Co-authored-by: Jina Dev Bot <dev-bot@jina.ai>
1 parent 7123ce2 commit 15e010d

File tree

4 files changed

+88
-5
lines changed

4 files changed

+88
-5
lines changed

docs/fundamentals/flow/flow-api.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,28 @@ prevent that, use `.block()` to suspend the current process.
3939
with f:
4040
f.block() # block the current process
4141
```
42+
43+
To terminate a blocked Flow, one can send use `threading.Event` or `multiprocessing.Event` to send the terminate signal.
44+
The following example terminates a Flow from another process.
45+
46+
```python
47+
import multiprocessing
48+
import time
49+
50+
from jina import Flow
51+
52+
ev = multiprocessing.Event()
53+
54+
def close_after_5s():
55+
time.sleep(5)
56+
ev.set()
57+
58+
f = Flow().add()
59+
with f:
60+
t = multiprocessing.Process(target=close_after_5s)
61+
t.start()
62+
f.block(stop_event=ev)
63+
```
4264
````
4365
## Visualize a Flow
4466

jina/flow/base.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import copy
44
import itertools
55
import json
6+
import multiprocessing
67
import os
78
import re
89
import sys
@@ -1117,6 +1118,9 @@ def __enter__(self):
11171118
return self.start()
11181119

11191120
def __exit__(self, exc_type, exc_val, exc_tb):
1121+
if hasattr(self, '_stop_event'):
1122+
self._stop_event.set()
1123+
11201124
super().__exit__(exc_type, exc_val, exc_tb)
11211125

11221126
# unset all envs to avoid any side-effect
@@ -1166,7 +1170,7 @@ def start(self):
11661170

11671171
return self
11681172

1169-
def _wait_until_all_ready(self) -> bool:
1173+
def _wait_until_all_ready(self):
11701174
results = {}
11711175
threads = []
11721176

@@ -1578,10 +1582,22 @@ def _get_address_table(self, address_table):
15781582
)
15791583
return address_table
15801584

1581-
def block(self):
1582-
"""Block the process until user hits KeyboardInterrupt"""
1585+
def block(
1586+
self, stop_event: Optional[Union[threading.Event, multiprocessing.Event]] = None
1587+
):
1588+
"""Block the Flow until `stop_event` is set or user hits KeyboardInterrupt
1589+
1590+
:param stop_event: a threading event or a multiprocessing event that onces set will resume the control Flow
1591+
to main thread.
1592+
"""
15831593
try:
1584-
threading.Event().wait()
1594+
if stop_event is None:
1595+
self._stop_event = (
1596+
threading.Event()
1597+
) #: this allows `.close` to close the Flow from another thread/proc
1598+
self._stop_event.wait()
1599+
else:
1600+
stop_event.wait()
15851601
except KeyboardInterrupt:
15861602
pass
15871603

jina/types/document/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ def __init__(
161161
):
162162
"""
163163
:param adjacency: the adjacency of this Document
164-
:param blob: the blob content of this Document
164+
:param blob: the blob content of thi Document
165165
:param buffer: the buffer bytes from this document
166166
:param chunks: the array of chunks of this document
167167
:param content: the value of the content depending on `:meth:`content_type`
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import multiprocessing
2+
import threading
3+
import time
4+
5+
from jina import Flow
6+
7+
8+
def test_closing_blocked_flow_from_another_thread_via_flow():
9+
def close_blocked_f(flow):
10+
time.sleep(3)
11+
flow.close()
12+
13+
f = Flow().add()
14+
with f:
15+
t = threading.Thread(target=close_blocked_f, args=(f,))
16+
t.start()
17+
f.block()
18+
19+
20+
def test_closing_blocked_flow_from_another_thread_via_event():
21+
ev = threading.Event()
22+
23+
def close_blocked_f():
24+
time.sleep(3)
25+
ev.set()
26+
27+
f = Flow().add()
28+
with f:
29+
t = threading.Thread(target=close_blocked_f)
30+
t.start()
31+
f.block(stop_event=ev)
32+
33+
34+
def test_closing_blocked_flow_from_another_process_via_event():
35+
ev = multiprocessing.Event()
36+
37+
def close_blocked_f():
38+
time.sleep(3)
39+
ev.set()
40+
41+
f = Flow().add()
42+
with f:
43+
t = multiprocessing.Process(target=close_blocked_f)
44+
t.start()
45+
f.block(stop_event=ev)

0 commit comments

Comments
 (0)