Skip to content

API Reference

Synkro class

Synkro.start(options) static

Creates and initializes a Synkro instance.

static async start(options: SynkroOptions): Promise<Synkro>
const synkro = await Synkro.start({
transport: "redis",
connectionUrl: "redis://localhost:6379",
});

synkro.on(eventType, handler, retry?)

Registers a handler for the given event type.

on(eventType: string, handler: HandlerFunction, retry?: RetryConfig): void
synkro.on("order:placed", async (ctx) => {
console.log(ctx.payload);
}, { maxRetries: 3 });

synkro.off(eventType, handler?)

Removes a handler. If handler is omitted, removes all handlers for the event type.

off(eventType: string, handler?: HandlerFunction): void
synkro.off("order:placed", myHandler);
synkro.off("order:placed"); // remove all

synkro.publish(event, payload?, requestId?)

Publishes an event. If the event name matches a workflow, the workflow is started. Returns the requestId.

async publish(event: string, payload?: unknown, requestId?: string): Promise<string>
const id = await synkro.publish("order:placed", { orderId: "abc" });

synkro.use(middleware)

Adds a middleware function that wraps every handler execution.

use(middleware: MiddlewareFunction): void
synkro.use(async (ctx, next) => {
const start = Date.now();
await next();
console.log(`${ctx.eventType}: ${Date.now() - start}ms`);
});

synkro.publishDelayed(event, payload, delayMs)

Publishes an event after a delay. Returns the requestId for the pending timer.

publishDelayed(event: string, payload: unknown, delayMs: number): string
const id = synkro.publishDelayed("reminder:send", { userId: "u1" }, 60000);

synkro.schedule(eventType, intervalMs, payload?)

Creates a recurring schedule that publishes an event at a fixed interval. Returns a scheduleId.

schedule(eventType: string, intervalMs: number, payload?: unknown): string
const scheduleId = synkro.schedule("health:check", 30000);

synkro.unschedule(scheduleId)

Cancels a recurring schedule. Returns true if the schedule was found and cancelled.

unschedule(scheduleId: string): boolean
synkro.unschedule(scheduleId);

synkro.getWorkflowState(requestId, workflowName)

Returns the current state of a workflow instance, or null if not found.

async getWorkflowState(requestId: string, workflowName: string): Promise<WorkflowState | null>
const state = await synkro.getWorkflowState(requestId, "ProcessOrder");
// { workflowName: "ProcessOrder", currentStep: 1, status: "running" }

synkro.cancelWorkflow(requestId, workflowName)

Cancels a running workflow. Returns true if cancelled, false if not found or not running.

async cancelWorkflow(requestId: string, workflowName: string): Promise<boolean>
await synkro.cancelWorkflow(requestId, "ProcessOrder");

synkro.getWorkflowGraph(workflowName)

Returns a DAG representation of a workflow, or null if not found.

getWorkflowGraph(workflowName: string): WorkflowGraph | null
const graph = synkro.getWorkflowGraph("ProcessOrder");

synkro.getEventMetrics(eventType)

Returns event processing metrics.

async getEventMetrics(eventType: string): Promise<EventMetrics>

synkro.getDeadLetterItems(eventType, options?)

Returns failed messages from the dead letter queue.

async getDeadLetterItems(eventType: string, options?: { limit?: number }): Promise<DeadLetterItem[]>

synkro.replayDeadLetterItem(item)

Re-publishes a dead letter item.

async replayDeadLetterItem(item: DeadLetterItem): Promise<string>

synkro.clearDeadLetterQueue(eventType)

Removes all items from the DLQ for a given event type.

async clearDeadLetterQueue(eventType: string): Promise<void>

synkro.introspect()

Returns a full snapshot of registered events, workflows, schedules, and workflow graphs.

introspect(): SynkroIntrospection

synkro.stop()

Gracefully shuts down: clears timers, drains in-flight handlers, and disconnects the transport.

async stop(): Promise<void>
await synkro.stop();

Handler context

ctx.publish(event, payload?, requestId?)

Publish a follow-up event from within a handler.

publish: (event: string, payload?: unknown, requestId?: string) => Promise<string>

ctx.setPayload(data)

Merge data into the current payload. Used in workflows to pass data between steps.

setPayload: (data: Record<string, unknown>) => void

Types

SynkroOptions

type SynkroOptions = {
transport?: "redis" | "in-memory" | TransportManager;
connectionUrl?: string;
debug?: boolean;
logFormat?: "text" | "json";
events?: SynkroEvent[];
workflows?: SynkroWorkflow[];
handlers?: object[];
retention?: RetentionConfig;
schemas?: Record<string, SchemaValidator>;
drainTimeout?: number;
deadLetterQueue?: boolean;
middlewares?: MiddlewareFunction[];
};

SynkroEvent

type SynkroEvent<T = unknown> = {
type: string;
handler: HandlerFunction<T>;
retry?: RetryConfig;
schema?: SchemaValidator;
filter?: EventFilter<T>;
};

SynkroWorkflow

type SynkroWorkflow = {
name: string;
steps: SynkroWorkflowStep[];
onComplete?: string;
onSuccess?: string;
onFailure?: string;
timeoutMs?: number;
};

SynkroWorkflowStep

type SynkroWorkflowStep = {
type: string;
handler?: HandlerFunction;
retry?: RetryConfig;
onSuccess?: string;
onFailure?: string;
timeoutMs?: number;
};

HandlerCtx

type HandlerCtx<T = unknown> = {
requestId: string;
payload: T;
publish: PublishFunction;
setPayload: (data: Record<string, unknown>) => void;
};

HandlerFunction

type HandlerFunction<T = unknown> = (ctx: HandlerCtx<T>) => void | Promise<void>;

PublishFunction

type PublishFunction = (
event: string,
payload?: unknown,
requestId?: string,
) => Promise<string>;

RetryConfig

type RetryConfig = {
maxRetries: number;
delayMs?: number;
backoff?: "fixed" | "exponential";
jitter?: boolean;
retryable?: (error: unknown) => boolean;
};

SchemaValidator

type SchemaValidator = (payload: unknown) => void;

MiddlewareCtx

type MiddlewareCtx<T = unknown> = HandlerCtx<T> & {
eventType: string;
};

MiddlewareFunction

type MiddlewareFunction = (
ctx: MiddlewareCtx,
next: () => Promise<void>,
) => Promise<void>;

WorkflowState

type WorkflowState = {
workflowName: string;
currentStep: number;
status: "running" | "completed" | "failed" | "cancelled";
};

WorkflowGraph

type WorkflowGraph = {
workflowName: string;
nodes: WorkflowGraphNode[];
edges: WorkflowGraphEdge[];
};

WorkflowGraphNode

type WorkflowGraphNode = {
id: string;
type: "step";
label: string;
meta?: { retry?: RetryConfig; timeoutMs?: number };
};

WorkflowGraphEdge

type WorkflowGraphEdge = {
from: string;
to: string;
label: "next" | "onSuccess" | "onFailure";
};

ScheduleInfo

type ScheduleInfo = {
scheduleId: string;
eventType: string;
intervalMs: number;
payload?: unknown;
createdAt: string;
};

EventMetrics

type EventMetrics = {
type: string;
received: number;
completed: number;
failed: number;
};

DeadLetterItem

type DeadLetterItem = {
eventType: string;
requestId: string;
payload: unknown;
errors: Array<{ message: string; name?: string }>;
failedAt: string;
attempts: number;
};

RetentionConfig

type RetentionConfig = {
lockTtl?: number;
dedupTtl?: number;
stateTtl?: number;
metricsTtl?: number;
};

SynkroIntrospection

type SynkroIntrospection = {
events: EventInfo[];
workflows: WorkflowInfo[];
schedules: ScheduleInfo[];
graphs: WorkflowGraph[];
};

EventFilter

type EventFilter<T = unknown> = (payload: T) => boolean;