Skip to content

Transport System

The TransportManager interface

At the heart of Synkro’s transport layer is the TransportManager interface. Both built-in transports implement it, and you can provide your own.

interface TransportManager {
// Messaging
publishMessage(channel: string, message: string): Promise<void>;
subscribeToChannel(channel: string, callback: (message: string) => void): void;
unsubscribeFromChannel(channel: string): void;
// State / cache
getCache(key: string): Promise<string | null>;
setCache(key: string, value: string, ttlSeconds?: number): Promise<void>;
setCacheIfNotExists(key: string, value: string, ttlSeconds?: number): Promise<boolean>;
deleteCache(key: string): Promise<void>;
incrementCache(key: string, ttlSeconds?: number): Promise<number>;
// List operations (used for DLQ, history)
pushToList(key: string, value: string): Promise<void>;
getListRange(key: string, start: number, stop: number): Promise<string[]>;
// Cleanup
deleteKey(key: string): Promise<void>;
disconnect(): Promise<void>;
}

The interface covers three responsibilities:

  1. Messaging — Publish and subscribe to channels for event delivery.
  2. State — Read and write key-value data for workflow state, distributed locks, and deduplication tokens.
  3. Lifecycle — Clean up resources when the orchestrator shuts down.

In-Memory transport

The in-memory transport is designed for development and testing. It requires no external services and runs entirely within a single Node.js process.

const synkro = await Synkro.start({
transport: "in-memory",
});

How it works

  • Messaging uses a lightweight internal event emitter. publishMessage emits to the channel; subscribeToChannel registers a listener.
  • State is stored in a plain Map<string, string>. TTL-based expiration is handled via setTimeout.
  • Lists use in-memory arrays.

Limitations

  • State is lost when the process exits.
  • Only a single instance can subscribe to a channel. There is no cross-process communication.
  • No distributed locking or deduplication — fine for dev, but not safe for concurrent production workloads.

Redis transport

The Redis transport is the production-ready option. It uses Redis as the backing store for messaging, state, and coordination.

const synkro = await Synkro.start({
transport: "redis",
connectionUrl: "redis://localhost:6379",
});

How it works

  • Messaging uses Redis Streams. Each channel maps to a stream, and consumers read using consumer groups. This guarantees at-least-once delivery and supports multiple instances consuming from the same stream.
  • State uses standard Redis GET/SET with optional TTL. Workflow state, lock tokens, and deduplication keys are all stored as Redis keys.
  • Locking uses SET NX (set-if-not-exists) with a TTL to implement distributed locks, preventing duplicate processing across instances.
  • Lists use Redis lists (LPUSH/LRANGE) for dead letter queues and event history.

Requirements

  • A running Redis instance (v6.2 or later recommended for full Streams support).
  • The connectionUrl must be a valid Redis connection string.

Comparison

FeatureIn-MemoryRedis
External dependenciesNoneRedis instance
PersistenceProcess lifetime onlySurvives restarts
Multi-instance supportNoYes (consumer groups)
Distributed lockingNoYes (SET NX)
DeduplicationNoYes (key-based)
Dead letter queueIn-memory listRedis list
Message deliverySynchronous emitAt-least-once (Streams)
Ideal forDevelopment, testing, CIProduction, staging

Custom transports

You can implement the TransportManager interface to back Synkro with any messaging or storage system — for example, PostgreSQL, Kafka, or SQS.

import type { TransportManager } from "@synkro/core";
class PostgresTransport implements TransportManager {
async publishMessage(channel: string, message: string) {
// Use LISTEN/NOTIFY or a polling table
}
subscribeToChannel(channel: string, callback: (message: string) => void) {
// Set up a listener
}
// ... implement all remaining methods
}

Once implemented, you can pass your custom transport directly instead of using the string shorthand:

const synkro = await Synkro.start({
transport: new PostgresTransport(),
});