API Reference
Synkro class
Synkro.start(options) static
Creates and initializes a Synkro instance.
static async start(options: SynkroOptions): Promise<Synkro>const synkro = await Synkro.start({ transport: "redis", connectionUrl: "redis://localhost:6379",});synkro.on(eventType, handler, retry?)
Registers a handler for the given event type.
on(eventType: string, handler: HandlerFunction, retry?: RetryConfig): voidsynkro.on("order:placed", async (ctx) => { console.log(ctx.payload);}, { maxRetries: 3 });synkro.off(eventType, handler?)
Removes a handler. If handler is omitted, removes all handlers for the event type.
off(eventType: string, handler?: HandlerFunction): voidsynkro.off("order:placed", myHandler);synkro.off("order:placed"); // remove allsynkro.publish(event, payload?, requestId?)
Publishes an event. If the event name matches a workflow, the workflow is started. Returns the requestId.
async publish(event: string, payload?: unknown, requestId?: string): Promise<string>const id = await synkro.publish("order:placed", { orderId: "abc" });synkro.use(middleware)
Adds a middleware function that wraps every handler execution.
use(middleware: MiddlewareFunction): voidsynkro.use(async (ctx, next) => { const start = Date.now(); await next(); console.log(`${ctx.eventType}: ${Date.now() - start}ms`);});synkro.publishDelayed(event, payload, delayMs)
Publishes an event after a delay. Returns the requestId for the pending timer.
publishDelayed(event: string, payload: unknown, delayMs: number): stringconst id = synkro.publishDelayed("reminder:send", { userId: "u1" }, 60000);synkro.schedule(eventType, intervalMs, payload?)
Creates a recurring schedule that publishes an event at a fixed interval. Returns a scheduleId.
schedule(eventType: string, intervalMs: number, payload?: unknown): stringconst scheduleId = synkro.schedule("health:check", 30000);synkro.unschedule(scheduleId)
Cancels a recurring schedule. Returns true if the schedule was found and cancelled.
unschedule(scheduleId: string): booleansynkro.unschedule(scheduleId);synkro.getWorkflowState(requestId, workflowName)
Returns the current state of a workflow instance, or null if not found.
async getWorkflowState(requestId: string, workflowName: string): Promise<WorkflowState | null>const state = await synkro.getWorkflowState(requestId, "ProcessOrder");// { workflowName: "ProcessOrder", currentStep: 1, status: "running" }synkro.cancelWorkflow(requestId, workflowName)
Cancels a running workflow. Returns true if cancelled, false if not found or not running.
async cancelWorkflow(requestId: string, workflowName: string): Promise<boolean>await synkro.cancelWorkflow(requestId, "ProcessOrder");synkro.getWorkflowGraph(workflowName)
Returns a DAG representation of a workflow, or null if not found.
getWorkflowGraph(workflowName: string): WorkflowGraph | nullconst graph = synkro.getWorkflowGraph("ProcessOrder");synkro.getEventMetrics(eventType)
Returns event processing metrics.
async getEventMetrics(eventType: string): Promise<EventMetrics>synkro.getDeadLetterItems(eventType, options?)
Returns failed messages from the dead letter queue.
async getDeadLetterItems(eventType: string, options?: { limit?: number }): Promise<DeadLetterItem[]>synkro.replayDeadLetterItem(item)
Re-publishes a dead letter item.
async replayDeadLetterItem(item: DeadLetterItem): Promise<string>synkro.clearDeadLetterQueue(eventType)
Removes all items from the DLQ for a given event type.
async clearDeadLetterQueue(eventType: string): Promise<void>synkro.introspect()
Returns a full snapshot of registered events, workflows, schedules, and workflow graphs.
introspect(): SynkroIntrospectionsynkro.stop()
Gracefully shuts down: clears timers, drains in-flight handlers, and disconnects the transport.
async stop(): Promise<void>await synkro.stop();Handler context
ctx.publish(event, payload?, requestId?)
Publish a follow-up event from within a handler.
publish: (event: string, payload?: unknown, requestId?: string) => Promise<string>ctx.setPayload(data)
Merge data into the current payload. Used in workflows to pass data between steps.
setPayload: (data: Record<string, unknown>) => voidTypes
SynkroOptions
type SynkroOptions = { transport?: "redis" | "in-memory" | TransportManager; connectionUrl?: string; debug?: boolean; logFormat?: "text" | "json"; events?: SynkroEvent[]; workflows?: SynkroWorkflow[]; handlers?: object[]; retention?: RetentionConfig; schemas?: Record<string, SchemaValidator>; drainTimeout?: number; deadLetterQueue?: boolean; middlewares?: MiddlewareFunction[];};SynkroEvent
type SynkroEvent<T = unknown> = { type: string; handler: HandlerFunction<T>; retry?: RetryConfig; schema?: SchemaValidator; filter?: EventFilter<T>;};SynkroWorkflow
type SynkroWorkflow = { name: string; steps: SynkroWorkflowStep[]; onComplete?: string; onSuccess?: string; onFailure?: string; timeoutMs?: number;};SynkroWorkflowStep
type SynkroWorkflowStep = { type: string; handler?: HandlerFunction; retry?: RetryConfig; onSuccess?: string; onFailure?: string; timeoutMs?: number;};HandlerCtx
type HandlerCtx<T = unknown> = { requestId: string; payload: T; publish: PublishFunction; setPayload: (data: Record<string, unknown>) => void;};HandlerFunction
type HandlerFunction<T = unknown> = (ctx: HandlerCtx<T>) => void | Promise<void>;PublishFunction
type PublishFunction = ( event: string, payload?: unknown, requestId?: string,) => Promise<string>;RetryConfig
type RetryConfig = { maxRetries: number; delayMs?: number; backoff?: "fixed" | "exponential"; jitter?: boolean; retryable?: (error: unknown) => boolean;};SchemaValidator
type SchemaValidator = (payload: unknown) => void;MiddlewareCtx
type MiddlewareCtx<T = unknown> = HandlerCtx<T> & { eventType: string;};MiddlewareFunction
type MiddlewareFunction = ( ctx: MiddlewareCtx, next: () => Promise<void>,) => Promise<void>;WorkflowState
type WorkflowState = { workflowName: string; currentStep: number; status: "running" | "completed" | "failed" | "cancelled";};WorkflowGraph
type WorkflowGraph = { workflowName: string; nodes: WorkflowGraphNode[]; edges: WorkflowGraphEdge[];};WorkflowGraphNode
type WorkflowGraphNode = { id: string; type: "step"; label: string; meta?: { retry?: RetryConfig; timeoutMs?: number };};WorkflowGraphEdge
type WorkflowGraphEdge = { from: string; to: string; label: "next" | "onSuccess" | "onFailure";};ScheduleInfo
type ScheduleInfo = { scheduleId: string; eventType: string; intervalMs: number; payload?: unknown; createdAt: string;};EventMetrics
type EventMetrics = { type: string; received: number; completed: number; failed: number;};DeadLetterItem
type DeadLetterItem = { eventType: string; requestId: string; payload: unknown; errors: Array<{ message: string; name?: string }>; failedAt: string; attempts: number;};RetentionConfig
type RetentionConfig = { lockTtl?: number; dedupTtl?: number; stateTtl?: number; metricsTtl?: number;};SynkroIntrospection
type SynkroIntrospection = { events: EventInfo[]; workflows: WorkflowInfo[]; schedules: ScheduleInfo[]; graphs: WorkflowGraph[];};EventFilter
type EventFilter<T = unknown> = (payload: T) => boolean;