diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b079e60b..21e498132 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,10 @@ on: - "releases/*" jobs: + + nightly: + uses: ./.github/workflows/run-bench.yml + # Build and test the project build-lint-test: strategy: diff --git a/.github/workflows/run-bench.yml b/.github/workflows/run-bench.yml index 6dcad63e8..0cecd6166 100644 --- a/.github/workflows/run-bench.yml +++ b/.github/workflows/run-bench.yml @@ -55,17 +55,3 @@ jobs: - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - run: poe run-bench --workflow-count 100 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 1000 --max-cached-workflows 100 --max-concurrent 100 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 10000 --max-concurrent 10000 ${{ inputs.sandbox-arg }} - - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} - - run: poe run-bench --workflow-count 10000 --max-cached-workflows 1000 --max-concurrent 1000 ${{ inputs.sandbox-arg }} \ No newline at end of file diff --git a/scripts/run_bench.py b/scripts/run_bench.py index decbe4810..90a1db80c 100644 --- a/scripts/run_bench.py +++ b/scripts/run_bench.py @@ -31,6 +31,15 @@ async def bench_activity(name: str) -> str: return f"Hello, {name}!" +@workflow.defn +class DeadlockInterruptibleWorkflow: + @workflow.run + async def run(self) -> None: + # Infinite loop, which is interruptible via PyThreadState_SetAsyncExc + while True: + pass + + async def main(): logging.basicConfig( format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s", @@ -61,7 +70,7 @@ async def report_mem(): nonlocal max_mem while True: try: - await asyncio.sleep(0.8) + await asyncio.sleep(0.1) finally: # TODO(cretz): "vms" appears more accurate on Windows, but # rss is more accurate on Linux @@ -86,6 +95,13 @@ async def report_mem(): logger.debug("Starting %s workflows", args.workflow_count) pre_start_seconds = time.monotonic() handles = [ + await env.client.start_workflow( + DeadlockInterruptibleWorkflow.run, + id=f"deadlock-interruptible-workflow-{i}-{uuid.uuid4()}", + task_queue=task_queue, + ) + for i in range(1) + ] + [ await env.client.start_workflow( BenchWorkflow.run, f"user-{i}", @@ -101,7 +117,7 @@ async def report_mem(): async with Worker( env.client, task_queue=task_queue, - workflows=[BenchWorkflow], + workflows=[BenchWorkflow, DeadlockInterruptibleWorkflow], activities=[bench_activity], workflow_runner=SandboxedWorkflowRunner() if args.sandbox diff --git a/temporalio/testing/_workflow.py b/temporalio/testing/_workflow.py index 85c5404ea..2b2104dc9 100644 --- a/temporalio/testing/_workflow.py +++ b/temporalio/testing/_workflow.py @@ -89,7 +89,7 @@ async def start_local( ip: str = "127.0.0.1", port: Optional[int] = None, download_dest_dir: Optional[str] = None, - ui: bool = False, + ui: bool = True, runtime: Optional[temporalio.runtime.Runtime] = None, search_attributes: Sequence[temporalio.common.SearchAttributeKey] = (), dev_server_existing_path: Optional[str] = None, diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 37e6810c9..167f3e8b0 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -8,7 +8,6 @@ import os import sys import threading -from dataclasses import dataclass from datetime import timezone from types import TracebackType from typing import ( @@ -657,6 +656,7 @@ def attempt_deadlock_interruption(self) -> None: return deadlocked_thread_id = self.instance.get_thread_id() if deadlocked_thread_id: + print("🌈 _raise_in_thread: interrupting deadlock") temporalio.bridge.runtime.Runtime._raise_in_thread( deadlocked_thread_id, _InterruptDeadlockError ) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index f57f4b9fd..a19fcea33 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -6989,6 +6989,8 @@ async def run(self) -> None: try: while True: pass + except BaseException as e: + print(f"🌈 DeadlockInterruptibleWorkflow: {e.__class__.__name__}({e})") finally: global deadlock_interruptible_completed deadlock_interruptible_completed += 1 @@ -7019,6 +7021,7 @@ async def check_completed(): await assert_eventually(check_completed) completed_sec = time.monotonic() + await handle.result() # Confirm worker shutdown didn't hang assert time.monotonic() - completed_sec < 20