Synkro Integration
The main integration point between @synkro/agents and @synkro/core is agent.asHandler(). It converts any agent into a HandlerFunction that plugs into Synkro’s event system — giving the agent distributed locking, deduplication, retries, and dead letter queues for free.
agent.asHandler()
const handler = agent.asHandler();
The returned handler:
- Reads the agent input from ctx.payload.input (if it is a string), or JSON-serializes the entire payload as input.
- Calls agent.run() with the extracted input and the Synkro requestId.
- Writes the result back via ctx.setPayload():
{
  agentOutput: "The agent's final response text",
  agentStatus: "completed",
  agentTokenUsage: { promptTokens: 150, completionTokens: 42, totalTokens: 192 },
  agentToolCalls: 2
}
Event handler example
- Create the agent
    import { createAgent, createTool, OpenAIProvider } from "@synkro/agents";
    import { Synkro } from "@synkro/core";

    const synkro = await Synkro.start({
      transport: "redis",
      connectionUrl: "redis://localhost:6379",
    });

    const summarizeTool = createTool({
      name: "get_article",
      description: "Fetch an article by URL",
      parameters: {
        type: "object",
        properties: { url: { type: "string" } },
        required: ["url"],
      },
      execute: async (input: { url: string }) => {
        const res = await fetch(input.url);
        return { text: await res.text() };
      },
    });

    const agent = createAgent({
      name: "summarizer",
      systemPrompt: "You summarize articles. Use get_article to fetch content, then return a concise summary.",
      provider: new OpenAIProvider({ apiKey: process.env.OPENAI_API_KEY! }),
      model: { model: "gpt-4o" },
      tools: [summarizeTool],
    });
- Register as a Synkro handler
    synkro.on("article:summarize", agent.asHandler());
- Publish an event
    await synkro.publish("article:summarize", {
      input: "Summarize the article at https://example.com/ai-news",
    });
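When the published event reaches the handler, it follows the three steps listed under agent.asHandler(). The sketch below approximates that flow with stand-in types. SketchAgent, SketchContext, AgentResult and its field names, and the run(input, { requestId }) signature are assumptions made for illustration, not the actual @synkro/agents API; only the payload keys (agentOutput, agentStatus, agentTokenUsage, agentToolCalls) come from the documented result shape.

```typescript
// Stand-in types for illustration only; the real @synkro types may differ.
interface TokenUsage {
  promptTokens: number;
  completionTokens: number;
  totalTokens: number;
}

interface AgentResult {
  output: string; // assumed field name
  status: string; // assumed field name
  tokenUsage: TokenUsage; // assumed field name
  toolCalls: number; // assumed field name
}

interface SketchAgent {
  // Assumed signature: the docs only say run() receives the input and the requestId.
  run(input: string, options: { requestId: string }): Promise<AgentResult>;
}

interface SketchContext {
  requestId: string;
  payload: unknown;
  setPayload(patch: Record<string, unknown>): void;
}

// Step 1: use payload.input when it is a string, otherwise serialize the whole payload.
function extractInput(payload: unknown): string {
  if (typeof payload === "object" && payload !== null) {
    const input = (payload as { input?: unknown }).input;
    if (typeof input === "string") {
      return input;
    }
  }
  return JSON.stringify(payload);
}

// Steps 2 and 3: run the agent, then write the result back onto the payload.
function asHandlerSketch(agent: SketchAgent) {
  return async (ctx: SketchContext): Promise<void> => {
    const input = extractInput(ctx.payload);
    const result = await agent.run(input, { requestId: ctx.requestId });
    ctx.setPayload({
      agentOutput: result.output,
      agentStatus: result.status,
      agentTokenUsage: result.tokenUsage,
      agentToolCalls: result.toolCalls,
    });
  };
}
```

In practice this means a payload without a string input field still reaches the agent, but as raw JSON, so publishing an explicit input string (as in the example above) usually gives the model a cleaner prompt.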
Workflow step example
Agents can serve as steps in multi-step Synkro workflows. Each step receives the previous step’s output via the payload.
import { createAgent, createPipeline, OpenAIProvider } from "@synkro/agents";
import { Synkro } from "@synkro/core";

const synkro = await Synkro.start({
  transport: "redis",
  connectionUrl: "redis://localhost:6379",
});

const researcher = createAgent({
  name: "researcher",
  systemPrompt: "Research the given topic and produce detailed findings.",
  provider: new OpenAIProvider({ apiKey: process.env.OPENAI_API_KEY! }),
  model: { model: "gpt-4o" },
});

const writer = createAgent({
  name: "writer",
  systemPrompt: "Write a polished blog post from the provided research.",
  provider: new OpenAIProvider({ apiKey: process.env.OPENAI_API_KEY! }),
  model: { model: "gpt-4o" },
});

// createPipeline builds a SynkroWorkflow
const pipeline = createPipeline({
  name: "content-pipeline",
  steps: [
    { agent: researcher },
    { agent: writer },
  ],
  onSuccess: "pipeline:completed",
});

// Register as a Synkro workflow
synkro.workflow(pipeline);

// Trigger the pipeline
await synkro.publish("content-pipeline", {
  input: "Write about the future of edge computing",
});
How pipelines work
createPipeline converts a list of agent steps into a SynkroWorkflow. Each step:
- Extracts input from the payload: for the first step, it reads payload.input; for subsequent steps, it reads payload.agentOutput (the previous agent’s response).
- Runs the agent with agent.run().
- Writes results to the payload via ctx.setPayload() for the next step.
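The chaining described above can be simulated without Synkro at all. The following is a minimal sketch, not the library's implementation: SketchStep and its synchronous respond function stand in for an agent and agent.run(), the payload is assumed to be merged rather than replaced on each write, and the JSON fallback for a missing string field is borrowed from the asHandler() behavior as an assumption.

```typescript
type Payload = Record<string, unknown>;

// Hypothetical stand-in for an agent; respond() plays the role of agent.run().
interface SketchStep {
  name: string;
  respond: (input: string) => string;
}

// First step reads payload.input; later steps read payload.agentOutput.
function defaultStepInput(payload: Payload, stepIndex: number): string {
  const key = stepIndex === 0 ? "input" : "agentOutput";
  const value = payload[key];
  // Assumed fallback: serialize the payload when the expected field is not a string.
  return typeof value === "string" ? value : JSON.stringify(payload);
}

// Runs the steps in order, writing each output where the next step will look for it.
function runPipelineSketch(steps: SketchStep[], initial: Payload): Payload {
  let payload: Payload = { ...initial };
  steps.forEach((step, index) => {
    const input = defaultStepInput(payload, index);
    const output = step.respond(input);
    payload = { ...payload, agentOutput: output }; // assumed merge semantics
  });
  return payload;
}

const finalPayload = runPipelineSketch(
  [
    { name: "researcher", respond: (input) => `findings(${input})` },
    { name: "writer", respond: (input) => `post(${input})` },
  ],
  { input: "edge computing" },
);
```

Here the writer never sees the original input directly; it only receives what the researcher produced, which is why the original input field remains on the payload while agentOutput is overwritten at each step.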
You can customize input extraction per step with inputMapper:
const pipeline = createPipeline({
  name: "custom-pipeline",
  steps: [
    {
      agent: researcher,
      inputMapper: (payload) => {
        const p = payload as { topic: string; style: string };
        return `Research "${p.topic}" in a ${p.style} style`;
      },
    },
    { agent: writer },
  ],
});
Pipeline configuration
| Field | Type | Description |
|---|---|---|
| name | string | Workflow name (used as the event channel). |
| steps | AgentStep[] | Ordered list of agent steps. |
| registry | AgentRegistry | Optional registry for resolving agents by name string. |
| onSuccess | string | Event to publish when the pipeline completes successfully. |
| onFailure | string | Event to publish when a step fails. |
| onComplete | string | Event to publish on completion (success or failure). |
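One way to read the three event fields in the table: the outcome decides between onSuccess and onFailure, and onComplete fires in both cases. The helper below is a hypothetical illustration of that rule, not part of @synkro/agents; treating all three fields as optional and publishing the outcome-specific event before onComplete are assumptions.

```typescript
// Hypothetical subset of the pipeline configuration, for illustration only.
interface CompletionEvents {
  onSuccess?: string;
  onFailure?: string;
  onComplete?: string;
}

// Returns the event names that would be published for a given outcome.
function eventsToPublish(config: CompletionEvents, succeeded: boolean): string[] {
  const outcomeEvent = succeeded ? config.onSuccess : config.onFailure;
  // Drop fields that were not configured. The ordering is an assumption.
  return [outcomeEvent, config.onComplete].filter(
    (event): event is string => event !== undefined,
  );
}

const config: CompletionEvents = {
  onSuccess: "pipeline:completed",
  onComplete: "pipeline:finished", // hypothetical event name
};

const onSuccessRun = eventsToPublish(config, true);
const onFailedRun = eventsToPublish(config, false);
```

With this configuration a failed run would publish only the onComplete event, since no onFailure event is set.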
What you get for free
When agents run inside Synkro’s event system, they inherit all of Synkro’s infrastructure:
| Feature | Description |
|---|---|
| Distributed locking | Only one instance processes each event at a time. |
| Deduplication | Prevents duplicate processing of the same event. |
| Retries | Configure retry policies with backoff on the handler or agent config. |
| Dead letter queue | Failed agent runs are captured for inspection and replay. |
| Graceful shutdown | In-flight agent runs complete before the process exits. |
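To make the retry and dead letter rows concrete, here is a conceptual sketch of how the two features typically interact: a failing run is retried with growing delays, and once retries are exhausted the failure is recorded for later inspection. This is a generic illustration, not Synkro's implementation; RetryPolicy, DeadLetter, and both function names are hypothetical, and the sketch computes the backoff delays without actually waiting.

```typescript
// Hypothetical policy shape; Synkro's real retry configuration may look different.
interface RetryPolicy {
  maxRetries: number;
  baseDelayMs: number;
}

interface DeadLetter {
  event: string;
  error: string;
  attempts: number;
}

// Exponential backoff: base, 2x base, 4x base, ... one delay per retry.
function backoffDelays(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let i = 0; i < policy.maxRetries; i++) {
    delays.push(policy.baseDelayMs * Math.pow(2, i));
  }
  return delays;
}

// Tries the task once plus up to maxRetries more times; records a dead letter on final failure.
function processWithRetries(
  event: string,
  task: () => void,
  policy: RetryPolicy,
  deadLetters: DeadLetter[],
): boolean {
  const maxAttempts = policy.maxRetries + 1;
  let lastError = "";
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      task();
      return true;
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
      // A real system would wait backoffDelays(policy)[attempt - 1] ms before retrying.
    }
  }
  deadLetters.push({ event, error: lastError, attempts: maxAttempts });
  return false;
}
```

For agent runs this matters because a transient provider error can succeed on a later attempt, while a run that keeps failing ends up in the dead letter queue instead of being lost.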