Workflows
Workflows let you define multi-step processes that execute sequentially. Each step is a handler that runs in order, with automatic state persistence and advancement.
Defining a workflow
Pass a workflows array when starting Synkro:
import { Synkro } from "@synkro/core";
const synkro = await Synkro.start({ transport: "redis", connectionUrl: "redis://localhost:6379", workflows: [ { name: "ProcessOrder", steps: [ { type: "ValidateOrder", handler: async (ctx) => { const order = ctx.payload; if (!order.items?.length) throw new Error("Empty order"); ctx.setPayload({ validated: true }); }, }, { type: "ChargePayment", handler: async (ctx) => { const result = await chargeCard(ctx.payload); ctx.setPayload({ transactionId: result.id }); }, retry: { maxRetries: 3, backoff: "exponential" }, }, { type: "FulfillOrder", handler: async (ctx) => { await shipOrder(ctx.payload); }, }, ], }, ],});Starting a workflow
Publish an event whose name matches the workflow name:
const requestId = await synkro.publish("ProcessOrder", { orderId: "abc-123", items: [{ sku: "WIDGET", qty: 2 }],});The workflow starts at step 0 and advances automatically as each step completes.
Passing data between steps
Use ctx.setPayload() inside a step handler to merge data into the payload. The merged payload is passed to the next step:
// Step 1handler: async (ctx) => { ctx.setPayload({ validated: true });}
// Step 2 receives { ...originalPayload, validated: true }handler: async (ctx) => { console.log(ctx.payload.validated); // true}setPayload performs a shallow merge — it spreads the provided object into the existing payload.
Querying workflow state
Inspect the current state of a running (or completed) workflow:
const state = await synkro.getWorkflowState(requestId, "ProcessOrder");Returns a WorkflowState object or null if no state is found:
type WorkflowState = { workflowName: string; currentStep: number; status: "running" | "completed" | "failed" | "cancelled";};Cancelling a workflow
Cancel a running workflow by its requestId and workflow name:
const cancelled = await synkro.cancelWorkflow(requestId, "ProcessOrder");// true if the workflow was running and is now cancelled// false if the workflow was not found or not in a running stateCancellation sets the workflow status to "cancelled" and clears any active step timers.
Workflow graph
You can retrieve a DAG representation of any registered workflow:
const graph = synkro.getWorkflowGraph("ProcessOrder");// { workflowName, nodes: WorkflowGraphNode[], edges: WorkflowGraphEdge[] }This is useful for visualization tools or debugging.