Transport System
The TransportManager interface
At the heart of Synkro’s transport layer is the TransportManager interface. Both built-in transports implement it, and you can provide your own.

    interface TransportManager {
      // Messaging
      publishMessage(channel: string, message: string): Promise<void>;
      subscribeToChannel(channel: string, callback: (message: string) => void): void;
      unsubscribeFromChannel(channel: string): void;

      // State / cache
      getCache(key: string): Promise<string | null>;
      setCache(key: string, value: string, ttlSeconds?: number): Promise<void>;
      setCacheIfNotExists(key: string, value: string, ttlSeconds?: number): Promise<boolean>;
      deleteCache(key: string): Promise<void>;
      incrementCache(key: string, ttlSeconds?: number): Promise<number>;

      // List operations (used for DLQ, history)
      pushToList(key: string, value: string): Promise<void>;
      getListRange(key: string, start: number, stop: number): Promise<string[]>;

      // Cleanup
      deleteKey(key: string): Promise<void>;
      disconnect(): Promise<void>;
    }

The interface covers three responsibilities:
- Messaging — Publish and subscribe to channels for event delivery.
- State — Read and write key-value data for workflow state, distributed locks, and deduplication tokens.
- Lifecycle — Clean up resources when the orchestrator shuts down.
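Because every message crosses the interface as a plain string, callers need to serialize structured payloads themselves. The sketch below shows one possible way to layer JSON encoding over the messaging methods. The names `Messaging`, `publishJson`, `subscribeJson`, and `LoopbackMessaging` are hypothetical helpers invented for this illustration and are not part of Synkro.

```typescript
// Hypothetical subset of TransportManager: only the messaging methods.
interface Messaging {
  publishMessage(channel: string, message: string): Promise<void>;
  subscribeToChannel(channel: string, callback: (message: string) => void): void;
  unsubscribeFromChannel(channel: string): void;
}

// Serialize a structured payload before handing it to the transport.
async function publishJson<T>(t: Messaging, channel: string, payload: T): Promise<void> {
  await t.publishMessage(channel, JSON.stringify(payload));
}

// Parse incoming strings back into objects. The cast is unchecked, so
// real code would validate the shape before trusting it.
function subscribeJson<T>(t: Messaging, channel: string, handler: (payload: T) => void): void {
  t.subscribeToChannel(channel, (raw) => handler(JSON.parse(raw) as T));
}

// Minimal loopback stand-in so the sketch runs without Synkro installed.
class LoopbackMessaging implements Messaging {
  private handlers = new Map<string, (message: string) => void>();

  async publishMessage(channel: string, message: string): Promise<void> {
    this.handlers.get(channel)?.(message);
  }

  subscribeToChannel(channel: string, callback: (message: string) => void): void {
    this.handlers.set(channel, callback);
  }

  unsubscribeFromChannel(channel: string): void {
    this.handlers.delete(channel);
  }
}
```

Keeping serialization outside the transport means the same helpers work unchanged with any implementation of the interface.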
In-Memory transport
The in-memory transport is designed for development and testing. It requires no external services and runs entirely within a single Node.js process.

    const synkro = await Synkro.start({
      transport: "in-memory",
    });

How it works
- Messaging uses a lightweight internal event emitter. publishMessage emits to the channel; subscribeToChannel registers a listener.
- State is stored in a plain Map<string, string>. TTL-based expiration is handled via setTimeout.
- Lists use in-memory arrays.
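The behaviour described above can be approximated in a few dozen lines. The following is a minimal sketch under stated assumptions, not Synkro's actual implementation: the class name `InMemorySketch` is hypothetical, a `Map` of listener sets stands in for the internal event emitter, and only the messaging and cache methods are covered.

```typescript
type Listener = (message: string) => void;

// Hypothetical illustration of an in-memory transport core.
class InMemorySketch {
  private listeners = new Map<string, Set<Listener>>();
  private cache = new Map<string, string>();
  private timers = new Map<string, ReturnType<typeof setTimeout>>();

  async publishMessage(channel: string, message: string): Promise<void> {
    // Emit to every listener registered on the channel.
    this.listeners.get(channel)?.forEach((listener) => listener(message));
  }

  subscribeToChannel(channel: string, callback: Listener): void {
    const set = this.listeners.get(channel) ?? new Set<Listener>();
    set.add(callback);
    this.listeners.set(channel, set);
  }

  unsubscribeFromChannel(channel: string): void {
    this.listeners.delete(channel);
  }

  async getCache(key: string): Promise<string | null> {
    return this.cache.get(key) ?? null;
  }

  async setCache(key: string, value: string, ttlSeconds?: number): Promise<void> {
    this.clearTimer(key); // a fresh write replaces any pending expiry
    this.cache.set(key, value);
    if (ttlSeconds !== undefined) {
      // Expire the entry by deleting it when the timer fires.
      const timer = setTimeout(() => {
        this.cache.delete(key);
        this.timers.delete(key);
      }, ttlSeconds * 1000);
      this.timers.set(key, timer);
    }
  }

  async deleteCache(key: string): Promise<void> {
    this.clearTimer(key);
    this.cache.delete(key);
  }

  private clearTimer(key: string): void {
    const timer = this.timers.get(key);
    if (timer !== undefined) clearTimeout(timer);
    this.timers.delete(key);
  }
}
```

Everything lives in ordinary JavaScript objects owned by one process, which is why none of it survives a restart or is visible to another instance.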
Limitations
- State is lost when the process exits.
- Only a single instance can take part. Subscribers in other processes never receive published messages, because there is no cross-process communication.
- No distributed locking or deduplication — fine for dev, but not safe for concurrent production workloads.
Redis transport
The Redis transport is the production-ready option. It uses Redis as the backing store for messaging, state, and coordination.

    const synkro = await Synkro.start({
      transport: "redis",
      connectionUrl: "redis://localhost:6379",
    });

How it works
- Messaging uses Redis Streams. Each channel maps to a stream, and consumers read using consumer groups. This guarantees at-least-once delivery and supports multiple instances consuming from the same stream.
- State uses standard Redis GET/SET with optional TTL. Workflow state, lock tokens, and deduplication keys are all stored as Redis keys.
- Locking uses SET NX (set-if-not-exists) with a TTL to implement distributed locks, preventing duplicate processing across instances.
- Lists use Redis lists (LPUSH/LRANGE) for dead letter queues and event history.
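The locking pattern can be expressed purely in terms of the interface. The sketch below assumes that `setCacheIfNotExists` has SET NX semantics (it resolves to `true` only when the key was absent); how Synkro acquires and releases locks internally is not shown in this document. `LockStore`, `withLock`, and `FakeStore` are hypothetical names, and `FakeStore` is an in-memory stand-in so the example runs without Redis.

```typescript
// Hypothetical subset of TransportManager needed for locking.
interface LockStore {
  setCacheIfNotExists(key: string, value: string, ttlSeconds?: number): Promise<boolean>;
  getCache(key: string): Promise<string | null>;
  deleteCache(key: string): Promise<void>;
}

// Run `work` only if this caller wins the lock. Resolves to false when
// another holder already owns it. The TTL bounds how long a crashed
// holder can block everyone else.
async function withLock(
  store: LockStore,
  key: string,
  token: string,
  ttlSeconds: number,
  work: () => Promise<void>,
): Promise<boolean> {
  const acquired = await store.setCacheIfNotExists(key, token, ttlSeconds);
  if (!acquired) return false;
  try {
    await work();
  } finally {
    // Release only if we still own the lock. This check-then-delete is
    // not atomic; a Redis-backed version would normally do it in a Lua script.
    if ((await store.getCache(key)) === token) await store.deleteCache(key);
  }
  return true;
}

// In-memory stand-in (TTL ignored) so the sketch runs without Redis.
class FakeStore implements LockStore {
  private data = new Map<string, string>();

  async setCacheIfNotExists(key: string, value: string): Promise<boolean> {
    if (this.data.has(key)) return false;
    this.data.set(key, value);
    return true;
  }

  async getCache(key: string): Promise<string | null> {
    return this.data.get(key) ?? null;
  }

  async deleteCache(key: string): Promise<void> {
    this.data.delete(key);
  }
}
```

Storing a per-holder token as the lock value is what lets a holder avoid deleting a lock that expired and was then taken by someone else.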
Requirements
- A running Redis instance (v6.2 or later recommended for full Streams support).
- The connectionUrl must be a valid Redis connection string.
Comparison
| Feature | In-Memory | Redis |
|---|---|---|
| External dependencies | None | Redis instance |
| Persistence | Process lifetime only | Survives restarts |
| Multi-instance support | No | Yes (consumer groups) |
| Distributed locking | No | Yes (SET NX) |
| Deduplication | No | Yes (key-based) |
| Dead letter queue | In-memory list | Redis list |
| Message delivery | Synchronous emit | At-least-once (Streams) |
| Ideal for | Development, testing, CI | Production, staging |
Custom transports
You can implement the TransportManager interface to back Synkro with any messaging or storage system — for example, PostgreSQL, Kafka, or SQS.

    import type { TransportManager } from "@synkro/core";

    class PostgresTransport implements TransportManager {
      async publishMessage(channel: string, message: string) {
        // Use LISTEN/NOTIFY or a polling table
      }

      subscribeToChannel(channel: string, callback: (message: string) => void) {
        // Set up a listener
      }

      // ... implement all remaining methods
    }

Once implemented, you can pass your custom transport directly instead of using the string shorthand:

    const synkro = await Synkro.start({
      transport: new PostgresTransport(),
    });
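A custom transport does not have to talk to a new backend; it can also wrap an existing one. As a complete (if simple) example of implementing every method, the sketch below prefixes each channel and key before delegating to another transport, which could be one way to isolate tenants or environments that share a backend. `PrefixedTransport` is a hypothetical class, and the interface is redeclared locally only so the block compiles on its own; in a real project it would be imported from "@synkro/core" as shown above.

```typescript
// Local copy of the interface from the top of this page.
interface TransportManager {
  publishMessage(channel: string, message: string): Promise<void>;
  subscribeToChannel(channel: string, callback: (message: string) => void): void;
  unsubscribeFromChannel(channel: string): void;
  getCache(key: string): Promise<string | null>;
  setCache(key: string, value: string, ttlSeconds?: number): Promise<void>;
  setCacheIfNotExists(key: string, value: string, ttlSeconds?: number): Promise<boolean>;
  deleteCache(key: string): Promise<void>;
  incrementCache(key: string, ttlSeconds?: number): Promise<number>;
  pushToList(key: string, value: string): Promise<void>;
  getListRange(key: string, start: number, stop: number): Promise<string[]>;
  deleteKey(key: string): Promise<void>;
  disconnect(): Promise<void>;
}

// Hypothetical decorator: namespaces every channel and key, then delegates.
class PrefixedTransport implements TransportManager {
  private inner: TransportManager;
  private prefix: string;

  constructor(inner: TransportManager, prefix: string) {
    this.inner = inner;
    this.prefix = prefix;
  }

  private k(name: string): string {
    return `${this.prefix}:${name}`;
  }

  publishMessage(channel: string, message: string): Promise<void> {
    return this.inner.publishMessage(this.k(channel), message);
  }
  subscribeToChannel(channel: string, callback: (message: string) => void): void {
    this.inner.subscribeToChannel(this.k(channel), callback);
  }
  unsubscribeFromChannel(channel: string): void {
    this.inner.unsubscribeFromChannel(this.k(channel));
  }
  getCache(key: string): Promise<string | null> {
    return this.inner.getCache(this.k(key));
  }
  setCache(key: string, value: string, ttlSeconds?: number): Promise<void> {
    return this.inner.setCache(this.k(key), value, ttlSeconds);
  }
  setCacheIfNotExists(key: string, value: string, ttlSeconds?: number): Promise<boolean> {
    return this.inner.setCacheIfNotExists(this.k(key), value, ttlSeconds);
  }
  deleteCache(key: string): Promise<void> {
    return this.inner.deleteCache(this.k(key));
  }
  incrementCache(key: string, ttlSeconds?: number): Promise<number> {
    return this.inner.incrementCache(this.k(key), ttlSeconds);
  }
  pushToList(key: string, value: string): Promise<void> {
    return this.inner.pushToList(this.k(key), value);
  }
  getListRange(key: string, start: number, stop: number): Promise<string[]> {
    return this.inner.getListRange(this.k(key), start, stop);
  }
  deleteKey(key: string): Promise<void> {
    return this.inner.deleteKey(this.k(key));
  }
  disconnect(): Promise<void> {
    return this.inner.disconnect(); // nothing to prefix
  }
}
```

An instance of such a wrapper would be passed as the transport option in the same way as PostgresTransport.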