
Resumable SSE + Graceful Drain

The problem

Two related production scenarios:
  1. Flaky network. A mobile client streaming an agent response loses signal mid-stream. When it reconnects, you want to resume from where it dropped, not restart from scratch.
  2. Graceful K8s rollout. A pod receives SIGTERM. You don't want to drop in-flight agent runs on the floor; you want each one to finish its current step, persist a checkpoint, and exit cleanly so the next pod can pick up where it left off.
Agentium ships small primitives for both.

Resumable SSE

Concept

Every event emitted during an agent run is recorded in an in-memory ring buffer keyed by runId, and each event gets a monotonically increasing numeric id. When a client reconnects and sends the standard Last-Event-ID HTTP header, the server first replays the buffered events with a higher id (the ones the client missed), then continues with live events.
Client A run-1 stream:
  event id=1  →  delivered
  event id=2  →  delivered
  event id=3  →  lost (client disconnected)
  event id=4  →  buffered
  event id=5  →  buffered

Client A reconnects:  POST /run  { "runId": "run-1" }  Last-Event-ID: 2

Server replays:
  id=3 → delivered
  id=4 → delivered
  id=5 → delivered
  (continues with new events)
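Resuming only works if the client remembers the id of the last complete event it received. The sketch below shows one way to track that on the client, assuming the wire format described under formatSSEEvent (id:, optional event:, data: lines, each event terminated by a blank line). parseSSE is a hypothetical helper written for this page, not part of @agentium/transport.

```typescript
// Hypothetical client-side helper (not part of @agentium/transport).
// Assumes "\n" line endings; the SSE spec also allows "\r\n" and "\r".
interface ParsedEvent {
  id?: string;
  event?: string;
  data: string;
}

function parseSSE(
  text: string,
  lastEventId: string,
): { events: ParsedEvent[]; lastEventId: string; rest: string } {
  const blocks = text.split("\n\n");
  const rest = blocks.pop() ?? ""; // incomplete trailing event, kept for the next chunk
  const events: ParsedEvent[] = [];
  for (const block of blocks) {
    const ev: ParsedEvent = { data: "" };
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      if (line.startsWith(":")) continue; // comment line
      const idx = line.indexOf(":");
      const field = idx === -1 ? line : line.slice(0, idx);
      let value = idx === -1 ? "" : line.slice(idx + 1);
      if (value.startsWith(" ")) value = value.slice(1);
      if (field === "id") ev.id = value;
      else if (field === "event") ev.event = value;
      else if (field === "data") dataLines.push(value);
    }
    // Per the SSE spec, the last event id sticks until the server sends a new one.
    if (ev.id !== undefined) lastEventId = ev.id;
    if (dataLines.length > 0) {
      ev.data = dataLines.join("\n");
      events.push(ev);
    }
  }
  return { events, lastEventId, rest };
}
```

Feed each network chunk in as rest + chunk. After a drop, send the returned lastEventId as the Last-Event-ID header on the reconnect request.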

InMemoryEventLog

import { InMemoryEventLog } from "@agentium/transport";

const log = new InMemoryEventLog({
  maxEventsPerRun: 1024,  // ring buffer size per run; default 1024
  ttlMs: 5 * 60_000,      // drop buffer N ms after finalize(); default 5min
});

API

interface SSEEventLog {
  record(runId: string, event: { event?: string; payload: unknown }): SSEEvent;
  since(runId: string, afterId: number): SSEEvent[];
  all(runId: string): SSEEvent[];
  finalize(runId: string): void;  // schedule buffer deletion after ttlMs
  drop(runId: string): void;      // delete buffer immediately
}

interface SSEEvent {
  id: number;
  event?: string;
  payload: unknown;
  recordedAt: number;
}
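To make the buffer semantics concrete, here is a minimal class with the same five methods as SSEEventLog. It is an illustration, not the InMemoryEventLog source, and it assumes that ids start at 1 for each run, that since() returns events with id strictly greater than afterId, and that the oldest events are evicted once maxEventsPerRun is exceeded. For brevity it evicts with Array.prototype.shift, which is O(n); a true ring buffer would use a fixed-size array with a moving head index.

```typescript
interface SSEEvent {
  id: number;
  event?: string;
  payload: unknown;
  recordedAt: number;
}

// Illustrative bounded event log mirroring the SSEEventLog interface (not the library's code).
class SketchEventLog {
  private runs = new Map<string, { nextId: number; events: SSEEvent[] }>();
  private timers = new Map<string, ReturnType<typeof setTimeout>>();

  constructor(private opts = { maxEventsPerRun: 1024, ttlMs: 5 * 60_000 }) {}

  record(runId: string, e: { event?: string; payload: unknown }): SSEEvent {
    let run = this.runs.get(runId);
    if (!run) {
      run = { nextId: 1, events: [] };
      this.runs.set(runId, run);
    }
    const ev: SSEEvent = { id: run.nextId++, event: e.event, payload: e.payload, recordedAt: Date.now() };
    run.events.push(ev);
    // Evict the oldest event once the per-run capacity is exceeded.
    if (run.events.length > this.opts.maxEventsPerRun) run.events.shift();
    return ev;
  }

  since(runId: string, afterId: number): SSEEvent[] {
    return (this.runs.get(runId)?.events ?? []).filter((ev) => ev.id > afterId);
  }

  all(runId: string): SSEEvent[] {
    return [...(this.runs.get(runId)?.events ?? [])];
  }

  finalize(runId: string): void {
    // Schedule deletion ttlMs from now; calling finalize again restarts the clock.
    clearTimeout(this.timers.get(runId));
    this.timers.set(runId, setTimeout(() => this.drop(runId), this.opts.ttlMs));
  }

  drop(runId: string): void {
    clearTimeout(this.timers.get(runId));
    this.timers.delete(runId);
    this.runs.delete(runId);
  }
}
```

One consequence of a bounded buffer: if a client reconnects with an afterId whose successors were already evicted, it silently misses events. A caller can detect that gap when the first returned id is greater than afterId + 1.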

Default singleton

defaultEventLog is exported as a process-wide shared instance, so multiple endpoints in the same process can record and replay against the same buffers without coordinating:
import { defaultEventLog } from "@agentium/transport";

formatSSEEvent(ev)

Renders an SSEEvent for the wire:
id: 7
data: {"type":"text","text":"Hello"}

The trailing blank line terminates the event, as the SSE spec requires. If ev.event is set, an event: <name> line is emitted as well.
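A sketch of that rendering, assuming the payload is serialized with JSON.stringify (the example above suggests so, but the real formatSSEEvent may differ in details). formatSketch is a hypothetical stand-in. Because JSON.stringify escapes newlines inside strings, the payload always fits on a single data: line; a raw multi-line string would instead need one data: line per line.

```typescript
interface SSEEvent {
  id: number;
  event?: string;
  payload: unknown;
  recordedAt: number;
}

// Hypothetical stand-in for formatSSEEvent, assuming JSON payload serialization.
function formatSketch(ev: SSEEvent): string {
  let out = `id: ${ev.id}\n`;
  if (ev.event) out += `event: ${ev.event}\n`;
  // JSON.stringify never emits a raw newline, so this stays on one line.
  // `?? null` avoids writing "undefined" for a missing payload.
  out += `data: ${JSON.stringify(ev.payload ?? null)}\n`;
  return out + "\n"; // blank line terminates the event
}
```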

Full reference handler

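This version replays missed events on reconnect and then tails the log instead of starting the agent a second time. It marks the end of a run by recording a done event in the log; that marker is a convention of this example, not of the library.

import { defaultEventLog, formatSSEEvent } from "@agentium/transport";
import { Agent, DrainController, openai } from "@agentium/core";
import express from "express";

const agent = new Agent({ name: "resumable", model: openai("gpt-4o-mini") });
const drain = new DrainController();
process.on("SIGTERM", () => drain.requestDrain());

const app = express();
app.use(express.json());

const DONE = "done"; // end-of-run marker, recorded in the log but never sent as-is
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

app.post("/run", async (req, res) => {
  const runId: string = req.body.runId ?? `run-${Date.now()}`;
  const header = Number(req.header("last-event-id") ?? "0");
  let lastSeen = Number.isFinite(header) ? header : 0;
  // A non-empty buffer means an earlier request is already driving (or finished) this run.
  const resuming = defaultEventLog.all(runId).length > 0;

  // While draining, refuse new runs; replays of buffered runs are still served.
  if (drain.drained && !resuming) {
    res.status(503).end();
    return;
  }

  let clientGone = false;
  res.on("close", () => { clientGone = true; });
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
  });

  // Send logged events after lastSeen; returns true once the DONE marker is reached.
  const flush = (): boolean => {
    for (const ev of defaultEventLog.since(runId, lastSeen)) {
      lastSeen = ev.id;
      if (ev.event === DONE) return true;
      res.write(formatSSEEvent(ev));
    }
    return false;
  };

  if (resuming) {
    // Replay, then tail the log. Calling agent.stream() again would start a second run.
    let done = flush();
    while (!done && !clientGone && !drain.drained) {
      await sleep(250);
      done = flush();
    }
    if (done) res.write("data: [DONE]\n\n");
    res.end();
    return;
  }

  let cutShort = false;
  try {
    for await (const chunk of agent.stream(req.body.input)) {
      if (drain.drained) { cutShort = true; break; } // no [DONE]: the client sees the cut
      // Keep recording after the client drops so that a reconnect can replay.
      const ev = defaultEventLog.record(runId, { payload: chunk });
      if (!clientGone) res.write(formatSSEEvent(ev));
    }
    if (!cutShort) {
      defaultEventLog.record(runId, { event: DONE, payload: null });
      if (!clientGone) res.write("data: [DONE]\n\n");
    }
  } finally {
    defaultEventLog.finalize(runId);
    res.end();
  }
});

app.listen(3000);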

Storage requirements

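InMemoryEventLog is in-process only. If you run multiple instances behind a load balancer, either make each client reconnect to the same instance (session affinity) or use a shared backend. A Redis-backed SSEEventLog has to implement the same five methods (record, since, all, finalize, drop). Note that the interface as shown is synchronous while Redis clients are asynchronous, so expect to adapt the interface (for example, to return promises) rather than match it verbatim. A first-party RedisEventLog is on the roadmap.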
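If you take the same-instance route, every request for a given runId must land on the same backend. Load balancers usually provide this through session affinity; the sketch below only illustrates the idea in application code. pickInstance and fnv1a are hypothetical helpers, and the choice of rendezvous (highest-random-weight) hashing is this example's, not Agentium's. It is preferred here over a plain hash modulo the instance count, which remaps most runs whenever the instance list changes.

```typescript
// Hypothetical helpers for illustration; in production prefer the load balancer's own affinity.
// 32-bit FNV-1a over UTF-16 code units.
function fnv1a(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

// Rendezvous hashing: every router that sees the same instance list picks the same
// instance for a runId, and removing an instance only moves the runs assigned to it.
function pickInstance(runId: string, instances: string[]): string {
  if (instances.length === 0) throw new Error("no instances available");
  let best = instances[0];
  let bestScore = -1;
  for (const inst of instances) {
    const score = fnv1a(`${inst}|${runId}`);
    if (score > bestScore) {
      best = inst;
      bestScore = score;
    }
  }
  return best;
}
```

Keep in mind that affinity only helps while the instance is alive: when a pod is replaced during a rollout, its in-memory buffers go with it.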

Graceful Drain

Concept

When SIGTERM hits, you want to:
  1. Stop accepting new requests.
  2. Let in-flight requests finish their current LLM call / tool roundtrip.
  3. Persist a resumable checkpoint.
  4. Exit.
DrainController is a small primitive for cooperative shutdown.
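The four steps can be sketched as a loop that checks for drain only between steps, so the step already in flight (an LLM call or tool roundtrip) always completes before the checkpoint is written. Everything here is hypothetical and framework-free: runSteps, the Checkpoint shape, and the save callback stand in for whatever your workflow and checkpoint store provide, and isDraining would typically read DrainController.drained.

```typescript
// Hypothetical types for illustration; not Agentium APIs.
type Checkpoint = { runId: string; nextStep: number; outputs: string[] };
type Step = (previous: string[]) => Promise<string>;

class DrainedError extends Error {
  constructor(readonly checkpoint: Checkpoint) {
    super(`run ${checkpoint.runId} drained before step ${checkpoint.nextStep}`);
  }
}

async function runSteps(
  runId: string,
  steps: Step[],
  isDraining: () => boolean,
  save: (cp: Checkpoint) => Promise<void>,
  resumeFrom?: Checkpoint,
): Promise<string[]> {
  const outputs = resumeFrom ? [...resumeFrom.outputs] : [];
  for (let i = resumeFrom?.nextStep ?? 0; i < steps.length; i++) {
    // Drain is honored only at step boundaries, never mid-step.
    if (isDraining()) {
      const cp: Checkpoint = { runId, nextStep: i, outputs: [...outputs] };
      await save(cp); // persist before giving up the run
      throw new DrainedError(cp);
    }
    outputs.push(await steps[i](outputs));
  }
  return outputs;
}
```

On the next pod, passing the saved checkpoint as resumeFrom continues the run at the first unfinished step.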

DrainController

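import { Agent, DrainController, RunDrainedError, openai } from "@agentium/core";

const agent = new Agent({ name: "drainable", model: openai("gpt-4o-mini") });
const drain = new DrainController();

// In a signal handler:
process.on("SIGTERM", () => drain.requestDrain());

// In your run loop: checking between chunks lets the chunk in progress finish.
async function streamRun(runId: string, input: string) {
  for await (const chunk of agent.stream(input)) {
    if (drain.drained) {
      throw new RunDrainedError(runId);
    }
    // ... forward chunk ...
  }
}

// Or wait for drain to fire. waitForDrain() resolves when requestDrain() is called;
// the race rejects at that moment but does not cancel agent.run(), which keeps going.
async function runOrDrain(runId: string, input: string) {
  return Promise.race([
    agent.run(input),
    drain.waitForDrain().then(() => {
      throw new RunDrainedError(runId);
    }),
  ]);
}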

API

class DrainController {
  get drained(): boolean;             // true after requestDrain
  requestDrain(): void;               // idempotent
  waitForDrain(): Promise<void>;      // resolves once requestDrain fires
}

class RunDrainedError extends Error {
  readonly runId: string;
  readonly name = "RunDrainedError";
  constructor(runId: string, message?: string);
}
The controller is intentionally low-level. Higher-level integration (e.g. Agent.requestDrain() that automatically saves a checkpoint) is on the roadmap. For now, wire it yourself.
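For intuition about the semantics listed in the API (an idempotent requestDrain and a promise that every waiter shares), here is a minimal class with the same surface. It is not the @agentium/core source; in particular, it assumes waitForDrain() returns the same promise on every call.

```typescript
// Illustrative re-implementation of the DrainController surface (not the library's code).
class SketchDrainController {
  private flag = false;
  private resolveDrain!: () => void;
  // The executor runs synchronously, so resolveDrain is set before any method is called.
  private readonly drainPromise = new Promise<void>((resolve) => {
    this.resolveDrain = resolve;
  });

  get drained(): boolean {
    return this.flag;
  }

  requestDrain(): void {
    if (this.flag) return; // idempotent: later calls are no-ops
    this.flag = true;
    this.resolveDrain();
  }

  waitForDrain(): Promise<void> {
    return this.drainPromise; // already resolved if drain was requested earlier
  }
}
```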

Full drain flow

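import { DrainController, RunDrainedError, Workflow, InMemoryStorage, StorageBackedCheckpointStore } from "@agentium/core";

const drain = new DrainController();
// InMemoryStorage is lost when the pod exits. For the next pod to resume, back the
// checkpoint store with storage that outlives the process.
const checkpointStore = new StorageBackedCheckpointStore(new InMemoryStorage());

const workflow = new Workflow({
  name: "long-pipeline",
  checkpointStore,
  // ...
});

// Underlying runs still executing; Promise.race below does not cancel them.
const inFlight = new Set<Promise<unknown>>();

async function runWithDrain(input: string) {
  const run = workflow.runWithCheckpoints();
  inFlight.add(run);
  // Untrack when settled; the catch only silences this side chain, race() still sees errors.
  run.finally(() => inFlight.delete(run)).catch(() => {});
  return Promise.race([
    run,
    drain.waitForDrain().then(() => {
      throw new RunDrainedError("workflow-1");
    }),
  ]);
}

process.on("SIGTERM", () => {
  drain.requestDrain();
  // Exit as soon as in-flight runs settle...
  void Promise.allSettled(inFlight).then(() => process.exit(0));
  // ...or after ~25s at most: Kubernetes sends SIGKILL once
  // terminationGracePeriodSeconds (30s by default) runs out.
  setTimeout(() => process.exit(0), 25_000);
});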
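When the workflow resumes on the next pod, it picks up from the last saved checkpoint via Workflow.replay(checkpointId). That only works if the next pod can read the checkpoint, which means the checkpoint store must be backed by storage shared across pods rather than InMemoryStorage.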

See also