Skip to content

Synkro Integration

The main integration point between @synkro/agents and @synkro/core is agent.asHandler(). It converts any agent into a HandlerFunction that plugs into Synkro’s event system — giving the agent distributed locking, deduplication, retries, and dead letter queues for free.

agent.asHandler()

const handler = agent.asHandler();

The returned handler:

  1. Reads the agent input from ctx.payload.input (if it is a string), or JSON-serializes the entire payload as input.
  2. Calls agent.run() with the extracted input and the Synkro requestId.
  3. Writes the result back via ctx.setPayload():
{
agentOutput: "The agent's final response text",
agentStatus: "completed",
agentTokenUsage: { promptTokens: 150, completionTokens: 42, totalTokens: 192 },
agentToolCalls: 2
}

Event handler example

  1. Create the agent

    import { createAgent, createTool, OpenAIProvider } from "@synkro/agents";
    import { Synkro } from "@synkro/core";
    const synkro = await Synkro.start({
    transport: "redis",
    connectionUrl: "redis://localhost:6379",
    });
    const summarizeTool = createTool({
    name: "get_article",
    description: "Fetch an article by URL",
    parameters: {
    type: "object",
    properties: { url: { type: "string" } },
    required: ["url"],
    },
    execute: async (input: { url: string }) => {
    const res = await fetch(input.url);
    return { text: await res.text() };
    },
    });
    const agent = createAgent({
    name: "summarizer",
    systemPrompt: "You summarize articles. Use get_article to fetch content, then return a concise summary.",
    provider: new OpenAIProvider({ apiKey: process.env.OPENAI_API_KEY! }),
    model: { model: "gpt-4o" },
    tools: [summarizeTool],
    });
  2. Register as a Synkro handler

    synkro.on("article:summarize", agent.asHandler());
  3. Publish an event

    await synkro.publish("article:summarize", {
    input: "Summarize the article at https://example.com/ai-news",
    });

Workflow step example

Agents can serve as steps in multi-step Synkro workflows. Each step receives the previous step’s output via the payload.

import { createAgent, createPipeline, OpenAIProvider } from "@synkro/agents";
import { Synkro } from "@synkro/core";
const synkro = await Synkro.start({
transport: "redis",
connectionUrl: "redis://localhost:6379",
});
const researcher = createAgent({
name: "researcher",
systemPrompt: "Research the given topic and produce detailed findings.",
provider: new OpenAIProvider({ apiKey: process.env.OPENAI_API_KEY! }),
model: { model: "gpt-4o" },
});
const writer = createAgent({
name: "writer",
systemPrompt: "Write a polished blog post from the provided research.",
provider: new OpenAIProvider({ apiKey: process.env.OPENAI_API_KEY! }),
model: { model: "gpt-4o" },
});
// createPipeline builds a SynkroWorkflow
const pipeline = createPipeline({
name: "content-pipeline",
steps: [
{ agent: researcher },
{ agent: writer },
],
onSuccess: "pipeline:completed",
});
// Register as a Synkro workflow
synkro.workflow(pipeline);
// Trigger the pipeline
await synkro.publish("content-pipeline", {
input: "Write about the future of edge computing",
});

How pipelines work

createPipeline converts a list of agent steps into a SynkroWorkflow. Each step:

  1. Extracts input from the payload — for the first step, it reads payload.input; for subsequent steps, it reads payload.agentOutput (the previous agent’s response).
  2. Runs the agent with agent.run().
  3. Writes results to the payload via ctx.setPayload() for the next step.

You can customize input extraction per step with inputMapper:

const pipeline = createPipeline({
name: "custom-pipeline",
steps: [
{
agent: researcher,
inputMapper: (payload) => {
const p = payload as { topic: string; style: string };
return `Research "${p.topic}" in a ${p.style} style`;
},
},
{ agent: writer },
],
});

Pipeline configuration

FieldTypeDescription
namestringWorkflow name (used as the event channel).
stepsAgentStep[]Ordered list of agent steps.
registryAgentRegistryOptional registry for resolving agents by name string.
onSuccessstringEvent to publish when the pipeline completes successfully.
onFailurestringEvent to publish when a step fails.
onCompletestringEvent to publish on completion (success or failure).

What you get for free

When agents run inside Synkro’s event system, they inherit all of Synkro’s infrastructure:

FeatureDescription
Distributed lockingOnly one instance processes each event at a time.
DeduplicationPrevents duplicate processing of the same event.
RetriesConfigure retry policies with backoff on the handler or agent config.
Dead letter queueFailed agent runs are captured for inspection and replay.
Graceful shutdownIn-flight agent runs complete before the process exits.