Events (Pub/Sub)
Events are the foundation of @synkro/core. Publishers emit events; handlers react to them. Handlers can be registered imperatively at runtime or declaratively in the configuration object.
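To make the publisher/handler relationship concrete, here is a minimal in-memory sketch of the pattern. It is not how @synkro/core is implemented (the real library delivers events over a transport such as Redis and its handlers are async); `TinyBus` and its methods are hypothetical names used only for illustration.

```typescript
// Hypothetical in-memory bus, for illustration only; not the @synkro/core implementation.
type Handler = (payload: unknown) => void;

class TinyBus {
  private handlers = new Map<string, Handler[]>();

  // Register a handler for an event type.
  on(type: string, handler: Handler): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  // Deliver the payload to every handler registered for the type
  // and return how many handlers were invoked.
  publish(type: string, payload: unknown): number {
    const list = this.handlers.get(type) ?? [];
    for (const handler of list) {
      handler(payload);
    }
    return list.length;
  }
}

const bus = new TinyBus();
const seen: unknown[] = [];

bus.on("order:placed", (payload) => {
  seen.push(payload);
});

bus.publish("order:placed", { orderId: "abc-123" }); // one handler runs
bus.publish("payment:failed", {}); // no handler registered, nothing runs
```

The publisher never references the handler directly; the event type string is the only thing the two sides share.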
Registering handlers
Imperative registration
Use synkro.on() to register a handler after the instance is created:
```typescript
synkro.on("order:placed", async (ctx) => {
  console.log(`Order ${ctx.requestId} placed:`, ctx.payload);
});
```

You can optionally pass a retry configuration:
```typescript
synkro.on(
  "order:placed",
  async (ctx) => { /* ... */ },
  { maxRetries: 3, delayMs: 500, backoff: "exponential" },
);
```

Declarative registration
Pass an events array in the configuration to register handlers at startup:
```typescript
const synkro = await Synkro.start({
  transport: "redis",
  connectionUrl: "redis://localhost:6379",
  events: [
    {
      type: "order:placed",
      handler: async (ctx) => {
        console.log("Order placed:", ctx.payload);
      },
      retry: { maxRetries: 2 },
    },
    {
      type: "payment:failed",
      handler: async (ctx) => {
        console.log("Payment failed:", ctx.payload);
      },
      schema: (payload) => {
        if (!payload || typeof payload !== "object") {
          throw new Error("Payload must be an object");
        }
      },
    },
  ],
});
```

Each entry in the events array accepts:
| Field | Type | Description |
|---|---|---|
| `type` | `string` | The event type to subscribe to. |
| `handler` | `HandlerFunction` | The function to invoke. |
| `retry` | `RetryConfig` | Optional retry configuration. |
| `schema` | `SchemaValidator` | Optional per-event schema validator. |
| `filter` | `EventFilter` | Optional filter. The handler runs only when `filter(payload)` returns `true`. |
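
The optional fields in the table can be read as three small rules: `schema` throws to reject a payload, `filter` returns a boolean to gate the handler, and `retry` describes how many times and how long to wait before trying again. The sketch below models those rules with local stand-in types. Several details are assumptions rather than documented behavior: that validation runs before filtering, that `"exponential"` means the delay doubles on each retry, and that a missing `delayMs` means no delay. `wouldRun` and `retryDelays` are hypothetical helper names, not part of the library.

```typescript
// Local stand-ins for the library's types; shapes are inferred from the examples in this page.
type RetryConfig = { maxRetries: number; delayMs?: number; backoff?: string };
type EventEntry = {
  type: string;
  schema?: (payload: unknown) => void;    // throws when the payload is invalid
  filter?: (payload: unknown) => boolean; // false means "skip this handler"
  retry?: RetryConfig;
};

// Decide whether the handler for an entry would run for a given payload.
// ASSUMPTION: schema validation happens before the filter is consulted.
function wouldRun(entry: EventEntry, payload: unknown): boolean {
  if (entry.schema) entry.schema(payload); // may throw
  return entry.filter ? entry.filter(payload) : true;
}

// Compute the wait time before each retry attempt.
// ASSUMPTION: "exponential" doubles the delay each time; any other value keeps it constant.
function retryDelays(retry: RetryConfig): number[] {
  const base = retry.delayMs ?? 0; // placeholder default, not the library's documented one
  const delays: number[] = [];
  for (let attempt = 0; attempt < retry.maxRetries; attempt++) {
    delays.push(retry.backoff === "exponential" ? base * 2 ** attempt : base);
  }
  return delays;
}

// Example entry: only orders above 100 reach the handler.
const largeOrders: EventEntry = {
  type: "order:placed",
  schema: (payload) => {
    if (!payload || typeof payload !== "object") {
      throw new Error("Payload must be an object");
    }
  },
  filter: (payload) => ((payload as { total?: number }).total ?? 0) > 100,
  retry: { maxRetries: 3, delayMs: 500, backoff: "exponential" },
};

const runsForLarge = wouldRun(largeOrders, { total: 150 });   // true
const runsForSmall = wouldRun(largeOrders, { total: 49.99 }); // false
const delays = retryDelays(largeOrders.retry!);               // [500, 1000, 2000]
```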
Publishing events
From the Synkro instance
```typescript
const requestId = await synkro.publish("order:placed", {
  orderId: "abc-123",
  total: 49.99,
});
```

`publish()` returns a `requestId` (an auto-generated UUID if not provided). You can pass your own:
```typescript
await synkro.publish("order:placed", { orderId: "abc-123" }, "my-custom-id");
```

From a handler
Every handler receives a ctx object with its own publish function, allowing handlers to emit follow-up events:
```typescript
synkro.on("order:placed", async (ctx) => {
  await processOrder(ctx.payload);
  await ctx.publish("order:processed", { orderId: ctx.payload.orderId });
});
```

Removing handlers
Use synkro.off() to unsubscribe:
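```typescript
const handler = async (ctx) => {
  console.log(ctx.payload);
};

synkro.on("order:placed", handler);

// Later:
// Remove this specific handler.
synkro.off("order:placed", handler);
// No handler argument: presumably removes every handler for the event type.
synkro.off("order:placed");
```

Handler context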
Every handler receives a HandlerCtx object:
```typescript
type HandlerCtx<T = unknown> = {
  requestId: string;
  payload: T;
  publish: PublishFunction;
  setPayload: (data: Record<string, unknown>) => void;
};
```

| Property | Description |
|---|---|
| `requestId` | Unique identifier for this event instance. |
| `payload` | The event payload. |
| `publish` | Publish a follow-up event. |
| `setPayload` | Merge data into the payload (used in workflows to pass data between steps). |
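
As a rough illustration of what "merge data into the payload" could mean, the sketch below builds a stand-in context whose `setPayload` performs a shallow merge with object spread. This is an assumption about the semantics, not the library's confirmed behavior; `makeCtx` is a hypothetical helper, the `PublishFunction` signature is inferred from the `synkro.publish()` calls shown earlier, and the `publish` stub only records calls.

```typescript
// Signature inferred from synkro.publish(type, payload, requestId); an assumption.
type PublishFunction = (type: string, payload?: unknown, requestId?: string) => Promise<string>;

type HandlerCtx<T = unknown> = {
  requestId: string;
  payload: T;
  publish: PublishFunction;
  setPayload: (data: Record<string, unknown>) => void;
};

type PublishedEvent = { type: string; payload: unknown };

// Hypothetical factory that builds a context for experimenting with handlers.
function makeCtx(
  requestId: string,
  payload: Record<string, unknown>,
  published: PublishedEvent[],
): HandlerCtx<Record<string, unknown>> {
  const ctx: HandlerCtx<Record<string, unknown>> = {
    requestId,
    payload: { ...payload },
    // Stub: records the follow-up event instead of sending it anywhere.
    publish: async (type, data, id) => {
      published.push({ type, payload: data });
      return id ?? "stub-id";
    },
    // ASSUMPTION: a shallow merge where new keys are added and existing keys are overwritten.
    setPayload: (data) => {
      ctx.payload = { ...ctx.payload, ...data };
    },
  };
  return ctx;
}

const published: PublishedEvent[] = [];
const ctx = makeCtx("req-1", { orderId: "abc-123" }, published);

ctx.setPayload({ paid: true });
// ctx.payload is now { orderId: "abc-123", paid: true }
```

Under this reading, a later workflow step would see both `orderId` and `paid` on `ctx.payload` without the earlier step having to republish the full object.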