-
Notifications
You must be signed in to change notification settings - Fork 104
Nexus: worker, workflow-backed operations, and workflow caller #813
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c215d26
5b76325
f463a7d
381ac3a
01740e5
025cf2b
04033c5
6df6920
def9147
b02d7ee
0ed7774
ce5f481
cb67db5
9f217d5
f702d59
362f5ca
f89747d
8613152
98122f1
86493c6
f60a763
1c9c407
a0c9b46
427c052
14afbd4
189dd22
3b85beb
bee22d7
8742bbc
67421da
c085b86
5cbda5f
a3a619c
535e729
f81808f
1cf5a45
1e71487
00cc31f
226565e
b9a4320
0812b0a
4131b9a
a73d908
f138275
7a7935a
e33a879
4906679
966b1b8
6707341
d1d256c
3e5233a
35edfdd
e56af35
0c5676b
dad354e
bb065ef
ad9ab2d
e4a6141
c1f75d9
b2bf52e
03d2208
5979ed8
19379eb
757bea9
ec2b122
3b4e991
4590473
55e3a80
1696d1c
adfe18b
7c3a5e2
bf84ec4
bc46d89
69e3f54
1ba08a7
65ad14d
7254ac2
8634adb
d4a50b4
c7dd4ff
5fd466d
7840a9e
3a912c7
fc4b8c9
b1e1f1a
237a95d
26abae9
7edfb2e
5e4321d
b9416b6
7c91320
72334f5
e98c394
422d1fb
dfa105c
d504af6
960ed8f
b5957d7
466228f
90d42f8
65f8d0a
cc12038
a5f909a
2c84d5d
a515944
5254ff2
6311d4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ keywords = [ | |
"workflow", | ||
] | ||
dependencies = [ | ||
"nexus-rpc", | ||
"protobuf>=3.20,<6", | ||
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'", | ||
"types-protobuf>=3.20", | ||
|
@@ -44,7 +45,7 @@ dev = [ | |
"psutil>=5.9.3,<6", | ||
"pydocstyle>=6.3.0,<7", | ||
"pydoctor>=24.11.1,<25", | ||
"pyright==1.1.377", | ||
"pyright==1.1.400", | ||
"pytest~=7.4", | ||
"pytest-asyncio>=0.21,<0.22", | ||
"pytest-timeout~=2.2", | ||
|
@@ -53,6 +54,8 @@ dev = [ | |
"twine>=4.0.1,<5", | ||
"ruff>=0.5.0,<0.6", | ||
"maturin>=1.8.2", | ||
"pytest-cov>=6.1.1", | ||
"httpx>=0.28.1", | ||
"pytest-pretty>=1.3.0", | ||
] | ||
|
||
|
@@ -162,6 +165,7 @@ exclude = [ | |
"tests/worker/workflow_sandbox/testmodules/proto", | ||
"temporalio/bridge/worker.py", | ||
"temporalio/contrib/opentelemetry.py", | ||
"temporalio/contrib/pydantic.py", | ||
"temporalio/converter.py", | ||
"temporalio/testing/_workflow.py", | ||
"temporalio/worker/_activity.py", | ||
|
@@ -173,6 +177,10 @@ exclude = [ | |
"tests/api/test_grpc_stub.py", | ||
"tests/conftest.py", | ||
"tests/contrib/test_opentelemetry.py", | ||
"tests/contrib/pydantic/models.py", | ||
"tests/contrib/pydantic/models_2.py", | ||
"tests/contrib/pydantic/test_pydantic.py", | ||
"tests/contrib/pydantic/workflows.py", | ||
"tests/test_converter.py", | ||
"tests/test_service.py", | ||
"tests/test_workflow.py", | ||
|
@@ -208,3 +216,6 @@ exclude = [ | |
[tool.uv] | ||
# Prevent uv commands from building the package by default | ||
package = false | ||
|
||
[tool.uv.sources] | ||
nexus-rpc = { path = "../nexus-sdk-python", editable = true } | ||
Comment on lines
+220
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make sure not to merge until this is proper dependency |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -464,9 +464,17 @@ async def start_workflow( | |
rpc_metadata: Mapping[str, str] = {}, | ||
rpc_timeout: Optional[timedelta] = None, | ||
request_eager_start: bool = False, | ||
stack_level: int = 2, | ||
priority: temporalio.common.Priority = temporalio.common.Priority.default, | ||
versioning_override: Optional[temporalio.common.VersioningOverride] = None, | ||
# The following options should not be considered part of the public API. They | ||
# are deliberately not exposed in overloads, and are not subject to any | ||
# backwards compatibility guarantees. | ||
nexus_completion_callbacks: Sequence[NexusCompletionCallback] = [], | ||
workflow_event_links: Sequence[ | ||
temporalio.api.common.v1.Link.WorkflowEvent | ||
] = [], | ||
request_id: Optional[str] = None, | ||
stack_level: int = 2, | ||
) -> WorkflowHandle[Any, Any]: | ||
"""Start a workflow and return its handle. | ||
|
||
|
@@ -529,7 +537,6 @@ async def start_workflow( | |
name, result_type_from_type_hint = ( | ||
temporalio.workflow._Definition.get_name_and_result_type(workflow) | ||
) | ||
|
||
return await self._impl.start_workflow( | ||
StartWorkflowInput( | ||
workflow=name, | ||
|
@@ -557,6 +564,9 @@ async def start_workflow( | |
rpc_timeout=rpc_timeout, | ||
request_eager_start=request_eager_start, | ||
priority=priority, | ||
nexus_completion_callbacks=nexus_completion_callbacks, | ||
workflow_event_links=workflow_event_links, | ||
request_id=request_id, | ||
) | ||
) | ||
|
||
|
@@ -5193,6 +5203,10 @@ class StartWorkflowInput: | |
rpc_timeout: Optional[timedelta] | ||
request_eager_start: bool | ||
priority: temporalio.common.Priority | ||
# The following options are experimental and unstable. | ||
nexus_completion_callbacks: Sequence[NexusCompletionCallback] | ||
workflow_event_links: Sequence[temporalio.api.common.v1.Link.WorkflowEvent] | ||
request_id: Optional[str] | ||
Comment on lines
+5207
to
+5209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May want to mention here these are unstable/experimental There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Done |
||
versioning_override: Optional[temporalio.common.VersioningOverride] = None | ||
|
||
|
||
|
@@ -5807,8 +5821,26 @@ async def _build_start_workflow_execution_request( | |
self, input: StartWorkflowInput | ||
) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest: | ||
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest() | ||
req.request_eager_execution = input.request_eager_start | ||
await self._populate_start_workflow_execution_request(req, input) | ||
# _populate_start_workflow_execution_request is used for both StartWorkflowInput | ||
# and UpdateWithStartStartWorkflowInput. UpdateWithStartStartWorkflowInput does | ||
# not have the following two fields so they are handled here. | ||
req.request_eager_execution = input.request_eager_start | ||
if input.request_id: | ||
req.request_id = input.request_id | ||
|
||
req.completion_callbacks.extend( | ||
temporalio.api.common.v1.Callback( | ||
nexus=temporalio.api.common.v1.Callback.Nexus( | ||
url=callback.url, header=callback.header | ||
) | ||
) | ||
for callback in input.nexus_completion_callbacks | ||
) | ||
req.links.extend( | ||
temporalio.api.common.v1.Link(workflow_event=link) | ||
for link in input.workflow_event_links | ||
) | ||
return req | ||
|
||
async def _build_signal_with_start_workflow_execution_request( | ||
|
@@ -7231,6 +7263,21 @@ def api_key(self, value: Optional[str]) -> None: | |
self.service_client.update_api_key(value) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class NexusCompletionCallback: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May want to mention this is unstable/experimental and also not really for user use (I understand exposing because it's exposed in the interceptor) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done @dataclass(frozen=True)
class NexusCompletionCallback:
"""Nexus callback to attach to events such as workflow completion.
.. warning::
This option is experimental and unstable.
""" |
||
"""Nexus callback to attach to events such as workflow completion. | ||
|
||
.. warning:: | ||
This option is experimental and unstable. | ||
""" | ||
|
||
url: str | ||
"""Callback URL.""" | ||
|
||
header: Mapping[str, str] | ||
"""Header to attach to callback request.""" | ||
|
||
|
||
async def _encode_user_metadata( | ||
converter: temporalio.converter.DataConverter, | ||
summary: Optional[Union[str, temporalio.api.common.v1.Payload]], | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -911,6 +911,12 @@ def _error_to_failure( | |
failure.child_workflow_execution_failure_info.retry_state = ( | ||
temporalio.api.enums.v1.RetryState.ValueType(error.retry_state or 0) | ||
) | ||
# TODO(nexus-prerelease): test coverage for this | ||
elif isinstance(error, temporalio.exceptions.NexusOperationError): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For symmetry reasons, I suspect we also need to convert There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to add test coverage per the comment, and for what you're saying. Maybe this is something to resolve when we merge the workflow caller. |
||
failure.nexus_operation_execution_failure_info.SetInParent() | ||
failure.nexus_operation_execution_failure_info.operation_token = ( | ||
error.operation_token | ||
) | ||
|
||
def from_failure( | ||
self, | ||
|
@@ -1006,6 +1012,26 @@ def from_failure( | |
if child_info.retry_state | ||
else None, | ||
) | ||
elif failure.HasField("nexus_handler_failure_info"): | ||
nexus_handler_failure_info = failure.nexus_handler_failure_info | ||
err = temporalio.exceptions.NexusHandlerError( | ||
failure.message or "Nexus handler error", | ||
type=nexus_handler_failure_info.type, | ||
retryable={ | ||
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE: True, | ||
temporalio.api.enums.v1.NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: False, | ||
}.get(nexus_handler_failure_info.retry_behavior), | ||
) | ||
elif failure.HasField("nexus_operation_execution_failure_info"): | ||
nexus_op_failure_info = failure.nexus_operation_execution_failure_info | ||
err = temporalio.exceptions.NexusOperationError( | ||
failure.message or "Nexus operation error", | ||
scheduled_event_id=nexus_op_failure_info.scheduled_event_id, | ||
endpoint=nexus_op_failure_info.endpoint, | ||
service=nexus_op_failure_info.service, | ||
operation=nexus_op_failure_info.operation, | ||
operation_token=nexus_op_failure_info.operation_token, | ||
) | ||
else: | ||
err = temporalio.exceptions.FailureError(failure.message or "Failure error") | ||
err._failure = failure | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not see this section