Events (Pub/Sub)

Events are the foundation of @synkro/core. Publishers emit events; handlers react to them. Handlers can be registered imperatively at runtime or declaratively in the configuration object.

Registering handlers

Imperative registration

Use synkro.on() to register a handler after the instance is created:

synkro.on("order:placed", async (ctx) => {
  console.log(`Order ${ctx.requestId} placed:`, ctx.payload);
});

To retry a failing handler, pass an optional retry configuration as the third argument:

synkro.on(
  "order:placed",
  async (ctx) => { /* ... */ },
  { maxRetries: 3, delayMs: 500, backoff: "exponential" },
);
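
To make the retry options concrete, here is a minimal sketch of the delay schedule they might produce. It is not the library's implementation: the RetryConfig shape below is a local stand-in, retryDelays is a hypothetical helper, and the sketch assumes that "exponential" doubles delayMs on every attempt and that omitting backoff means a constant delay.

```typescript
// Local stand-in for the retry options shown above (not imported from the library).
type RetryConfig = {
  maxRetries: number;
  delayMs?: number;
  backoff?: "exponential";
};

// Hypothetical helper: returns the wait in milliseconds before each retry.
// Assumption: "exponential" doubles the base delay on every attempt;
// without it, the base delay is reused unchanged.
function retryDelays(config: RetryConfig): number[] {
  const base = config.delayMs ?? 0;
  const delays: number[] = [];
  for (let attempt = 0; attempt < config.maxRetries; attempt++) {
    delays.push(config.backoff === "exponential" ? base * 2 ** attempt : base);
  }
  return delays;
}

const delays = retryDelays({ maxRetries: 3, delayMs: 500, backoff: "exponential" });
console.log(delays); // 500, 1000, 2000
```

Under these assumptions, the configuration above would wait 500 ms, 1000 ms, and 2000 ms before the first, second, and third retry.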

Declarative registration

Pass an events array in the configuration to register handlers at startup:

const synkro = await Synkro.start({
  transport: "redis",
  connectionUrl: "redis://localhost:6379",
  events: [
    {
      type: "order:placed",
      handler: async (ctx) => {
        console.log("Order placed:", ctx.payload);
      },
      retry: { maxRetries: 2 },
    },
    {
      type: "payment:failed",
      handler: async (ctx) => {
        console.log("Payment failed:", ctx.payload);
      },
      schema: (payload) => {
        if (!payload || typeof payload !== "object") {
          throw new Error("Payload must be an object");
        }
      },
    },
  ],
});

Each entry in the events array accepts:

| Field | Type | Description |
| --- | --- | --- |
| type | string | The event type to subscribe to. |
| handler | HandlerFunction | The function to invoke. |
| retry | RetryConfig | Optional retry configuration. |
| schema | SchemaValidator | Optional per-event schema validator. |
| filter | EventFilter | Optional filter. The handler runs only when filter(payload) returns true. |
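
The schema and filter fields both gate whether a handler runs, so a small model of how they could interact may help. The sketch below is an illustration only: the types are local stand-ins, shouldRun is a hypothetical helper, and the ordering (validate first, then filter) and the amount threshold are assumptions rather than documented behavior.

```typescript
// Local stand-ins modeling the schema and filter fields (not imported from the library).
type SchemaValidator = (payload: unknown) => void; // assumed to throw on invalid input
type EventFilter = (payload: unknown) => boolean;

type EventEntry = {
  type: string;
  schema?: SchemaValidator;
  filter?: EventFilter;
};

// Hypothetical helper: decides whether the handler for `entry` should run.
// Assumption: validation runs first and throws; the filter is consulted second.
function shouldRun(entry: EventEntry, payload: unknown): boolean {
  if (entry.schema) entry.schema(payload);
  return entry.filter ? entry.filter(payload) : true;
}

const paymentFailed: EventEntry = {
  type: "payment:failed",
  schema: (payload) => {
    if (!payload || typeof payload !== "object") {
      throw new Error("Payload must be an object");
    }
  },
  // Hypothetical rule: only react to failures above 100.
  filter: (payload) => {
    const amount = (payload as { amount?: number }).amount;
    return typeof amount === "number" && amount > 100;
  },
};

console.log(shouldRun(paymentFailed, { amount: 250 })); // true
console.log(shouldRun(paymentFailed, { amount: 20 })); // false
```

In this model a schema failure surfaces as an exception, while a filter miss skips the handler without raising an error.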

Publishing events

From the Synkro instance

const requestId = await synkro.publish("order:placed", {
  orderId: "abc-123",
  total: 49.99,
});

publish() resolves to the event's requestId, which is an auto-generated UUID unless you supply one. To use your own ID, pass it as the third argument:

await synkro.publish("order:placed", { orderId: "abc-123" }, "my-custom-id");

From a handler

Every handler receives a ctx object with its own publish function, allowing handlers to emit follow-up events:

synkro.on("order:placed", async (ctx) => {
  await processOrder(ctx.payload);
  await ctx.publish("order:processed", { orderId: ctx.payload.orderId });
});

Removing handlers

Use synkro.off() to unsubscribe:

const handler = async (ctx) => {
  console.log(ctx.payload);
};
synkro.on("order:placed", handler);
// Later:
synkro.off("order:placed", handler);
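
The example above keeps the handler in a variable so the same function can be passed to both calls. The sketch below shows why that matters in a registry that matches handlers by reference. It is a hypothetical in-memory model, not the library's code, and whether synkro.off() compares by reference is an assumption suggested by the example.

```typescript
type Handler = (ctx: { payload: unknown }) => void | Promise<void>;

// Hypothetical in-memory registry keyed by event type.
const registry = new Map<string, Set<Handler>>();

function on(type: string, handler: Handler): void {
  const handlers = registry.get(type) ?? new Set<Handler>();
  handlers.add(handler);
  registry.set(type, handlers);
}

function off(type: string, handler: Handler): void {
  // Set.delete compares by reference, so only the exact same function matches.
  registry.get(type)?.delete(handler);
}

const handlerCount = (type: string): number => registry.get(type)?.size ?? 0;

const logPayload: Handler = (ctx) => console.log(ctx.payload);
on("order:placed", logPayload);

// A fresh arrow function is a different reference, so nothing is removed.
off("order:placed", (ctx) => console.log(ctx.payload));
console.log(handlerCount("order:placed")); // 1

// Passing the original reference removes the handler.
off("order:placed", logPayload);
console.log(handlerCount("order:placed")); // 0
```

If the library behaves like this model, a handler registered as an inline arrow function cannot be removed later, so keep a reference to any handler you intend to unsubscribe.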

Handler context

Every handler receives a HandlerCtx object:

type HandlerCtx<T = unknown> = {
  requestId: string;
  payload: T;
  publish: PublishFunction;
  setPayload: (data: Record<string, unknown>) => void;
};

| Property | Description |
| --- | --- |
| requestId | Unique identifier for this event instance. |
| payload | The event payload. |
| publish | Publish a follow-up event. |
| setPayload | Merge data into the payload (used in workflows to pass data between steps). |
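
As a rough illustration of what merging data into the payload could look like, the sketch below builds a simplified context by hand. createCtx is a hypothetical factory, publish is left out to keep the example standalone, and the shallow merge (later keys overwrite earlier ones) is an assumption about how setPayload behaves.

```typescript
// Simplified local copy of the context shape, without publish.
type Ctx<T> = {
  requestId: string;
  payload: T;
  setPayload: (data: Record<string, unknown>) => void;
};

// Hypothetical factory; assumes setPayload performs a shallow merge.
function createCtx<T extends Record<string, unknown>>(requestId: string, payload: T): Ctx<T> {
  const ctx: Ctx<T> = {
    requestId,
    payload,
    setPayload(data) {
      // Existing keys are kept; keys present in `data` are added or overwritten.
      ctx.payload = { ...ctx.payload, ...data } as T;
    },
  };
  return ctx;
}

const ctx = createCtx("req-1", { orderId: "abc-123" });
ctx.setPayload({ status: "processed" });
console.log(ctx.payload); // payload now holds both orderId and status
```

Under this assumption, a later workflow step would see both the original orderId and the status added by the earlier step.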