Workflow Time Travel
What it gives you
After every step of a Workflow.runWithCheckpoints() execution, the framework snapshots the state and writes a checkpoint. You can later:
- Replay a run from any checkpoint (deterministically reapply later steps).
- Fork a run at any checkpoint with a mutated state to explore alternatives.
- List all checkpoints to drive a “rewind” UI.
This is similar to LangGraph’s checkpointing and Temporal’s history replay, applied to Agentium’s simpler Workflow model.
Architecture
          step 1              step 2              step 3
initial ──────────▶ state A ──────────▶ state B ──────────▶ state C
   │                   │                   │                   │
   ▼                   ▼                   ▼                   ▼
checkpoint:initial  checkpoint:step-1   checkpoint:step-2   checkpoint:step-3
                 (all stored in the WorkflowCheckpointStore)
Every checkpoint contains:
- The full state at that point
- runId, stepIndex, stepName, and createdAt metadata
- An incremental id (the same runId:step-N shape used to look it up)
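Because checkpoints are looked up by id, it can help to build and parse those ids in one place. The two helpers below are a hypothetical sketch (checkpointId and parseCheckpointId are not documented @agentium/core exports) that assumes ids follow exactly the "<runId>:step-<N>" and "<runId>:initial" shapes used by the interface in the next section.

```typescript
// Hypothetical helpers, assuming the documented id shapes:
// "<runId>:step-<N>" for step checkpoints, "<runId>:initial" for the pre-run one.
function checkpointId(runId: string, n: number | "initial"): string {
  return n === "initial" ? `${runId}:initial` : `${runId}:step-${n}`;
}

interface ParsedCheckpointId {
  runId: string;
  step: number | "initial";
}

function parseCheckpointId(id: string): ParsedCheckpointId | null {
  // Split on the LAST colon so run ids that themselves contain ":" still parse.
  const sep = id.lastIndexOf(":");
  if (sep <= 0) return null;
  const runId = id.slice(0, sep);
  const suffix = id.slice(sep + 1);
  if (suffix === "initial") return { runId, step: "initial" };
  const match = /^step-(\d+)$/.exec(suffix);
  return match ? { runId, step: Number(match[1]) } : null;
}

console.log(checkpointId("user-42-run-1", 2)); // prints user-42-run-1:step-2
```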
WorkflowCheckpointStore interface
interface WorkflowCheckpointStore<TState> {
save(checkpoint: WorkflowCheckpoint<TState>): Promise<void>;
load(checkpointId: string): Promise<WorkflowCheckpoint<TState> | null>;
list(runId: string): Promise<WorkflowCheckpoint<TState>[]>;
latest(runId: string): Promise<WorkflowCheckpoint<TState> | null>;
clear(runId: string): Promise<void>;
}
interface WorkflowCheckpoint<TState> {
id: string; // "<runId>:step-<N>" or "<runId>:initial"
runId: string;
stepIndex: number; // -1 for the pre-run "initial" checkpoint
stepName: string;
state: TState;
createdAt: number;
}
Implement this interface against any backend (Redis, S3, your own DB).
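As a minimal sketch of such an implementation, the class below keeps checkpoints in a Map that stands in for Redis, S3, or a database. MapCheckpointStore is a hypothetical name, and the two interfaces are repeated from above so the block compiles on its own.

```typescript
interface WorkflowCheckpoint<TState> {
  id: string; // "<runId>:step-<N>" or "<runId>:initial"
  runId: string;
  stepIndex: number; // -1 for the pre-run "initial" checkpoint
  stepName: string;
  state: TState;
  createdAt: number;
}

interface WorkflowCheckpointStore<TState> {
  save(checkpoint: WorkflowCheckpoint<TState>): Promise<void>;
  load(checkpointId: string): Promise<WorkflowCheckpoint<TState> | null>;
  list(runId: string): Promise<WorkflowCheckpoint<TState>[]>;
  latest(runId: string): Promise<WorkflowCheckpoint<TState> | null>;
  clear(runId: string): Promise<void>;
}

// Hypothetical Map-backed store; swap the Map for your real backend.
class MapCheckpointStore<TState> implements WorkflowCheckpointStore<TState> {
  private readonly byId = new Map<string, WorkflowCheckpoint<TState>>();

  async save(checkpoint: WorkflowCheckpoint<TState>): Promise<void> {
    // Store a JSON round-tripped copy so later mutation of the caller's
    // state object cannot rewrite history (assumes JSON-serializable state).
    this.byId.set(checkpoint.id, JSON.parse(JSON.stringify(checkpoint)));
  }

  async load(checkpointId: string): Promise<WorkflowCheckpoint<TState> | null> {
    return this.byId.get(checkpointId) ?? null;
  }

  async list(runId: string): Promise<WorkflowCheckpoint<TState>[]> {
    // stepIndex ascending, so the "initial" checkpoint (-1) comes first.
    return [...this.byId.values()]
      .filter((c) => c.runId === runId)
      .sort((a, b) => a.stepIndex - b.stepIndex);
  }

  async latest(runId: string): Promise<WorkflowCheckpoint<TState> | null> {
    // pop() works on the fresh array returned by list(), not on the store.
    return (await this.list(runId)).pop() ?? null;
  }

  async clear(runId: string): Promise<void> {
    // Deleting entries while iterating a Map is safe in JavaScript.
    for (const [id, c] of this.byId) {
      if (c.runId === runId) this.byId.delete(id);
    }
  }
}
```

Because save() keys by id, saving a checkpoint whose id already exists replaces the old entry.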
StorageBackedCheckpointStore
Built-in implementation that uses any StorageDriver:
import {
InMemoryStorage,
StorageBackedCheckpointStore,
Workflow,
} from "@agentium/core";
interface State {
count: number;
log: string[];
}
const store = new StorageBackedCheckpointStore<State>(new InMemoryStorage(), {
keepLastN: 50, // optional cap on retained checkpoints per run
});
keepLastN is best-effort: after each save(), if more than keepLastN checkpoints exist for the run, the oldest are deleted. Set to Infinity (or omit) to retain everything.
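The eviction rule can be sketched as a pure function that picks which ids to delete after a save. idsToPrune is hypothetical, and treating the lowest stepIndex as "oldest" is an assumption; the real StorageBackedCheckpointStore may order by createdAt instead.

```typescript
interface CheckpointMeta {
  id: string;
  stepIndex: number;
}

// Hypothetical: return the ids to delete so that at most keepLastN checkpoints
// remain for a run. Assumes lowest stepIndex = oldest. Infinity keeps everything.
function idsToPrune(checkpoints: CheckpointMeta[], keepLastN: number = Infinity): string[] {
  const excess = checkpoints.length - keepLastN;
  if (!(excess > 0)) return [];
  return [...checkpoints]
    .sort((a, b) => a.stepIndex - b.stepIndex)
    .slice(0, excess)
    .map((c) => c.id);
}
```

One consequence under this sketch: once a run has more than keepLastN checkpoints, the "initial" checkpoint is the first to be evicted, so the first entry of a listing is no longer the pre-run state.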
Define a workflow with checkpointing
const wf = new Workflow<State>({
name: "counter",
initialState: { count: 0, log: [] },
checkpointStore: store,
steps: [
{
name: "increment-once",
run: async (state) => ({ ...state, count: state.count + 1, log: [...state.log, "+1"] }),
},
{
name: "increment-twice",
run: async (state) => ({ ...state, count: state.count + 2, log: [...state.log, "+2"] }),
},
{
name: "finalize",
run: async (state) => ({ ...state, log: [...state.log, `total=${state.count}`] }),
},
],
});
runWithCheckpoints()
Execute the workflow and persist a checkpoint after every step:
const initial = await wf.runWithCheckpoints();
console.log(initial.runId); // "run-1762..."
console.log(initial.state.count); // 3 after all steps
Returns:
interface WorkflowRunOutput<TState> {
runId: string; // generated on each call; pass as input.runId to resume
state: TState; // final state
}
You can also pass an explicit runId to overwrite checkpoints under that ID:
const out = await wf.runWithCheckpoints({ runId: "user-42-run-1" });
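To make the snapshot-after-every-step behaviour concrete, here is a simplified, self-contained runner. It is a sketch, not Agentium’s implementation: runWithCheckpointsSketch, the save callback, and the numbering (0-based stepIndex, 1-based N in "step-N" to match the diagram above) are all assumptions.

```typescript
interface Step<S> {
  name: string;
  run: (state: S) => Promise<S>;
}

interface Checkpoint<S> {
  id: string;
  runId: string;
  stepIndex: number;
  stepName: string;
  state: S;
  createdAt: number;
}

// Hypothetical simplified runner (not the framework's code): save the initial
// state, then save a checkpoint after every completed step.
async function runWithCheckpointsSketch<S>(
  steps: Step<S>[],
  initialState: S,
  save: (checkpoint: Checkpoint<S>) => Promise<void>,
  runId: string = `run-${Date.now()}`,
): Promise<{ runId: string; state: S }> {
  let state = initialState;
  await save({ id: `${runId}:initial`, runId, stepIndex: -1, stepName: "initial", state, createdAt: Date.now() });
  for (const [i, step] of steps.entries()) {
    state = await step.run(state);
    // Assumed numbering: stepIndex is 0-based; N in "step-N" is 1-based, as in the diagram.
    await save({ id: `${runId}:step-${i + 1}`, runId, stepIndex: i, stepName: step.name, state, createdAt: Date.now() });
  }
  return { runId, state };
}
```

If the process dies mid-run, every completed step has already been saved, which is what the resume and drain scenarios rely on.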
listCheckpoints(runId)
const checkpoints = await wf.listCheckpoints(initial.runId);
for (const c of checkpoints) {
console.log(`#${c.stepIndex}: ${c.stepName} state=${JSON.stringify(c.state)}`);
}
Sorted by stepIndex ascending. The first entry has stepIndex: -1 and stepName: "initial", and contains the state before any step ran.
replay(checkpointId)
Re-execute the workflow from a specific checkpoint:
const replayed = await wf.replay(checkpoints[0].id);
// Starts from the state stored in the "initial" checkpoint and re-runs every step.
// For deterministic workflows, the replayed state matches the original final state.
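Conceptually, replay takes the state stored in a checkpoint and re-runs only the steps that come after it. A minimal sketch, assuming stepIndex is the 0-based position of the step that produced the checkpoint (so "initial" at -1 re-runs every step); replayFrom is a hypothetical name, not the library’s implementation.

```typescript
interface Step<S> {
  name: string;
  run: (state: S) => Promise<S>;
}

// Hypothetical: re-run every step after the checkpoint, starting from its stored state.
async function replayFrom<S>(
  steps: Step<S>[],
  checkpoint: { stepIndex: number; state: S },
): Promise<S> {
  let state = checkpoint.state;
  for (const step of steps.slice(checkpoint.stepIndex + 1)) {
    state = await step.run(state);
  }
  return state;
}

interface Counter {
  count: number;
}

// Two pure steps, mirroring the first two steps of the counter workflow.
const counterSteps: Step<Counter>[] = [
  { name: "increment-once", run: async (s) => ({ count: s.count + 1 }) },
  { name: "increment-twice", run: async (s) => ({ count: s.count + 2 }) },
];
```

Because both counter steps are pure, replaying from any of their checkpoints lands on count 3; that is the determinism property the caveat below depends on.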
Determinism caveat: if your step run functions are non-deterministic (calling LLMs, hitting external APIs), replays can produce different outputs. You can:
- Mock side effects: override side-effecting calls during replay.
- Cache step outputs: wrap each step’s run in RetryEnvelope plus a cache keyed by (stepIndex, JSON.stringify(state)).
- Treat replay as “from here, with the live world”: acceptable for debugging.
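The caching option can be sketched as a memoizing wrapper around a step’s run function, keyed the same way. This is an assumption-laden sketch: cachedStep is hypothetical, a Map stands in for a persistent cache, and the RetryEnvelope part is left out.

```typescript
type StepRun<S> = (state: S) => Promise<S>;

// Hypothetical memoizer: the first call for a given (stepIndex, input state)
// runs the real step; later calls with an identical key return the recorded output.
function cachedStep<S>(stepIndex: number, run: StepRun<S>, cache: Map<string, S>): StepRun<S> {
  return async (state: S): Promise<S> => {
    // JSON.stringify is key-order sensitive: equal states built with different
    // key orders produce different keys (a cache miss, never a wrong hit).
    const key = `${stepIndex}:${JSON.stringify(state)}`;
    if (cache.has(key)) return cache.get(key) as S;
    const output = await run(state);
    cache.set(key, output);
    return output;
  };
}
```

Since the key includes the full input state, a replay that reaches a step with identical state reuses the recorded output, while a fork with a mutated state misses the cache and calls the live step.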
fork(checkpointId, mutator?)
Branch a new run from a checkpoint with optionally mutated state:
const forked = await wf.fork(checkpoints[0].id, (state) => ({
...state,
count: 100, // start the fork with count: 100 instead of 0
}));
console.log(forked.runId); // NEW run id - distinct from the parent
console.log(forked.state.count); // 100 + 3 = 103
The mutator is called synchronously and must return the new state. If omitted, the fork starts with the exact state from the checkpoint (useful when you just want a “what-if” continuation under a different runId).
Forks have completely independent checkpoint chains. Listing checkpoints on the parent and forked runIds shows two disjoint histories.
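A fork can be modelled as a replay under a new runId that starts from a copy of the checkpoint state passed through the mutator. The sketch below is hypothetical (forkFrom and the "-fork-" runId format are made up); it copies the state first, so a mutator that edits in place cannot alter the parent’s stored checkpoint.

```typescript
interface Step<S> {
  name: string;
  run: (state: S) => Promise<S>;
}

interface CheckpointLike<S> {
  runId: string;
  stepIndex: number;
  state: S;
}

async function forkFrom<S>(
  steps: Step<S>[],
  source: CheckpointLike<S>,
  mutator: (state: S) => S = (s) => s,
): Promise<{ runId: string; parentRunId: string; state: S }> {
  // JSON round-trip copy (assumes JSON-serializable state) isolates the parent's history.
  let state = mutator(JSON.parse(JSON.stringify(source.state)) as S);
  for (const step of steps.slice(source.stepIndex + 1)) {
    state = await step.run(state);
  }
  // Hypothetical id scheme: any id distinct from the parent yields a disjoint checkpoint chain.
  return { runId: `${source.runId}-fork-${Date.now()}`, parentRunId: source.runId, state };
}
```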
Use cases
Debugging a flaky agent
A workflow that orchestrates four agents fails at step 3. Instead of rerunning the entire pipeline:
const cps = await wf.listCheckpoints(badRunId);
// Replay from the checkpoint taken after step 2, so step 3 (and any later steps) runs again:
const replayed = await wf.replay(cps[2].id);
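Picking the checkpoint by array position assumes nothing has been pruned. A hypothetical helper that looks it up by stepIndex instead, assuming stepIndex is the 0-based position of the step whose output the checkpoint holds (-1 for "initial"); checkpointBeforeStep is not a library function.

```typescript
interface CheckpointRef {
  id: string;
  stepIndex: number;
}

// Hypothetical: return the checkpoint to replay from so that step n (1-based,
// as in "fails at step 3") is the first step re-executed. Step n sits at
// 0-based index n - 1, so we need the checkpoint written after index n - 2.
function checkpointBeforeStep(cps: CheckpointRef[], n: number): CheckpointRef | undefined {
  return cps.find((c) => c.stepIndex === n - 2);
}

// A run that failed during step 3: only initial, step 1 and step 2 were saved.
const cps: CheckpointRef[] = [
  { id: "bad-run:initial", stepIndex: -1 },
  { id: "bad-run:step-1", stepIndex: 0 },
  { id: "bad-run:step-2", stepIndex: 1 },
];
```

With no pruning this returns the same entry as cps[2]; after keepLastN eviction, array positions shift but the stepIndex lookup does not.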
A/B exploration
After running an analysis workflow, fork from step 2 with different parameters to see how the final answer changes:
const forkA = await wf.fork(cps[2].id, (s) => ({ ...s, model: "gpt-4o" }));
const forkB = await wf.fork(cps[2].id, (s) => ({ ...s, model: "claude-sonnet-4" }));
// Compare forkA.state vs forkB.state
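To compare the two forks, a field-by-field diff of the final states is often enough. diffState is a hypothetical helper, and the model field in the usage is assumed to be part of your State type (it is not in the counter example).

```typescript
// Hypothetical helper: top-level fields whose values differ between two states.
// Values are compared by JSON serialization, so nested arrays and objects work
// as long as their key order matches.
function diffState(a: object, b: object): string[] {
  const ra = a as Record<string, unknown>;
  const rb = b as Record<string, unknown>;
  const keys = new Set([...Object.keys(ra), ...Object.keys(rb)]);
  return [...keys].filter((k) => JSON.stringify(ra[k]) !== JSON.stringify(rb[k])).sort();
}

const changed = diffState(
  { count: 3, model: "gpt-4o", log: ["+1", "+2"] },
  { count: 3, model: "claude-sonnet-4", log: ["+1", "+2"] },
);
console.log(changed); // prints [ 'model' ]
```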
Resumable long workflows
A 10-step workflow gets killed by SIGTERM after step 6. The next pod picks it up:
const cps = await wf.listCheckpoints(savedRunId);
const last = cps[cps.length - 1]; // the latest persisted checkpoint
const resumed = await wf.replay(last.id);
Audit / time travel UI
Render the checkpoint list as a timeline. Clicking step N shows the state right before step N+1 ran. Clicking “fork from here” creates a sandbox run.
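A rewind UI mostly needs the checkpoint list reshaped into labelled rows that remember which id to pass to replay or fork. A small hypothetical formatter; the row wording and ISO timestamps are choices of this sketch, and createdAt is assumed to be epoch milliseconds.

```typescript
interface TimelineCheckpoint {
  id: string;
  stepIndex: number;
  stepName: string;
  createdAt: number; // assumed: epoch milliseconds, as from Date.now()
}

interface TimelineRow {
  checkpointId: string; // pass to replay() or fork() when the row is clicked
  label: string;
}

// Hypothetical: one row per checkpoint, in step order.
function toTimeline(cps: TimelineCheckpoint[]): TimelineRow[] {
  return [...cps]
    .sort((a, b) => a.stepIndex - b.stepIndex)
    .map((c) => ({
      checkpointId: c.id,
      label: `${c.stepIndex === -1 ? "before any step" : `after ${c.stepName}`} (${new Date(c.createdAt).toISOString()})`,
    }));
}
```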
Combining with DrainController
Graceful K8s rollouts are the canonical “save a checkpoint and exit” scenario:
import { DrainController, RunDrainedError } from "@agentium/core";
const drain = new DrainController();
process.on("SIGTERM", () => drain.requestDrain());
async function runWithDrain(input: { runId?: string }) {
try {
return await Promise.race([
wf.runWithCheckpoints(input),
drain.waitForDrain().then(() => {
throw new RunDrainedError(input.runId ?? "unknown");
}),
]);
} catch (err) {
if (err instanceof RunDrainedError) {
// Completed steps are already checkpointed; the next pod resumes via replay.
// Supply an explicit input.runId so the next pod knows which run to list.
return;
}
throw err;
}
}
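One property of the Promise.race pattern is worth knowing: it only stops awaiting. The losing promise is not cancelled, so the workflow keeps running in the background until the process exits. The self-contained sketch below shows this with a hypothetical DrainSignal class (a stand-in, not Agentium’s DrainController) and a sentinel value in place of RunDrainedError.

```typescript
// Hypothetical stand-in for a drain controller: a promise that resolves once a drain is requested.
class DrainSignal {
  readonly drained: Promise<void>;
  private resolveDrain: () => void = () => {};

  constructor() {
    // Field initializers run before this body, so the assignment below sticks.
    this.drained = new Promise<void>((resolve) => {
      this.resolveDrain = resolve;
    });
  }

  requestDrain(): void {
    this.resolveDrain();
  }
}

const DRAINED = Symbol("drained");

// Resolve with the work's result, or with DRAINED if a drain is requested first.
// `work` keeps running either way: Promise.race does not cancel the loser.
function raceWithDrain<T>(work: Promise<T>, drain: DrainSignal): Promise<Awaited<T> | typeof DRAINED> {
  return Promise.race([work, drain.drained.then((): typeof DRAINED => DRAINED)]);
}
```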
Performance notes
- Each checkpoint write is one StorageDriver.set call. For SQLite that’s ~1ms; for Postgres ~5ms.
- Checkpoint size = state size. Keep state lean by storing references (IDs) rather than full LLM messages.
- keepLastN: 50 is a reasonable default. For tight memory budgets, drop to 10.
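A rough way to sanity-check the size guidance is to sum the serialized size of the checkpoints a run would retain. This sketch is hypothetical: it approximates size as JSON string length (UTF-16 code units, equal to bytes only for ASCII) and ignores any encoding overhead in your StorageDriver; lastMessageId is an illustrative field name.

```typescript
// Hypothetical estimate: serialized size of the last keepLastN checkpoint states of one run.
function estimateRetainedSize(states: unknown[], keepLastN: number = Infinity): number {
  const retained = states.slice(Math.max(states.length - keepLastN, 0));
  return retained.reduce<number>((total, s) => total + (JSON.stringify(s) ?? "").length, 0);
}

// Storing a message id instead of the full message body keeps every checkpoint small.
const withBody = { count: 1, lastMessage: "x".repeat(2000) };
const withRef = { count: 1, lastMessageId: "msg_123" };
```

Since the cost is paid once per retained checkpoint, shrinking the state and lowering keepLastN multiply together.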