1
+ import asyncRetry from 'async-retry' ;
1
2
import { setTimeout as setTimeoutPromise } from 'timers/promises' ;
2
3
import { randomUUID } from 'crypto' ;
3
4
import { ExecutionContext } from 'ava' ;
4
5
import { firstValueFrom , Subject } from 'rxjs' ;
5
- import { WorkflowFailedError } from '@temporalio/client' ;
6
+ import { WorkflowFailedError , WorkflowHandle } from '@temporalio/client' ;
6
7
import * as activity from '@temporalio/activity' ;
7
8
import { msToNumber , tsToMs } from '@temporalio/common/lib/time' ;
8
9
import { TestWorkflowEnvironment } from '@temporalio/testing' ;
9
10
import { CancelReason } from '@temporalio/worker/lib/activity' ;
10
11
import * as workflow from '@temporalio/workflow' ;
11
- import { defineQuery , defineSignal } from '@temporalio/workflow' ;
12
+ import { condition , defineQuery , defineSignal , defineUpdate , setDefaultQueryHandler , setDefaultSignalHandler , setDefaultUpdateHandler , setHandler } from '@temporalio/workflow' ;
12
13
import { SdkFlags } from '@temporalio/workflow/lib/flags' ;
13
14
import {
14
15
ActivityCancellationType ,
@@ -25,6 +26,7 @@ import * as workflows from './workflows';
25
26
import { Context , createLocalTestEnvironment , helpers , makeTestFunction } from './helpers-integration' ;
26
27
import { overrideSdkInternalFlag } from './mock-internal-flags' ;
27
28
import { asSdkLoggerSink , loadHistory , RUN_TIME_SKIPPING_TESTS , waitUntil } from './helpers' ;
29
+ import { reservedPrefixes } from '@temporalio/common/src/reserved' ;
28
30
29
31
const test = makeTestFunction ( {
30
32
workflowsPath : __filename ,
@@ -1414,3 +1416,162 @@ test('Workflow can return root workflow', async (t) => {
1414
1416
t . deepEqual ( result , 'empty test-root-workflow-length' ) ;
1415
1417
} ) ;
1416
1418
} ) ;
1419
+
1420
+ test ( 'Cannot register activities using reserved prefixes' , async ( t ) => {
1421
+ const { createWorker } = helpers ( t ) ;
1422
+
1423
+ for ( const prefix of reservedPrefixes ) {
1424
+ const activityName = prefix + '_test' ;
1425
+ await t . throwsAsync (
1426
+ createWorker ( {
1427
+ activities : { [ activityName ] : ( ) => { } } ,
1428
+ } ) ,
1429
+ {
1430
+ name : 'ReservedPrefixError' ,
1431
+ message : `Cannot use activity name: '${ activityName } ', with reserved prefix: '${ prefix } '` ,
1432
+ }
1433
+ ) ;
1434
+ }
1435
+ } ) ;
1436
+
1437
+ test ( 'Cannot register task queues using reserved prefixes' , async ( t ) => {
1438
+ const { createWorker } = helpers ( t ) ;
1439
+
1440
+ for ( const prefix of reservedPrefixes ) {
1441
+ const taskQueue = prefix + '_test' ;
1442
+
1443
+ await t . throwsAsync (
1444
+ createWorker ( {
1445
+ taskQueue,
1446
+ } ) ,
1447
+ {
1448
+ name : 'ReservedPrefixError' ,
1449
+ message : `Cannot use task queue name: '${ taskQueue } ', with reserved prefix: '${ prefix } '` ,
1450
+ }
1451
+ ) ;
1452
+ }
1453
+ } ) ;
1454
+
1455
+ interface HandlerError {
1456
+ name : string ;
1457
+ message : string ;
1458
+ }
1459
+
1460
+ export async function workflowBadPrefixHandler ( prefix : string ) : Promise < HandlerError [ ] > {
1461
+ // Re-package errors, default payload converter has trouble converting native errors (no 'data' field).
1462
+ const expectedErrors : HandlerError [ ] = [ ] ;
1463
+ try {
1464
+ setHandler ( defineSignal ( prefix + '_signal' ) , ( ) => { } ) ;
1465
+ } catch ( e ) {
1466
+ if ( e instanceof Error ) {
1467
+ expectedErrors . push ( { name : e . name , message : e . message } ) ;
1468
+ }
1469
+ }
1470
+ try {
1471
+ setHandler ( defineUpdate ( prefix + '_update' ) , ( ) => { } ) ;
1472
+ } catch ( e ) {
1473
+ if ( e instanceof Error ) {
1474
+ expectedErrors . push ( { name : e . name , message : e . message } ) ;
1475
+ }
1476
+ }
1477
+ try {
1478
+ setHandler ( defineQuery ( prefix + '_query' ) , ( ) => { } ) ;
1479
+ } catch ( e ) {
1480
+ if ( e instanceof Error ) {
1481
+ expectedErrors . push ( { name : e . name , message : e . message } ) ;
1482
+ }
1483
+ }
1484
+ return expectedErrors ;
1485
+ }
1486
+
1487
+ test ( 'Workflow failure if define signals/updates/queries with reserved prefixes' , async ( t ) => {
1488
+ const { createWorker, executeWorkflow } = helpers ( t ) ;
1489
+ const worker = await createWorker ( ) ;
1490
+ await worker . runUntil ( async ( ) => {
1491
+ for ( const prefix of reservedPrefixes ) {
1492
+ const result = await executeWorkflow ( workflowBadPrefixHandler , {
1493
+ args : [ prefix ] ,
1494
+ } ) ;
1495
+ t . deepEqual ( result , [
1496
+ { name : 'ReservedPrefixError' , message : `Cannot use signal name: '${ prefix } _signal', with reserved prefix: '${ prefix } '` } ,
1497
+ { name : 'ReservedPrefixError' , message : `Cannot use update name: '${ prefix } _update', with reserved prefix: '${ prefix } '` } ,
1498
+ { name : 'ReservedPrefixError' , message : `Cannot use query name: '${ prefix } _query', with reserved prefix: '${ prefix } '` } ,
1499
+ ] ) ;
1500
+ }
1501
+ } ) ;
1502
+ } ) ;
1503
+
1504
+ export async function workflowWithDefaultHandlers ( ) : Promise < void > {
1505
+ let unblocked = false ;
1506
+ setHandler ( defineSignal ( 'unblock' ) , ( ) => {
1507
+ unblocked = true ;
1508
+ } ) ;
1509
+
1510
+ setDefaultQueryHandler ( ( ) => { } ) ;
1511
+ setDefaultSignalHandler ( ( ) => { } ) ;
1512
+ setDefaultUpdateHandler ( ( ) => { } ) ;
1513
+
1514
+ await condition ( ( ) => unblocked ) ;
1515
+ }
1516
+
1517
+ test ( 'Default handlers fail given reserved prefix' , async ( t ) => {
1518
+ const { createWorker, startWorkflow } = helpers ( t ) ;
1519
+ const worker = await createWorker ( ) ;
1520
+
1521
+ const assertWftFailure = async (
1522
+ handle : WorkflowHandle ,
1523
+ errMsg : string ,
1524
+ ) => {
1525
+ await asyncRetry (
1526
+ async ( ) => {
1527
+ const history = await handle . fetchHistory ( ) ;
1528
+ const wftFailedEvent = history . events ?. findLast ( ( ev ) => ev . workflowTaskFailedEventAttributes ) ;
1529
+ if ( wftFailedEvent === undefined ) {
1530
+ throw new Error ( 'No WFT failed event found' ) ;
1531
+ }
1532
+ const { failure } = wftFailedEvent . workflowTaskFailedEventAttributes ?? { } ;
1533
+ if ( ! failure ) {
1534
+ return t . fail ( 'Expected failure in workflowTaskFailedEventAttributes' ) ;
1535
+ }
1536
+ t . is ( failure . message , errMsg ) ;
1537
+ } ,
1538
+ { minTimeout : 300 , factor : 1 , retries : 10 }
1539
+ ) ;
1540
+ } ;
1541
+
1542
+ await worker . runUntil ( async ( ) => {
1543
+ for ( const prefix of reservedPrefixes ) {
1544
+
1545
+ // Reserved query
1546
+ let handle = await startWorkflow ( workflowWithDefaultHandlers ) ;
1547
+ const queryName = `${ prefix } _query` ;
1548
+ await t . throwsAsync ( handle . query ( queryName , { timeout : 1000 } ) , {
1549
+ // ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server
1550
+ name : 'QueryNotRegisteredError' ,
1551
+ message : `Cannot use query name: '${ queryName } ', with reserved prefix: '${ prefix } '` ,
1552
+ } , `Query ${ queryName } should fail` ) ;
1553
+ await handle . terminate ( ) ;
1554
+
1555
+
1556
+
1557
+ // Reserved signal
1558
+ handle = await startWorkflow ( workflowWithDefaultHandlers ) ;
1559
+ const signalName = `${ prefix } _signal` ;
1560
+ await handle . signal ( signalName ) ;
1561
+ await assertWftFailure ( handle , `Cannot use signal name: '${ signalName } ', with reserved prefix: '${ prefix } '` ) ;
1562
+ await handle . terminate ( ) ;
1563
+
1564
+
1565
+ // Reserved update
1566
+ handle = await startWorkflow ( workflowWithDefaultHandlers ) ;
1567
+ const updateName = `${ prefix } _update` ;
1568
+ handle . executeUpdate ( updateName ) . catch ( ( ) => {
1569
+ // Expect failure. The error caught here is a WorkflowNotFound because
1570
+ // the workflow will have already failed, so the update cannot go through.
1571
+ // We assert on the expected failure below.
1572
+ } ) ;
1573
+ await assertWftFailure ( handle , `Cannot use update name: '${ updateName } ', with reserved prefix: '${ prefix } '` ) ;
1574
+ await handle . terminate ( ) ;
1575
+ }
1576
+ } ) ;
1577
+ } ) ;
0 commit comments