Skip to content

Nexus: worker, workflow-backed operations, and workflow caller #813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 107 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
c215d26
Nexus
dandavison Apr 19, 2025
5b76325
Nexus workflow caller
dandavison Jun 10, 2025
f463a7d
Nexus: squashed commit
dandavison Jun 12, 2025
381ac3a
Option 1 for workflow_run_operation_handler
dandavison Jun 21, 2025
01740e5
Revert "Option 1 for workflow_run_operation_handler"
dandavison Jun 21, 2025
025cf2b
Adjust imports
dandavison Jun 21, 2025
04033c5
Revert "Adjust imports"
dandavison Jun 21, 2025
6df6920
TODO
dandavison Jun 21, 2025
def9147
Option 2 for workflow_run_operation_handler
dandavison Jun 21, 2025
b02d7ee
Failing test: first of two workflows incorrectly delivers result
dandavison Jun 22, 2025
0ed7774
WIP: Option 3 for workflow_run_operation_handler
dandavison Jun 22, 2025
ce5f481
TemporalNexusOperationContext should not be an ABC
dandavison Jun 22, 2025
cb67db5
Remove unused output_type
dandavison Jun 22, 2025
9f217d5
Cleanup
dandavison Jun 22, 2025
f702d59
Make WorkflowOperationToken generic, parameterized by output type
dandavison Jun 22, 2025
362f5ca
Option 4 for WorkflowRunOperationHandler
dandavison Jun 22, 2025
f89747d
Cleanup
dandavison Jun 22, 2025
8613152
Refactor test
dandavison Jun 22, 2025
98122f1
Failing test: request ID is not used for non-backing workflow
dandavison Jun 22, 2025
86493c6
Bug fix: wire request_id through as top-level start_workflow param
dandavison Jun 22, 2025
f60a763
Rename: TemporalOperationContext
dandavison Jun 22, 2025
1c9c407
Rename: cancel_operation
dandavison Jun 22, 2025
a0c9b46
Cleanup
dandavison Jun 22, 2025
427c052
Do not allow Nexus operation to set client used for starting workflow
dandavison Jun 22, 2025
14afbd4
Make task queue optional when starting workflows
dandavison Jun 22, 2025
189dd22
Add nexus_task_poller_behavior
dandavison Jun 22, 2025
3b85beb
Handle PollShutdownError
dandavison Jun 22, 2025
bee22d7
Respond to upstream: default to async
dandavison Jun 23, 2025
8742bbc
Cleanup; changes from review comments
dandavison Jun 22, 2025
67421da
Respond to upstream: handler factory instead of sync_operation_handler
dandavison Jun 23, 2025
c085b86
Switch workflow_run_operation_handler to standard factory
dandavison Jun 23, 2025
5cbda5f
Do not support passing client to cancel_operation
dandavison Jun 23, 2025
a3a619c
RTU: bridge Rust
dandavison Jun 23, 2025
535e729
Fix: make all methods `async def` on WorkflowRunOperationHandler
dandavison Jun 24, 2025
f81808f
Get rid of TypeGuard
dandavison Jun 24, 2025
1cf5a45
Support passing result_type when getting workflow handle from token
dandavison Jun 24, 2025
1e71487
Implement fetch_result handler
dandavison Jun 24, 2025
00cc31f
Cleanup
dandavison Jun 24, 2025
226565e
Tests: clean up type annotation warnings
dandavison Jun 24, 2025
b9a4320
Improve type annotation warnings
dandavison Jun 24, 2025
0812b0a
Cleanup
dandavison Jun 24, 2025
4131b9a
Import nexus.handler.logger in worker
dandavison Jun 24, 2025
a73d908
Do not issue warnings when user is not using type annotations
dandavison Jun 24, 2025
f138275
Remove redundant validation
dandavison Jun 24, 2025
7a7935a
Respond to code review comments
dandavison Jun 24, 2025
e33a879
Don't swallow exceptions when encoding failures
dandavison Jun 24, 2025
4906679
Catch BaseException at top-level in worker
dandavison Jun 24, 2025
966b1b8
Fail worker on broken executor
dandavison Jun 24, 2025
6707341
Revert "Catch BaseException at top-level in worker"
dandavison Jun 24, 2025
d1d256c
Cleanup
dandavison Jun 24, 2025
3e5233a
Change context method name: .current() -> .get()
dandavison Jun 24, 2025
35edfdd
Rename: TemporalNexusOperationContext
dandavison Jun 24, 2025
e56af35
Expose contextvar object directly
dandavison Jun 24, 2025
0c5676b
Mark methods as private
dandavison Jun 24, 2025
dad354e
Add run-time type check
dandavison Jun 24, 2025
bb065ef
Make start_workflow a static function
dandavison Jun 24, 2025
ad9ab2d
Remove accidental exports
dandavison Jun 24, 2025
e4a6141
Docstrings
dandavison Jun 24, 2025
c1f75d9
Comment, cleanup
dandavison Jun 24, 2025
b2bf52e
TODO
dandavison Jun 24, 2025
03d2208
TODOs
dandavison Jun 24, 2025
5979ed8
Get rid of spurious type parameters
dandavison Jun 25, 2025
19379eb
Add worker logging
dandavison Jun 25, 2025
757bea9
Type-level enforcement of the two ways to use WorkflowRunOperationHan…
dandavison Jun 25, 2025
ec2b122
Respond to upstream: SyncOperation.from_callable
dandavison Jun 25, 2025
3b4e991
-> WorkflowRunOperation.from_callable()
dandavison Jun 25, 2025
4590473
TODO
dandavison Jun 25, 2025
55e3a80
Parameterize workflow_run_operation tests
dandavison Jun 25, 2025
1696d1c
Failing test case
dandavison Jun 25, 2025
adfe18b
Test: clean up imports
dandavison Jun 25, 2025
7c3a5e2
Respond to upstream: sync_operation_handler
dandavison Jun 25, 2025
bf84ec4
New workflow_run_operation_handler
dandavison Jun 25, 2025
bc46d89
Delete reference to obsolete __nexus_service_metadata__
dandavison Jun 26, 2025
69e3f54
TODO
dandavison Jun 26, 2025
1ba08a7
Use get_callable_name utility
dandavison Jun 26, 2025
65ad14d
Fix test: 'not an async def` message changed
dandavison Jun 26, 2025
7254ac2
Refactor
dandavison Jun 26, 2025
8634adb
Reorganize: temporalio.nexus.handler -> temporalo.nexus
dandavison Jun 26, 2025
d4a50b4
Fix signatures of start_method on workflow caller side
dandavison Jun 26, 2025
c7dd4ff
`from temporalio import nexus` everywhere
dandavison Jun 26, 2025
5fd466d
Qualify client.WorkflowHandle in temporalio.nexus
dandavison Jun 26, 2025
7840a9e
Fixup: no coverage for these
dandavison Jun 26, 2025
3a912c7
Rename: nexus.WorkflowHandle
dandavison Jun 26, 2025
fc4b8c9
nexus.WorkflowHandle.{to,from}_token()
dandavison Jun 26, 2025
b1e1f1a
Respond to upstream: sync_operation_handler -> sync_operation
dandavison Jun 26, 2025
237a95d
workflow_run_operation_handler -> workflow_run_operation
dandavison Jun 26, 2025
26abae9
from nexus import workflow_run_operation
dandavison Jun 26, 2025
7edfb2e
Respond to upstream: operation_handler is not in the public API
dandavison Jun 26, 2025
5e4321d
New nexus operation context API
dandavison Jun 26, 2025
b9416b6
Fix broken test
dandavison Jun 26, 2025
7c91320
Fix another test
dandavison Jun 26, 2025
72334f5
Move Failure tests utility
dandavison Jun 26, 2025
e98c394
Fix test
dandavison Jun 26, 2025
422d1fb
RTU: relocate OperationError
dandavison Jun 26, 2025
dfa105c
Copy get_types utility from nexusrpc
dandavison Jun 26, 2025
d504af6
Fixup: eliminate references to WorkflowOperationToken
dandavison Jun 26, 2025
960ed8f
Move start_workflow to WorkflowRunOperationContext
dandavison Jun 26, 2025
b5957d7
Wire through additional context type in union
dandavison Jun 26, 2025
466228f
Eliminate unnecessary modeling of callable types
dandavison Jun 26, 2025
90d42f8
Fix passing Nexus context headers/request ID from worker
dandavison Jun 26, 2025
65f8d0a
Always passthrough nexusrpc
dandavison Jun 26, 2025
cc12038
Revert disabling of sandbox for nexus workflow tests
dandavison Jun 26, 2025
a5f909a
Passthrough 3rd-party imports in tests helpers module
dandavison Jun 26, 2025
2c84d5d
uv.lock
dandavison Jun 26, 2025
a515944
Strengthen warning note
dandavison Jun 26, 2025
5254ff2
Docstrings, comments
dandavison Jun 26, 2025
6311d4b
Type-level cleanup/evolution in workflow caller
dandavison Jun 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ informal introduction to the features and their implementation.
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
- [Worker Shutdown](#worker-shutdown)
- [Testing](#testing-1)
- [Nexus](#nexus)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see this section

- [Workflow Replay](#workflow-replay)
- [Observability](#observability)
- [Metrics](#metrics)
Expand Down Expand Up @@ -1308,6 +1309,7 @@ affect calls activity code might make to functions on the `temporalio.activity`
* `cancel()` can be invoked to simulate a cancellation of the activity
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity


### Workflow Replay

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
Expand Down
13 changes: 12 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = [
"workflow",
]
dependencies = [
"nexus-rpc",
"protobuf>=3.20,<6",
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'",
"types-protobuf>=3.20",
Expand Down Expand Up @@ -44,7 +45,7 @@ dev = [
"psutil>=5.9.3,<6",
"pydocstyle>=6.3.0,<7",
"pydoctor>=24.11.1,<25",
"pyright==1.1.377",
"pyright==1.1.400",
"pytest~=7.4",
"pytest-asyncio>=0.21,<0.22",
"pytest-timeout~=2.2",
Expand All @@ -53,6 +54,8 @@ dev = [
"twine>=4.0.1,<5",
"ruff>=0.5.0,<0.6",
"maturin>=1.8.2",
"pytest-cov>=6.1.1",
"httpx>=0.28.1",
"pytest-pretty>=1.3.0",
]

Expand Down Expand Up @@ -162,6 +165,7 @@ exclude = [
"tests/worker/workflow_sandbox/testmodules/proto",
"temporalio/bridge/worker.py",
"temporalio/contrib/opentelemetry.py",
"temporalio/contrib/pydantic.py",
"temporalio/converter.py",
"temporalio/testing/_workflow.py",
"temporalio/worker/_activity.py",
Expand All @@ -173,6 +177,10 @@ exclude = [
"tests/api/test_grpc_stub.py",
"tests/conftest.py",
"tests/contrib/test_opentelemetry.py",
"tests/contrib/pydantic/models.py",
"tests/contrib/pydantic/models_2.py",
"tests/contrib/pydantic/test_pydantic.py",
"tests/contrib/pydantic/workflows.py",
"tests/test_converter.py",
"tests/test_service.py",
"tests/test_workflow.py",
Expand Down Expand Up @@ -208,3 +216,6 @@ exclude = [
[tool.uv]
# Prevent uv commands from building the package by default
package = false

[tool.uv.sources]
nexus-rpc = { path = "../nexus-sdk-python", editable = true }
Comment on lines +220 to +221
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure not to merge until this is proper dependency

32 changes: 31 additions & 1 deletion temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use temporal_sdk_core_api::worker::{
};
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion};
use temporal_sdk_core_protos::temporal::api::history::v1::History;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -60,6 +60,7 @@ pub struct WorkerConfig {
graceful_shutdown_period_millis: u64,
nondeterminism_as_workflow_fail: bool,
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
nexus_task_poller_behavior: PollerBehavior,
}

#[derive(FromPyObject)]
Expand Down Expand Up @@ -565,6 +566,18 @@ impl WorkerRef {
})
}

fn poll_nexus_task<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let worker = self.worker.as_ref().unwrap().clone();
self.runtime.future_into_py(py, async move {
let bytes = match worker.poll_nexus_task().await {
Ok(task) => task.encode_to_vec(),
Err(PollError::ShutDown) => return Err(PollShutdownError::new_err(())),
Err(err) => return Err(PyRuntimeError::new_err(format!("Poll failure: {}", err))),
};
Ok(bytes)
})
}

fn complete_workflow_activation<'p>(
&self,
py: Python<'p>,
Expand Down Expand Up @@ -599,6 +612,22 @@ impl WorkerRef {
})
}

fn complete_nexus_task<'p>(&self,
py: Python<'p>,
proto: &Bound<'_, PyBytes>,
) -> PyResult<Bound<'p, PyAny>> {
let worker = self.worker.as_ref().unwrap().clone();
let completion = NexusTaskCompletion::decode(proto.as_bytes())
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {}", err)))?;
self.runtime.future_into_py(py, async move {
worker
.complete_nexus_task(completion)
.await
.context("Completion failure")
.map_err(Into::into)
})
}

fn record_activity_heartbeat(&self, proto: &Bound<'_, PyBytes>) -> PyResult<()> {
enter_sync!(self.runtime);
let heartbeat = ActivityHeartbeat::decode(proto.as_bytes())
Expand Down Expand Up @@ -696,6 +725,7 @@ fn convert_worker_config(
})
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
)
.nexus_task_poller_behavior(conf.nexus_task_poller_behavior)
.build()
.map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}")))
}
Expand Down
18 changes: 17 additions & 1 deletion temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import temporalio.bridge.client
import temporalio.bridge.proto
import temporalio.bridge.proto.activity_task
import temporalio.bridge.proto.nexus
import temporalio.bridge.proto.workflow_activation
import temporalio.bridge.proto.workflow_completion
import temporalio.bridge.runtime
Expand All @@ -35,7 +36,7 @@
from temporalio.bridge.temporal_sdk_bridge import (
CustomSlotSupplier as BridgeCustomSlotSupplier,
)
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError # type: ignore


@dataclass
Expand All @@ -60,6 +61,7 @@ class WorkerConfig:
graceful_shutdown_period_millis: int
nondeterminism_as_workflow_fail: bool
nondeterminism_as_workflow_fail_for_types: Set[str]
nexus_task_poller_behavior: PollerBehavior


@dataclass
Expand Down Expand Up @@ -216,6 +218,14 @@ async def poll_activity_task(
await self._ref.poll_activity_task()
)

async def poll_nexus_task(
self,
) -> temporalio.bridge.proto.nexus.NexusTask:
"""Poll for a nexus task."""
return temporalio.bridge.proto.nexus.NexusTask.FromString(
await self._ref.poll_nexus_task()
)

async def complete_workflow_activation(
self,
comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
Expand All @@ -229,6 +239,12 @@ async def complete_activity_task(
"""Complete an activity task."""
await self._ref.complete_activity_task(comp.SerializeToString())

async def complete_nexus_task(
self, comp: temporalio.bridge.proto.nexus.NexusTaskCompletion
) -> None:
"""Complete a nexus task."""
await self._ref.complete_nexus_task(comp.SerializeToString())

def record_activity_heartbeat(
self, comp: temporalio.bridge.proto.ActivityHeartbeat
) -> None:
Expand Down
53 changes: 50 additions & 3 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,17 @@ async def start_workflow(
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
request_eager_start: bool = False,
stack_level: int = 2,
priority: temporalio.common.Priority = temporalio.common.Priority.default,
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
# The following options should not be considered part of the public API. They
# are deliberately not exposed in overloads, and are not subject to any
# backwards compatibility guarantees.
nexus_completion_callbacks: Sequence[NexusCompletionCallback] = [],
workflow_event_links: Sequence[
temporalio.api.common.v1.Link.WorkflowEvent
] = [],
request_id: Optional[str] = None,
stack_level: int = 2,
) -> WorkflowHandle[Any, Any]:
"""Start a workflow and return its handle.

Expand Down Expand Up @@ -529,7 +537,6 @@ async def start_workflow(
name, result_type_from_type_hint = (
temporalio.workflow._Definition.get_name_and_result_type(workflow)
)

return await self._impl.start_workflow(
StartWorkflowInput(
workflow=name,
Expand Down Expand Up @@ -557,6 +564,9 @@ async def start_workflow(
rpc_timeout=rpc_timeout,
request_eager_start=request_eager_start,
priority=priority,
nexus_completion_callbacks=nexus_completion_callbacks,
workflow_event_links=workflow_event_links,
request_id=request_id,
)
)

Expand Down Expand Up @@ -5193,6 +5203,10 @@ class StartWorkflowInput:
rpc_timeout: Optional[timedelta]
request_eager_start: bool
priority: temporalio.common.Priority
# The following options are experimental and unstable.
nexus_completion_callbacks: Sequence[NexusCompletionCallback]
workflow_event_links: Sequence[temporalio.api.common.v1.Link.WorkflowEvent]
request_id: Optional[str]
Comment on lines +5207 to +5209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to mention here these are unstable/experimental

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to mention here these are unstable/experimental

Done

versioning_override: Optional[temporalio.common.VersioningOverride] = None


Expand Down Expand Up @@ -5807,8 +5821,26 @@ async def _build_start_workflow_execution_request(
self, input: StartWorkflowInput
) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest:
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
req.request_eager_execution = input.request_eager_start
await self._populate_start_workflow_execution_request(req, input)
# _populate_start_workflow_execution_request is used for both StartWorkflowInput
# and UpdateWithStartStartWorkflowInput. UpdateWithStartStartWorkflowInput does
# not have the following two fields so they are handled here.
req.request_eager_execution = input.request_eager_start
if input.request_id:
req.request_id = input.request_id

req.completion_callbacks.extend(
temporalio.api.common.v1.Callback(
nexus=temporalio.api.common.v1.Callback.Nexus(
url=callback.url, header=callback.header
)
)
for callback in input.nexus_completion_callbacks
)
req.links.extend(
temporalio.api.common.v1.Link(workflow_event=link)
for link in input.workflow_event_links
)
return req

async def _build_signal_with_start_workflow_execution_request(
Expand Down Expand Up @@ -7231,6 +7263,21 @@ def api_key(self, value: Optional[str]) -> None:
self.service_client.update_api_key(value)


@dataclass(frozen=True)
class NexusCompletionCallback:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to mention this is unstable/experimental and also not really for user use (I understand exposing because it's exposed in the interceptor)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@dataclass(frozen=True)
class NexusCompletionCallback:
    """Nexus callback to attach to events such as workflow completion.

    .. warning::
        This option is experimental and unstable.
    """

"""Nexus callback to attach to events such as workflow completion.

.. warning::
This option is experimental and unstable.
"""

url: str
"""Callback URL."""

header: Mapping[str, str]
"""Header to attach to callback request."""


async def _encode_user_metadata(
converter: temporalio.converter.DataConverter,
summary: Optional[Union[str, temporalio.api.common.v1.Payload]],
Expand Down
2 changes: 1 addition & 1 deletion temporalio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum, IntEnum
from enum import IntEnum
from typing import (
Any,
Callable,
Expand Down
26 changes: 26 additions & 0 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,12 @@ def _error_to_failure(
failure.child_workflow_execution_failure_info.retry_state = (
temporalio.api.enums.v1.RetryState.ValueType(error.retry_state or 0)
)
# TODO(nexus-prerelease): test coverage for this
elif isinstance(error, temporalio.exceptions.NexusOperationError):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For symmetry reasons, I suspect we also need to convert NexusHandlerError (in theory a failure conversion from a failure should be able to convert back to a failure)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to add test coverage per the comment, and for what you're saying. Maybe this is something to resolve when we merge the workflow caller.

failure.nexus_operation_execution_failure_info.SetInParent()
failure.nexus_operation_execution_failure_info.operation_token = (
error.operation_token
)

def from_failure(
self,
Expand Down Expand Up @@ -1006,6 +1012,26 @@ def from_failure(
if child_info.retry_state
else None,
)
elif failure.HasField("nexus_handler_failure_info"):
nexus_handler_failure_info = failure.nexus_handler_failure_info
err = temporalio.exceptions.NexusHandlerError(
failure.message or "Nexus handler error",
type=nexus_handler_failure_info.type,
retryable={
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE: True,
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: False,
}.get(nexus_handler_failure_info.retry_behavior),
)
elif failure.HasField("nexus_operation_execution_failure_info"):
nexus_op_failure_info = failure.nexus_operation_execution_failure_info
err = temporalio.exceptions.NexusOperationError(
failure.message or "Nexus operation error",
scheduled_event_id=nexus_op_failure_info.scheduled_event_id,
endpoint=nexus_op_failure_info.endpoint,
service=nexus_op_failure_info.service,
operation=nexus_op_failure_info.operation,
operation_token=nexus_op_failure_info.operation_token,
)
else:
err = temporalio.exceptions.FailureError(failure.message or "Failure error")
err._failure = failure
Expand Down
Loading
Loading