Skip to content

Parallel Workflows

When workflow steps declare dependsOn, the engine runs independent steps concurrently and waits for all dependencies to complete before starting dependent steps.

How it works

  • Steps without dependsOn (root steps) start immediately in parallel.
  • A step with dependsOn: ["A", "B"] waits until both A and B complete before running.
  • The engine detects a parallel workflow automatically when any step has dependsOn.
  • Cycle detection and dependency validation run at registration time.

Basic example

A fan-out / fan-in pipeline where two steps run in parallel and a third step waits for both:

const synkro = await Synkro.start({
transport: "redis",
connectionUrl: "redis://localhost:6379",
workflows: [
{
name: "DataPipeline",
steps: [
{
type: "FetchUsers",
handler: async (ctx) => {
const users = await fetchUsers();
ctx.setPayload({ ...ctx.payload, users });
},
},
{
type: "FetchOrders",
handler: async (ctx) => {
const orders = await fetchOrders();
ctx.setPayload({ ...ctx.payload, orders });
},
},
{
type: "MergeData",
dependsOn: ["FetchUsers", "FetchOrders"],
handler: async (ctx) => {
await mergeData(ctx.payload);
},
},
],
},
],
});
await synkro.startWorkflow("DataPipeline", { source: "daily-sync" });

FetchUsers and FetchOrders start immediately in parallel. MergeData runs only after both complete.

Workflow state

Parallel workflows track progress through additional state fields:

type WorkflowState = {
workflowName: string;
currentStep: number; // always -1 for parallel workflows
status: "running" | "completed" | "failed" | "cancelled";
parallel?: boolean; // true when any step has dependsOn
completedSteps?: string[];
activeSteps?: string[];
};

Query the state with synkro.getWorkflowState(requestId, workflowName) to monitor which steps are active or completed.

Fail-fast semantics

If a parallel step fails and has no onFailure handler, the entire workflow fails immediately — remaining active steps are abandoned.

Steps with an onFailure target route to the failure handler instead of failing the workflow.

{
type: "FetchUsers",
handler: async (ctx) => { /* ... */ },
onFailure: "HandleFetchError", // routes here on failure instead of failing the workflow
},
{
type: "HandleFetchError",
handler: async (ctx) => { /* ... */ },
},

Combining with conditional routing

onSuccess and onFailure work within parallel workflows. When a step has onSuccess, the engine routes to the target step instead of unblocking dependent steps.

Visualizing the graph

getWorkflowGraph() returns edges with a "dependsOn" label for parallel workflows:

const graph = synkro.getWorkflowGraph("DataPipeline");
// graph.edges includes:
// { from: "FetchUsers", to: "MergeData", label: "dependsOn" }
// { from: "FetchOrders", to: "MergeData", label: "dependsOn" }

Validation