Skip to content

Workflows

Workflows let you define multi-step processes that execute sequentially. Each step is a handler that runs in order, with automatic state persistence and advancement.

Defining a workflow

Pass a workflows array when starting Synkro:

import { Synkro } from "@synkro/core";
const synkro = await Synkro.start({
transport: "redis",
connectionUrl: "redis://localhost:6379",
workflows: [
{
name: "ProcessOrder",
steps: [
{
type: "ValidateOrder",
handler: async (ctx) => {
const order = ctx.payload;
if (!order.items?.length) throw new Error("Empty order");
ctx.setPayload({ validated: true });
},
},
{
type: "ChargePayment",
handler: async (ctx) => {
const result = await chargeCard(ctx.payload);
ctx.setPayload({ transactionId: result.id });
},
retry: { maxRetries: 3, backoff: "exponential" },
},
{
type: "FulfillOrder",
handler: async (ctx) => {
await shipOrder(ctx.payload);
},
},
],
},
],
});

Starting a workflow

Publish an event whose name matches the workflow name:

const requestId = await synkro.publish("ProcessOrder", {
orderId: "abc-123",
items: [{ sku: "WIDGET", qty: 2 }],
});

The workflow starts at step 0 and advances automatically as each step completes.

Passing data between steps

Use ctx.setPayload() inside a step handler to merge data into the payload. The merged payload is passed to the next step:

// Step 1
handler: async (ctx) => {
ctx.setPayload({ validated: true });
}
// Step 2 receives { ...originalPayload, validated: true }
handler: async (ctx) => {
console.log(ctx.payload.validated); // true
}

setPayload performs a shallow merge — it spreads the provided object into the existing payload.

Querying workflow state

Inspect the current state of a running (or completed) workflow:

const state = await synkro.getWorkflowState(requestId, "ProcessOrder");

Returns a WorkflowState object or null if no state is found:

type WorkflowState = {
workflowName: string;
currentStep: number;
status: "running" | "completed" | "failed" | "cancelled";
};

Cancelling a workflow

Cancel a running workflow by its requestId and workflow name:

const cancelled = await synkro.cancelWorkflow(requestId, "ProcessOrder");
// true if the workflow was running and is now cancelled
// false if the workflow was not found or not in a running state

Cancellation sets the workflow status to "cancelled" and clears any active step timers.

Workflow graph

You can retrieve a DAG representation of any registered workflow:

const graph = synkro.getWorkflowGraph("ProcessOrder");
// { workflowName, nodes: WorkflowGraphNode[], edges: WorkflowGraphEdge[] }

This is useful for visualization tools or debugging.