
Workflow message passing - TypeScript SDK

A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values. Temporal Clients use messages to read Workflow state and control its execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal TypeScript SDK.

Write message handlers​

info

The code that follows is part of a working message-passing sample.

Follow these guidelines when writing your message handlers:

  • Define a message type as a global variable using defineQuery, defineSignal, or defineUpdate. This is what your client code will use to send a message to the workflow.
  • Message handlers are defined by calling workflow.setHandler in your Workflow function.
  • The parameters and return values of handlers and the main Workflow function must be serializable.
  • Prefer using a single object over multiple input parameters. A single object allows you to add fields without changing the signature.
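
The last guideline can be illustrated without any SDK calls. The following is a minimal sketch in plain TypeScript; the `comment` field and the `describeApproval` function are hypothetical names introduced only for this illustration. It shows that adding an optional field to a single input object leaves existing callers untouched.

```typescript
// Hypothetical input shape for the approve Signal; only `name` appears on this page.
interface ApproveInput {
  name: string;
  // Added in a later version. Callers that omit it still compile.
  comment?: string;
}

// Stand-in for a handler body: it takes one object, so its signature never changes.
function describeApproval(input: ApproveInput): string {
  const suffix = input.comment ? ` (${input.comment})` : '';
  return `approved by ${input.name}${suffix}`;
}

const fromOldCaller = describeApproval({ name: 'me' });
const fromNewCaller = describeApproval({ name: 'me', comment: 'looks good' });
```

With positional parameters, the same change would have required a new parameter in the handler signature and in every call site that wanted to use it.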

Query handlers​

A Query is a synchronous operation that retrieves state from a Workflow Execution:

export enum Language {
ARABIC = 'ARABIC',
CHINESE = 'CHINESE',
ENGLISH = 'ENGLISH',
FRENCH = 'FRENCH',
HINDI = 'HINDI',
PORTUGUESE = 'PORTUGUESE',
SPANISH = 'SPANISH',
}

interface GetLanguagesInput {
includeUnsupported: boolean;
}

// 👉 Use the object returned by defineQuery to set the query handler in
// Workflow code, and when sending the Query in Client code.
export const getLanguages = wf.defineQuery<Language[], [GetLanguagesInput]>('getLanguages');

export async function greetingWorkflow(): Promise<string> {
const greetings: Partial<Record<Language, string>> = {
[Language.CHINESE]: '你好，世界',
[Language.ENGLISH]: 'Hello, world',
};

wf.setHandler(getLanguages, (input: GetLanguagesInput): Language[] => {
// 👉 A Query handler returns a value: it must not mutate the Workflow state
// and can't perform async operations.
if (input.includeUnsupported) {
return Object.values(Language);
} else {
return Object.keys(greetings) as Language[];
}
});

...
}
  • A Query handler cannot be async. You can't perform async operations like executing an Activity in a Query handler.

  • setHandler can take QueryHandlerOptions (such as description) as described in the API reference docs for workflow.setHandler.

Signal handlers​

A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:

// 👉 Use the object returned by defineSignal to set the Signal handler in
// Workflow code, and to send the Signal from Client code.
export const approve = wf.defineSignal<[ApproveInput]>('approve');

export async function greetingWorkflow(): Promise<string> {
let approvedForRelease = false;
let approverName: string | undefined;

wf.setHandler(approve, (input) => {
// 👉 A Signal handler mutates the Workflow state but cannot return a value.
approvedForRelease = true;
approverName = input.name;
});

...
}
...
  • The handler cannot return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.

  • Signal (and Update) handlers can be async. This allows you to use Activities, Child Workflows, durable workflow.sleep Timers, workflow.condition conditions, and more. See Async handlers and Workflow message passing for guidelines on safely using async Signal and Update handlers.

  • Delay calling workflow.setHandler until the Workflow initialization needed by Signal (or Update) handlers has finished. This is safe because the SDK buffers messages when there are no registered handlers for them. Note that workflow.setHandler will immediately invoke the handler of buffered Signals (or Updates) with matching types. This could lead to out-of-order processing of messages with different types.

  • setHandler can take SignalHandlerOptions (such as description and unfinishedPolicy) as described in the API reference docs for workflow.setHandler.
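
The out-of-order effect described in the buffering bullet can be sketched with a toy model. This is plain TypeScript, not SDK code: `ToyInbox` and its methods are hypothetical stand-ins that assume only what the bullet states, namely that messages without a handler are buffered and that registering a handler immediately drains the buffered messages of that type.

```typescript
type Handler = (payload: string) => void;

// Toy stand-in for the buffering behaviour described above; this is not SDK code.
class ToyInbox {
  private handlers = new Map<string, Handler>();
  private buffered: Array<{ name: string; payload: string }> = [];

  // A message arrives: run its handler if one exists, otherwise buffer it.
  deliver(name: string, payload: string): void {
    const handler = this.handlers.get(name);
    if (handler) {
      handler(payload);
    } else {
      this.buffered.push({ name, payload });
    }
  }

  // Registering a handler drains only the buffered messages with a matching name.
  setHandler(name: string, handler: Handler): void {
    this.handlers.set(name, handler);
    const matching = this.buffered.filter((m) => m.name === name);
    this.buffered = this.buffered.filter((m) => m.name !== name);
    for (const m of matching) handler(m.payload);
  }
}

const processed: string[] = [];
const inbox = new ToyInbox();
inbox.deliver('a', '1'); // arrives first, buffered
inbox.deliver('b', '2'); // arrives second, buffered
inbox.setHandler('b', (p) => processed.push(`b:${p}`));
inbox.setHandler('a', (p) => processed.push(`a:${p}`));
// 'b' was handled before 'a' even though 'a' arrived first.
```

In this model the processing order follows the order of the `setHandler` calls, not the arrival order, which is why the order in which you register handlers matters when message types depend on each other.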

Update handlers and validators​

An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:

// 👉 Use the object returned by defineUpdate to set the Update handler in
// Workflow code, and to send Updates from Client code.
export const setLanguage = wf.defineUpdate<Language, [Language]>('setLanguage');

export async function greetingWorkflow(): Promise<string> {
const greetings: Partial<Record<Language, string>> = {
[Language.CHINESE]: '你好，世界',
[Language.ENGLISH]: 'Hello, world',
};

let language = Language.ENGLISH;

wf.setHandler(
setLanguage,
(newLanguage: Language) => {
// 👉 An Update handler can mutate the Workflow state and return a value.
const previousLanguage = language;
language = newLanguage;
return previousLanguage;
},
{
validator: (newLanguage: Language) => {
// 👉 Update validators are optional
if (!(newLanguage in greetings)) {
throw new Error(`${newLanguage} is not supported`);
}
},
}
);

...
}
  • setHandler can take UpdateHandlerOptions (such as validator, description and unfinishedPolicy) as described in the API reference docs for workflow.setHandler.

  • About validators:

    • Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you don't need a validator.
    • To set a validator, pass the validator function in UpdateHandlerOptions when calling workflow.setHandler. The validator must be a non-async function that accepts the same argument types as the handler and returns void.
  • Accepting and rejecting Updates with validators:

    • To reject an Update, throw an error of any type in the validator.
    • Without a validator, Updates are always accepted.
  • Validators and Event History:

    • The WorkflowExecutionUpdateAccepted event is written into History whether the acceptance was automatic or due to a validator function not throwing an error.
    • When a Validator throws an error, the Update is rejected and WorkflowExecutionUpdateAccepted won't be added to the Event History. The caller receives an "Update failed" error.
  • Use workflow.currentUpdateInfo to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.

  • Update (and Signal) handlers can be async, letting them use Activities, Child Workflows, durable workflow.sleep Timers, workflow.condition conditions, and more. See Async handlers and Workflow message passing for safe usage guidelines.

  • Delay calling workflow.setHandler until the Workflow initialization needed by Update (or Signal) handlers has finished. This is safe because the SDK buffers messages when there are no registered handlers for them. Note that workflow.setHandler will immediately invoke the handler of buffered Updates (or Signals) with matching types. This could lead to out-of-order processing of messages with different types.
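
The accept/reject rules above can be sketched as a toy model. This is plain TypeScript, not SDK code: `processUpdate`, `state`, and the `history` array are hypothetical, and the model assumes only what the bullets state (a throwing validator rejects the Update before anything is written to History; otherwise the Update is accepted and the handler runs).

```typescript
type Language = 'CHINESE' | 'ENGLISH' | 'FRENCH';

const greetings: Partial<Record<Language, string>> = {
  CHINESE: '你好，世界',
  ENGLISH: 'Hello, world',
};
const state = { language: 'ENGLISH' as Language };
const history: string[] = []; // toy stand-in for the Event History

// Synchronous validator: throws to reject, returns void to accept.
function validator(newLanguage: Language): void {
  if (!(newLanguage in greetings)) {
    throw new Error(`${newLanguage} is not supported`);
  }
}

// Handler: mutates state and returns the previous value.
function handler(newLanguage: Language): Language {
  const previous = state.language;
  state.language = newLanguage;
  return previous;
}

// Toy model of the flow: validate first, record acceptance, then run the handler.
function processUpdate(arg: Language): { accepted: boolean; result?: Language; error?: string } {
  try {
    validator(arg);
  } catch (err) {
    // Rejected: nothing is appended to the toy history.
    return { accepted: false, error: (err as Error).message };
  }
  history.push('WorkflowExecutionUpdateAccepted');
  const result = handler(arg);
  history.push('WorkflowExecutionUpdateCompleted');
  return { accepted: true, result };
}

const rejected = processUpdate('FRENCH');
const accepted = processUpdate('CHINESE');
```

Here `rejected` leaves both the state and the toy history untouched, while `accepted` records two events and returns the previous language.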

Send messages​

To send Queries, Signals, or Updates, call methods on a WorkflowHandle object. For example, client.workflow.start returns a handle for the Workflow Execution it starts:

const handle = await client.workflow.start(greetingWorkflow, {
taskQueue: 'my-task-queue',
args: [myArg],
workflowId: 'my-workflow-id',
});

To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.

Using Continue-as-New and Updates
  • Temporal does not support Continue-as-New functionality within Update handlers.
  • Complete all handlers before using Continue-as-New.
  • Use Continue-as-New from your main Workflow Definition method, just as you would complete or fail a Workflow Execution.

Send a Query​

Use WorkflowHandle.query to send a Query to a Workflow Execution:

const supportedLanguages = await handle.query(getLanguages, {
includeUnsupported: false,
});
  • Sending a Query doesn’t add events to a Workflow's Event History.

  • You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.

  • A Worker must be online and polling the Task Queue to process a Query.

Send a Signal​

You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.

Send a Signal from a Client​

Use WorkflowHandle.signal to send a Signal:

await handle.signal(approve, { name: 'me' });
  • The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.

  • The WorkflowExecutionSignaled Event appears in the Workflow's Event History.

Send a Signal from a Workflow​

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal. Use getExternalWorkflowHandle:

import { getExternalWorkflowHandle } from '@temporalio/workflow';
import { joinSignal } from './other-workflow';

export async function yourWorkflowThatSignals() {
const handle = getExternalWorkflowHandle('workflow-id-123');
await handle.signal(joinSignal, { userId: 'user-1', groupId: 'group-1' });
}

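When an External Signal is sent:

  • A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
  • A WorkflowExecutionSignaled Event appears in the recipient's Event History.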

The getExternalWorkflowHandle method helps keep Workflows deterministic. Deterministic Workflow code must not make network calls directly, so you cannot use a Temporal Client inside Workflow code to send Signals or start other Workflows. To communicate between Workflows, use getExternalWorkflowHandle instead: it keeps the Workflow deterministic and records these interactions as Events in the Workflow's Event History.

Signal-With-Start​

Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running. Use Client.workflow.signalWithStart:

import { Client } from '@temporalio/client';
import { joinSignal, yourWorkflow } from './workflows';

const client = new Client();

await client.workflow.signalWithStart(yourWorkflow, {
workflowId: 'workflow-id-123',
taskQueue: 'my-taskqueue',
args: [{ foo: 1 }],
signal: joinSignal,
signalArgs: [{ userId: 'user-1', groupId: 'group-1' }],
});

Signal-With-Start is limited to Client use. It cannot be called from a Workflow.

Send an Update​

An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.

A client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.

  • WorkflowExecutionUpdateAccepted is added to the Event History when the Worker confirms that the Update passed validation.
  • WorkflowExecutionUpdateCompleted is added to the Event History when the Worker confirms that the Update has finished.

To send an Update to a Workflow Execution, you can:

  • Call WorkflowHandle.executeUpdate and wait for the Update to complete. This code fetches an Update result:

    let previousLanguage = await handle.executeUpdate(setLanguage, {
    args: [Language.CHINESE],
    });
  • Call WorkflowHandle.startUpdate to receive a WorkflowUpdateHandle as soon as the Update is accepted or rejected.

    • Use this WorkflowUpdateHandle later to fetch your results.
    • async Update handlers normally perform long-running asynchronous operations, such as calling an Activity.
    • startUpdate only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.

    For example:

    const updateHandle = await handle.startUpdate(setLanguage, {
    args: [Language.ENGLISH],
    waitForStage: WorkflowUpdateStage.ACCEPTED,
    });
    previousLanguage = await updateHandle.result();

    For more details, see the "Async handlers" section.
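
The difference between waiting for acceptance and waiting for completion can be sketched with plain promises. This is a toy model, not SDK code: `toyStartUpdate`, `toyExecuteUpdate`, and `ToyUpdateHandle` are hypothetical names, and the "Activity" is just a promise that the demo resolves by hand.

```typescript
interface ToyUpdateHandle<T> {
  result(): Promise<T>;
}

// Resolves as soon as validation passes; the async handler keeps running.
function toyStartUpdate<T>(validate: () => void, handler: () => Promise<T>): Promise<ToyUpdateHandle<T>> {
  return new Promise((resolve, reject) => {
    try {
      validate();
    } catch (err) {
      reject(err); // rejected Updates never reach the handler
      return;
    }
    const pending = handler();
    resolve({ result: () => pending });
  });
}

// Waits for acceptance and then for the handler's return value.
async function toyExecuteUpdate<T>(validate: () => void, handler: () => Promise<T>): Promise<T> {
  const handle = await toyStartUpdate(validate, handler);
  return handle.result();
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  let finishActivity: (value: string) => void = () => {};
  const activity = new Promise<string>((resolve) => {
    finishActivity = resolve;
  });

  const handle = await toyStartUpdate(
    () => {
      log.push('validated');
    },
    async () => {
      const previous = await activity; // long-running work
      log.push('handler finished');
      return previous;
    },
  );
  log.push('accepted'); // we hold a handle before the handler is done
  finishActivity('ENGLISH');
  const result = await handle.result();
  log.push(`result: ${result}`);
  return log;
}
```

In this sketch the caller logs `accepted` before the handler finishes, which mirrors why a handle returned at the accepted stage is useful for long-running Update handlers.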

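To obtain an Update handle, you can:

  • Use WorkflowHandle.startUpdate to start an Update and return its handle, as in the preceding example.
  • Use WorkflowHandle.getUpdateHandle to create a handle for an existing Update ID.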

Update-With-Start​

Stability

In Public Preview in Temporal Cloud.

Minimum Temporal Server version: 1.26

Update-with-Start lets you send an Update that checks whether an already-running Workflow with that ID exists:

  • If the Workflow exists, the Update is processed.
  • If the Workflow does not exist, a new Workflow Execution is started with the given ID, and the Update is processed immediately after.

Use executeUpdateWithStart to start an Update and wait for the result in one go.

Alternatively, use startUpdateWithStart to start an Update and receive a WorkflowUpdateHandle, and then use await updateHandle.result() to retrieve the result from the Update.

These calls return once the requested Update wait stage has been reached, or when the request times out.

You will need to provide a WithStartWorkflowOperation to define the Workflow that will be started if necessary, and its arguments. You must specify a WorkflowIdConflictPolicy when creating the WithStartWorkflowOperation. Note that a WithStartWorkflowOperation can only be used once.

Here's an example taken from the early-return sample:

const startWorkflowOperation = WithStartWorkflowOperation.create(transactionWorkflow, {
workflowId,
args: [transactionID],
taskQueue: 'early-return',
workflowIdConflictPolicy: 'FAIL',
});

const earlyConfirmation = await client.workflow.executeUpdateWithStart(getTransactionConfirmation, {
startWorkflowOperation,
});
const wfHandle = await startWorkflowOperation.workflowHandle();
const finalReport = await wfHandle.result();

Send messages without type safety

In real-world development, you may be unable to import the message type objects defined by defineQuery, defineSignal, or defineUpdate. When you don't have access to the Workflow Definition, or it isn't written in TypeScript, you can still send messages through APIs that aren't type-safe. Pass message type names (strings) instead of message type objects to WorkflowHandle.query, WorkflowHandle.signal, WorkflowHandle.executeUpdate, or WorkflowHandle.startUpdate.

Pass Workflow IDs to client.workflow.getHandle (in Client code) or getExternalWorkflowHandle (in Workflow code) to get Workflow handles.

Message handler patterns​

This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.

tip

For additional information, see Inject work into the main Workflow, Ensuring your messages are processed exactly once, and this sample demonstrating safe async message handling.

Use async handlers​

Signal and Update handlers can be async functions. Using async allows you to use await with Activities, Child Workflows, durable workflow.sleep Timers, workflow.condition conditions, etc. This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls. It's essential to understand the things that could go wrong in order to use async handlers safely. See Workflow message passing for guidance on safe usage of async Signal and Update handlers, the Safe message handlers sample, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.

The following code executes an Activity that makes a network call to a remote service. It modifies the Update handler from earlier on this page, turning it into an async function:

// 👉 Use the object returned by defineUpdate to set the Update handler in
// Workflow code, and to send Updates from Client code.
export const setLanguageUsingActivity = wf.defineUpdate<Language, [Language]>('setLanguageUsingActivity');

export async function greetingWorkflow(): Promise<string> {
const greetings: Partial<Record<Language, string>> = {
[Language.CHINESE]: '你好，世界',
[Language.ENGLISH]: 'Hello, world',
};

let language = Language.ENGLISH;

const lock = new Mutex();
wf.setHandler(setLanguageUsingActivity, async (newLanguage) => {
// 👉 An Update handler can mutate the Workflow state and return a value.
// 👉 Since this Update handler is async, it can execute an Activity.
if (!(newLanguage in greetings)) {
// 👉 Do the following with the lock held to ensure that multiple calls to
// setLanguageUsingActivity are processed in order.
await lock.runExclusive(async () => {
if (!(newLanguage in greetings)) {
const greeting = await callGreetingService(newLanguage);
if (!greeting) {
// 👉 An Update validator cannot be async, so it cannot be used to check that the remote
// callGreetingService supports the requested language. Throwing ApplicationFailure
// will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
// added to history.
throw new wf.ApplicationFailure(`${newLanguage} is not supported by the greeting service`);
}
greetings[newLanguage] = greeting;
}
});
}
const previousLanguage = language;
language = newLanguage;
return previousLanguage;
});
...
}

After updating the code to use async, your Update handler can schedule an Activity and await the result. Although an async Signal handler can also execute an Activity, using an Update handler allows the client to receive a result or error once the Activity completes. This lets your client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.

Add wait conditions to block​

Sometimes, async Signal or Update handlers need to meet certain conditions before they should continue. You can use workflow.condition to prevent the code from proceeding until a condition is true. You specify the condition by passing a function that returns true or false. This is an important feature that helps you control your handler logic.

Here are three important use cases for workflow.condition:

  • Waiting for a Signal or Update to arrive
  • Waiting in a handler until it is appropriate to continue.
  • Waiting in the main Workflow until all active handlers have finished.

Wait for a Signal or Update to arrive​

It's common to use workflow.condition to wait for a particular Signal or Update to be sent by a Client:

export async function greetingWorkflow(): Promise<string> {
let approvedForRelease = false;
let approverName: string | undefined;

wf.setHandler(approve, (input) => {
approvedForRelease = true;
approverName = input.name;
});
...

await wf.condition(() => approvedForRelease);
...
}

Use wait conditions in handlers​

It's common to use a Workflow wait condition in a handler. For example, suppose your Workflow has a mutable variable readyForUpdateToExecute that indicates whether your Update handler should be allowed to start executing. You can use workflow.condition in the handler to make the handler pause until the condition is met:

let readyForUpdateToExecute = false;

wf.setHandler(myUpdate, async (input: MyUpdateInput): Promise<MyUpdateOutput> => {
await wf.condition(() => readyForUpdateToExecute);
...
});

Remember: handlers can execute before the main Workflow method starts.

You can also use wait conditions anywhere else in the handler to wait for a specific condition to become true. This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become true.
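
To make the pause-and-resume behaviour concrete, here is a toy model of a wait condition in plain TypeScript. It is not how the SDK implements workflow.condition: `ToyConditions`, `condition`, and `recheck` are hypothetical names, and the model assumes predicates are re-evaluated explicitly after each state change.

```typescript
type Waiter = { predicate: () => boolean; resolve: () => void };

// Toy stand-in for wait conditions; NOT the SDK's implementation.
class ToyConditions {
  private waiters: Waiter[] = [];

  // Resolves immediately if the predicate already holds, otherwise parks the caller.
  condition(predicate: () => boolean): Promise<void> {
    if (predicate()) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.waiters.push({ predicate, resolve: () => resolve() });
    });
  }

  // Call after a state change to wake every waiter whose predicate now holds.
  recheck(): void {
    const stillWaiting: Waiter[] = [];
    for (const waiter of this.waiters) {
      if (waiter.predicate()) waiter.resolve();
      else stillWaiting.push(waiter);
    }
    this.waiters = stillWaiting;
  }
}

async function demo(): Promise<string[]> {
  const conditions = new ToyConditions();
  const log: string[] = [];
  let readyForUpdateToExecute = false;

  // The "handler" starts first and pauses on the condition.
  const handlerDone = (async () => {
    log.push('handler started');
    await conditions.condition(() => readyForUpdateToExecute);
    log.push('handler continued');
  })();

  // The "main Workflow method" later flips the flag.
  log.push('main sets ready');
  readyForUpdateToExecute = true;
  conditions.recheck();

  await handlerDone;
  return log;
}
```

The handler starts before the main code sets the flag, parks on the condition, and only continues once the flag is true, which is the ordering the reminder above warns about.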

Ensure your handlers finish before the Workflow completes​

Workflow wait conditions can ensure your handler completes before a Workflow finishes. When your Workflow uses async Signal or Update handlers, your main Workflow method can return or Continue-as-New while a handler is still waiting on an async task, such as an Activity result. The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying to retrieve Update results. Use workflow.condition and allHandlersFinished to address this problem and allow your Workflow to end smoothly:

export async function myWorkflow(): Promise<MyWorkflowOutput> {
await wf.condition(wf.allHandlersFinished);
return workflowOutput;
}

By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions. You can silence these warnings on a per-handler basis by setting the unfinishedPolicy in SignalHandlerOptions or UpdateHandlerOptions when calling workflow.setHandler.

See Finishing handlers before the Workflow completes for more information.

Use a lock to prevent concurrent handler execution​

Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:

export async function myWorkflow(): Promise<MyWorkflowOutput> {
let x = 0;
let y = 0;
wf.setHandler(mySignal, async () => {
const data = await myActivity();
x = data.x;

// πŸ›πŸ› Bug!! If multiple instances of this handler are executing
// concurrently, then there may be times when the Workflow has x from one
// Activity execution and y from another.
await wf.sleep(500); // or await anything else

y = data.y;
});
...
}

Coordinating access using a lock (also known as a mutex) corrects this code. Locking makes sure that only one handler instance can execute a specific section of code at any given time:

import { Mutex } from 'async-mutex';  // https://github.com/DirtyHairy/async-mutex

...

export async function myWorkflow(): Promise<MyWorkflowOutput> {
let x = 0;
let y = 0;
const lock = new Mutex();

wf.setHandler(mySignal, async () => {
await lock.runExclusive(async () => {
const data = await myActivity();
x = data.x;

// ✅ OK: Node's event loop may switch now to a different handler
// execution, or to the main workflow function, but no other execution of
// this handler can run until this execution finishes.
await wf.sleep(500); // or await anything else

y = data.y;
});
});
return {
name: 'hello',
};
}
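
The interleaving bug and its fix can be reproduced without Temporal or the async-mutex package. The sketch below is plain TypeScript under stated assumptions: `ToyMutex` is a minimal hypothetical lock built on a promise chain (the samples on this page use async-mutex instead), and `tick` stands in for any `await` inside a handler.

```typescript
// Minimal promise-chain lock, for illustration only; the code above uses async-mutex.
class ToyMutex {
  private tail: Promise<void> = Promise.resolve();

  runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    const result = this.tail.then(fn);
    // Keep the chain usable even if fn rejects.
    this.tail = result.then(
      () => undefined,
      () => undefined,
    );
    return result;
  }
}

type Data = { x: number; y: number };

// Stand-in for any await point inside a handler.
const tick = (): Promise<void> => Promise.resolve();

async function run(useLock: boolean): Promise<Data> {
  const state: Data = { x: 0, y: 0 };
  const lock = new ToyMutex();

  // Same shape as the handler above: write x, await something, write y.
  const handler = async (data: Data, pauseTicks: number): Promise<void> => {
    state.x = data.x;
    for (let i = 0; i < pauseTicks; i++) await tick();
    state.y = data.y;
  };

  const call = (data: Data, pauseTicks: number): Promise<void> =>
    useLock ? lock.runExclusive(() => handler(data, pauseTicks)) : handler(data, pauseTicks);

  // Two "Signals" arrive back to back; the first one pauses longer.
  await Promise.all([call({ x: 1, y: 1 }, 2), call({ x: 2, y: 2 }, 1)]);
  return state;
}
```

Calling `run(false)` leaves `x` from the second call and `y` from the first, while `run(true)` ends with both values from the same call because the second handler cannot start until the first has finished.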

Message handler troubleshooting​

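When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:

  • The Client can't contact the server: You'll receive a client.ServiceError on which the cause.code attribute is gRPC status code 14 UNAVAILABLE (after some retries).
  • The Workflow does not exist: You'll receive a client.WorkflowNotFoundError exception.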

Problems when sending a Signal​

When using Signal, the two errors described above are the only errors that will result from your requests.

For Queries and Updates, the client waits for a response from the Worker and therefore additional errors may occur during the handler Execution by the Worker.

Problems when sending an Update​

When working with Updates, you may encounter these problems:

  • No Workflow Workers are polling the Task Queue: Your request will be retried by the SDK Client indefinitely.

  • Update failed: You'll receive a client.WorkflowUpdateFailedError exception. There are two ways this can happen:

    • The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.

    • The Update failed after having been accepted.

    Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:

    • A failed Child Workflow
    • A failed Activity (if the Activity retries have been set to a finite number)
    • The Workflow author raising ApplicationFailure
  • The handler caused the Workflow Task to fail: A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:

    • If the request hasn't been accepted by the server, you receive a client.ServiceError on which the cause.code attribute is gRPC status code 9 FAILED_PRECONDITION (after some retries).
    • If the request has been accepted, it is durable. Once the Workflow is healthy again after a code deploy, use an WorkflowUpdateHandle to fetch the Update result.
  • The Workflow finished while the Update handler execution was in progress: You'll receive a client.ServiceError on which the cause.code attribute is gRPC status code 5 NOT_FOUND. This can happen, for example, when the main Workflow method returns or uses Continue-as-New without waiting for handlers to finish.

Problems when sending a Query​

When working with Queries, you may encounter these errors:

  • There is no Workflow Worker polling the Task Queue: You'll receive a client.ServiceError on which the cause.code attribute is gRPC status code 9 FAILED_PRECONDITION.

  • Query failed: You'll receive a client.QueryNotRegisteredError exception if something goes wrong during a Query. Any error in a Query handler will trigger this error. This differs from Signal and Update requests, where errors can lead to Workflow Task Failure instead.

  • The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.
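
One way to act on the gRPC status codes mentioned in this section is a small classification helper. The sketch below is plain TypeScript with no SDK import: `ServiceErrorLike` is a hypothetical shape that assumes only what the text states (the status code is available as `cause.code`), and the returned strings are illustrative.

```typescript
// Assumed shape, based only on the text above: the gRPC status is on `cause.code`.
interface ServiceErrorLike {
  cause?: { code?: number };
}

// gRPC status codes named in this section.
const GRPC_NOT_FOUND = 5;
const GRPC_FAILED_PRECONDITION = 9;

// Maps a status code to a short hint for logs or retry logic.
function describeServiceError(err: ServiceErrorLike): string {
  switch (err.cause?.code) {
    case GRPC_NOT_FOUND:
      return 'NOT_FOUND: the Workflow finished before the handler completed';
    case GRPC_FAILED_PRECONDITION:
      return 'FAILED_PRECONDITION: no Worker polling, or the Workflow Task is failing';
    default:
      return 'unclassified service error';
  }
}

const notFound = describeServiceError({ cause: { code: 5 } });
const precondition = describeServiceError({ cause: { code: 9 } });
const other = describeServiceError({});
```

In real Client code you would first check that the caught error is a service error before reading its cause; that check is omitted here to keep the sketch self-contained.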

Define Signals and Queries statically or dynamically​

  • Handlers for both Signals and Queries can take arguments, which can be used inside setHandler logic.
  • Only Signal Handlers can mutate state, and only Query Handlers can return values.

Define Signals and Queries statically​

If you know the name of your Signals and Queries upfront, we recommend declaring them outside the Workflow Definition.

signals-queries/src/workflows.ts

import * as wf from '@temporalio/workflow';

export const unblockSignal = wf.defineSignal('unblock');
export const isBlockedQuery = wf.defineQuery<boolean>('isBlocked');

export async function unblockOrCancel(): Promise<void> {
let isBlocked = true;
wf.setHandler(unblockSignal, () => void (isBlocked = false));
wf.setHandler(isBlockedQuery, () => isBlocked);
wf.log.info('Blocked');
try {
await wf.condition(() => !isBlocked);
wf.log.info('Unblocked');
} catch (err) {
if (err instanceof wf.CancelledFailure) {
wf.log.info('Cancelled');
}
throw err;
}
}

This technique helps provide type safety because you can export the type signature of the Signal or Query to be called by the Client.

Define Signals and Queries dynamically​

For more flexible use cases, you might want a dynamic Signal (such as a generated ID). You can handle it in two ways:

  • Avoid making it dynamic by collapsing all Signals into one handler and move the ID to the payload.
  • Actually make the Signal name dynamic by inlining the Signal definition per handler.
import * as wf from '@temporalio/workflow';

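// "fat handler" solution
// setHandler takes a definition object, not a bare name. The payload type here is an assumption.
wf.setHandler(wf.defineSignal<[{ taskId: string }]>('genericSignal'), (payload) => {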
switch (payload.taskId) {
case taskAId:
// do task A things
break;
case taskBId:
// do task B things
break;
default:
throw new Error('Unexpected task.');
}
});

// "inline definition" solution
wf.setHandler(wf.defineSignal(`task-${taskAId}`), (payload) => {
/* do task A things */
});
wf.setHandler(wf.defineSignal(`task-${taskBId}`), (payload) => {
/* do task B things */
});

// utility "inline definition" helper
const inlineSignal = (signalName, handler) =>
wf.setHandler(wf.defineSignal(signalName), handler);
inlineSignal(`task-${taskBId}`, (payload) => {
/* do task B things */
});

API Design FAQs

Why not "new Signal" and "new Query"?

The semantics of defineSignal and defineQuery are intentional. They return Signal and Query definitions, not unique instances of Signals and Queries themselves. The following is their entire source code:

/**
* Define a signal method for a Workflow.
*/
export function defineSignal<Args extends any[] = []>(
name: string,
): SignalDefinition<Args> {
return {
type: 'signal',
name,
};
}

/**
* Define a query method for a Workflow.
*/
export function defineQuery<Ret, Args extends any[] = []>(
name: string,
): QueryDefinition<Ret, Args> {
return {
type: 'query',
name,
};
}

Signals and Queries are instantiated only in setHandler and are specific to particular Workflow Executions.

These distinctions might seem minor, but they model how Temporal works under the hood, because Signals and Queries are messages identified by "just strings" and don't have meaning independent of the Workflow having a listener to handle them. This will be clearer if you refer to the Client-side APIs.

Why setHandler and not OTHER_API?

We named it setHandler instead of subscribe because a Signal or Query can have only one "handler" at a time, whereas subscribe could imply an Observable with multiple consumers and is a higher-level construct.

wf.setHandler(MySignal, handlerFn1);
wf.setHandler(MySignal, handlerFn2); // replaces handlerFn1

If you are familiar with RxJS, you are free to wrap your Signals and Queries into Observables if you want, or you could dynamically reassign the listener based on your business logic or Workflow state.
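
Both points in this FAQ (definitions are identified by name only, and a name has a single handler at a time) can be sketched with a toy registry. This is plain TypeScript, not SDK code: `registry`, `setHandler`, and `dispatch` are hypothetical, and handlers return a string here only so the demo can observe which one ran.

```typescript
interface SignalDefinition {
  type: 'signal';
  name: string;
}

// Same shape as the defineSignal source shown above, minus the type parameters.
function defineSignal(name: string): SignalDefinition {
  return { type: 'signal', name };
}

// Real Signal handlers return nothing; the string return is for observation only.
type Handler = (arg: string) => string;

// Toy registry, assumed for illustration: one handler slot per Signal name.
const registry = new Map<string, Handler>();

function setHandler(def: SignalDefinition, handler: Handler): void {
  registry.set(def.name, handler); // overwrites any earlier handler for this name
}

function dispatch(name: string, arg: string): string | undefined {
  const handler = registry.get(name);
  return handler ? handler(arg) : undefined;
}

const first = defineSignal('mySignal');
const second = defineSignal('mySignal'); // a distinct object with the same name

setHandler(first, (arg) => `handlerFn1:${arg}`);
setHandler(second, (arg) => `handlerFn2:${arg}`);

const outcome = dispatch('mySignal', 'x');
const unknown = dispatch('noSuchSignal', 'x');
```

Because the registry is keyed by name, the second registration replaces the first even though it used a different definition object, and a name without a handler has no meaning on its own.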