Resumable SSE + Graceful Drain
The problem
Two related production scenarios:
- Flaky network. A mobile client streaming an agent response loses signal mid-stream. When it reconnects, you want to resume from where it dropped, not restart from scratch.
- Graceful K8s rollout. A pod receives SIGTERM. You don’t want to drop in-flight agent runs on the floor; you want each one to finish its current step, persist a checkpoint, and exit cleanly so the next pod can pick up.
Agentium ships small primitives for both.
Resumable SSE
Concept
Every event emitted during an agent run is recorded in an in-memory ring buffer keyed by runId. Each event gets a monotonically increasing numeric id. When a client reconnects with the standard Last-Event-ID HTTP header, the server replays any events the client missed before resuming.
Client A run-1 stream:
event id=1 → delivered
event id=2 → delivered
event id=3 → client disconnects
event id=4 → buffered
event id=5 → buffered
Client A reconnects: GET /run/run-1 Last-Event-ID: 2
Server replays:
id=3 → delivered
id=4 → delivered
id=5 → delivered
(continues with new events)
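The replay behaviour in the trace above can be sketched with a small per-run buffer. This is a hypothetical illustration, not the @agentium/transport implementation: the names TinyEventLog and LoggedEvent are made up here, and a plain array stands in for the ring buffer.

```typescript
// Hypothetical sketch; TinyEventLog and LoggedEvent are illustrative names only.
interface LoggedEvent {
  id: number;
  payload: unknown;
}

class TinyEventLog {
  private readonly runs = new Map<string, { nextId: number; events: LoggedEvent[] }>();
  private readonly maxEventsPerRun: number;

  constructor(maxEventsPerRun = 1024) {
    this.maxEventsPerRun = maxEventsPerRun;
  }

  // Assign the next id for this run and keep only the newest maxEventsPerRun events.
  record(runId: string, payload: unknown): LoggedEvent {
    let run = this.runs.get(runId);
    if (!run) {
      run = { nextId: 1, events: [] };
      this.runs.set(runId, run);
    }
    const ev: LoggedEvent = { id: run.nextId++, payload };
    run.events.push(ev);
    if (run.events.length > this.maxEventsPerRun) run.events.shift();
    return ev;
  }

  // Events with an id strictly greater than afterId, i.e. what the client missed.
  since(runId: string, afterId: number): LoggedEvent[] {
    const run = this.runs.get(runId);
    return run ? run.events.filter((ev) => ev.id > afterId) : [];
  }
}

// Five events recorded; the client last saw id=2, so ids 3..5 are replayed.
const log = new TinyEventLog(3);
for (const text of ["a", "b", "c", "d", "e"]) log.record("run-1", text);
const replayed = log.since("run-1", 2).map((ev) => ev.id);
```

Because the buffer here holds only three events, ids 1 and 2 have already been evicted. A client that reconnects with an id older than the oldest buffered event gets only what is still in the buffer, so the buffer size bounds how long a disconnect can be bridged.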
InMemoryEventLog
import { InMemoryEventLog } from "@agentium/transport";
const log = new InMemoryEventLog({
maxEventsPerRun: 1024, // ring buffer size per run; default 1024
ttlMs: 5 * 60_000, // drop buffer N ms after finalize(); default 5min
});
API
interface SSEEventLog {
record(runId: string, event: { event?: string; payload: unknown }): SSEEvent;
since(runId: string, afterId: number): SSEEvent[];
all(runId: string): SSEEvent[];
finalize(runId: string): void; // schedule buffer deletion after ttlMs
drop(runId: string): void; // delete buffer immediately
}
interface SSEEvent {
id: number;
event?: string;
payload: unknown;
recordedAt: number;
}
Default singleton
defaultEventLog is exported as a process-wide shared instance so multiple endpoints can record / replay against the same buffer without coordinating:
import { defaultEventLog } from "@agentium/transport";
formatSSEEvent renders an SSEEvent for the wire:
id: 7
data: {"type":"text","text":"Hello"}
(Trailing blank line per SSE spec.)
If ev.event is set, an event: <name> line is also emitted.
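As a rough illustration of the wire format described above, a formatter along these lines would produce the same frame. This is a sketch under assumptions, not the library's source: renderSSE and WireEvent are hypothetical names, and the relative order of the id and event lines is a guess.

```typescript
// Hypothetical stand-in for formatSSEEvent; field order is an assumption.
interface WireEvent {
  id: number;
  event?: string;
  payload: unknown;
}

function renderSSE(ev: WireEvent): string {
  const lines = [`id: ${ev.id}`];
  if (ev.event) lines.push(`event: ${ev.event}`);
  // JSON.stringify never emits raw newlines, so the payload fits on one data line.
  lines.push(`data: ${JSON.stringify(ev.payload)}`);
  // The trailing blank line terminates the event.
  return lines.join("\n") + "\n\n";
}

const frame = renderSSE({ id: 7, payload: { type: "text", text: "Hello" } });
const named = renderSSE({ id: 8, event: "tool", payload: { ok: true } });
```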
Full reference handler
import { defaultEventLog, formatSSEEvent } from "@agentium/transport";
import { Agent, DrainController, openai } from "@agentium/core";
import express from "express";
const agent = new Agent({ name: "resumable", model: openai("gpt-4o-mini") });
const drain = new DrainController();
process.on("SIGTERM", () => drain.requestDrain());
const app = express();
app.use(express.json());
app.post("/run", async (req, res) => {
const runId = req.body.runId ?? `run-${Date.now()}`;
// A missing or non-numeric Last-Event-ID falls back to 0 (replay the whole buffer).
const lastSeen = Number(req.header("last-event-id")) || 0;
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
// Replay any events the client missed before reconnect.
for (const ev of defaultEventLog.since(runId, lastSeen)) {
res.write(formatSSEEvent(ev));
}
// Stream new chunks, recording each so the next reconnect can resume.
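let completed = true;
for await (const chunk of agent.stream(req.body.input)) {
if (drain.drained) {
// Shutting down: end the stream without [DONE] so the client knows the run is incomplete.
completed = false;
break;
}
const ev = defaultEventLog.record(runId, { payload: chunk });
res.write(formatSSEEvent(ev));
}
if (completed) {
defaultEventLog.finalize(runId);
res.write("data: [DONE]\n\n");
}
res.end();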
});
app.listen(3000);
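On the client side, resuming depends on remembering the id of the last event received. The browser EventSource API does this automatically but only issues GET requests, so a client of the POST handler above has to track the id itself. The sketch below shows one way to do that; parseSSE and lastEventId are hypothetical helpers, and the parser assumes LF line endings and complete events.

```typescript
interface ParsedEvent {
  id?: string;
  event?: string;
  data: string;
}

// Split a buffer of complete SSE events (separated by blank lines) into fields.
function parseSSE(text: string): ParsedEvent[] {
  const events: ParsedEvent[] = [];
  for (const block of text.split("\n\n")) {
    if (block.trim() === "") continue;
    const ev: ParsedEvent = { data: "" };
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      const colon = line.indexOf(":");
      if (colon <= 0) continue; // skip comment lines and lines without a colon
      const field = line.slice(0, colon);
      const value = line.slice(colon + 1).replace(/^ /, "");
      if (field === "id") ev.id = value;
      else if (field === "event") ev.event = value;
      else if (field === "data") dataLines.push(value);
    }
    ev.data = dataLines.join("\n");
    events.push(ev);
  }
  return events;
}

// The id to send as Last-Event-ID on the next reconnect.
function lastEventId(events: ParsedEvent[], previous = "0"): string {
  let last = previous;
  for (const ev of events) if (ev.id !== undefined) last = ev.id;
  return last;
}

const received = parseSSE('id: 1\ndata: {"text":"He"}\n\nid: 2\ndata: {"text":"llo"}\n\n');
const resumeFrom = lastEventId(received);
```

On reconnect, the client would repeat the request with the same runId and a Last-Event-ID header set to resumeFrom. The [DONE] sentinel carries no id, so it leaves the tracked value unchanged.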
Storage requirements
InMemoryEventLog is in-process only. If you run multiple instances behind a load balancer, either the client must reconnect to the same instance (for example via sticky sessions) or you need a shared backend.
Implementing a Redis-backed SSEEventLog is straightforward: implement the five interface methods (record, since, all, finalize, drop). A built-in RedisEventLog is on the roadmap.
Graceful Drain
Concept
When SIGTERM hits, you want to:
- Stop accepting new requests.
- Let in-flight requests finish their current LLM call / tool roundtrip.
- Persist a resumable checkpoint.
- Exit.
DrainController is a small primitive for cooperative shutdown.
DrainController
import { DrainController, RunDrainedError } from "@agentium/core";
const drain = new DrainController();
// In a signal handler:
process.on("SIGTERM", () => drain.requestDrain());
// In your run loop:
for await (const chunk of agent.stream(input)) {
if (drain.drained) {
throw new RunDrainedError(ctx.runId);
}
// ...
}
// Or wait for drain to fire:
const drained = drain.waitForDrain(); // resolves when requestDrain() is called
const result = await Promise.race([
agent.run(input),
drained.then(() => { throw new RunDrainedError("run-1") }),
]);
API
class DrainController {
get drained(): boolean; // true after requestDrain
requestDrain(): void; // idempotent
waitForDrain(): Promise<void>; // resolves once requestDrain fires
}
class RunDrainedError extends Error {
readonly runId: string;
readonly name = "RunDrainedError";
constructor(runId: string, message?: string);
}
The controller is intentionally low-level. Higher-level integration (e.g. Agent.requestDrain() that automatically saves a checkpoint) is on the roadmap. For now, wire it yourself.
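To make the semantics of that surface concrete, here is a minimal sketch of a class with the same three members. It is not the @agentium/core source; SketchDrainController is a hypothetical name, and the real controller may differ internally.

```typescript
// Hypothetical re-implementation of the documented surface, for illustration only.
class SketchDrainController {
  private isDrained = false;
  private resolveDrain: () => void = () => {};
  private readonly drainPromise: Promise<void>;

  constructor() {
    // A single promise is shared by every waitForDrain() caller.
    this.drainPromise = new Promise<void>((resolve) => {
      this.resolveDrain = resolve;
    });
  }

  get drained(): boolean {
    return this.isDrained;
  }

  requestDrain(): void {
    if (this.isDrained) return; // idempotent: later calls are no-ops
    this.isDrained = true;
    this.resolveDrain();
  }

  waitForDrain(): Promise<void> {
    return this.drainPromise;
  }
}

const controller = new SketchDrainController();
const before = controller.drained;
controller.requestDrain();
controller.requestDrain(); // second call changes nothing
const after = controller.drained;
```

The flag serves loops that poll between steps, while the promise serves code that wants to race a long-running call against shutdown.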
Full drain flow
import { DrainController, RunDrainedError, Workflow, InMemoryStorage, StorageBackedCheckpointStore } from "@agentium/core";
const drain = new DrainController();
const checkpointStore = new StorageBackedCheckpointStore(new InMemoryStorage());
const workflow = new Workflow({
name: "long-pipeline",
checkpointStore,
// ...
});
async function runWithDrain(input: string) {
const run = workflow.runWithCheckpoints();
const result = await Promise.race([
run,
drain.waitForDrain().then(() => {
throw new RunDrainedError("workflow-1");
}),
]);
return result;
}
process.on("SIGTERM", async () => {
drain.requestDrain();
// Give in-flight runs ~30s to wrap up.
setTimeout(() => process.exit(0), 30_000);
});
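When the workflow resumes on the next pod, it picks up from the last saved checkpoint via Workflow.replay(checkpointId). The InMemoryStorage in this example lives in process memory, so another pod can only pick up a checkpoint if the store is backed by durable, shared storage.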
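The SIGTERM handler above always waits the full 30 seconds before exiting. One possible refinement is to track in-flight runs and exit as soon as they have all settled, keeping the timeout only as an upper bound. The sketch below is an assumption-laden illustration; InFlightRuns is a hypothetical helper and not part of @agentium/core.

```typescript
// Hypothetical helper: tracks run promises so shutdown can wait for them.
class InFlightRuns {
  private readonly pending = new Set<Promise<unknown>>();

  // Register a run; it is forgotten once it resolves or rejects.
  track<T>(run: Promise<T>): Promise<T> {
    this.pending.add(run);
    const forget = () => {
      this.pending.delete(run);
    };
    run.then(forget, forget);
    return run;
  }

  get size(): number {
    return this.pending.size;
  }

  // Resolves "idle" once every tracked run has settled, or "timeout" after timeoutMs.
  async settle(timeoutMs: number): Promise<"idle" | "timeout"> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<"timeout">((resolve) => {
      timer = setTimeout(() => resolve("timeout"), timeoutMs);
    });
    const idle = Promise.allSettled([...this.pending]).then(() => "idle" as const);
    try {
      return await Promise.race([idle, timeout]);
    } finally {
      clearTimeout(timer); // do not keep the process alive once we have an answer
    }
  }
}

const runs = new InFlightRuns();
runs.track(Promise.resolve("finished"));
const trackedNow = runs.size;
```

In the handler, this would amount to calling drain.requestDrain(), awaiting runs.settle(30_000), and then calling process.exit(0), with each run started through runs.track(runWithDrain(input)).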
See also