Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.agentium.in/llms.txt

Use this file to discover all available pages before exploring further.

Workflow Time Travel

What it gives you

After every step of a Workflow.runWithCheckpoints() execution, the framework snapshots the state and writes a checkpoint. You can later:
  • Replay a run from any checkpoint (deterministically reapply later steps).
  • Fork a run at any checkpoint with a mutated state to explore alternatives.
  • List all checkpoints to drive a “rewind” UI.
This is similar to LangGraph’s checkpoint feature and Temporal’s history replay — applied to Agentium’s simpler Workflow model.

Architecture

                 step 1               step 2               step 3
   initial   ───────────▶  state A  ───────────▶  state B  ───────────▶  state C
       │                       │                     │                     │
       ▼                       ▼                     ▼                     ▼
   checkpoint:initial      checkpoint:step-1     checkpoint:step-2     checkpoint:step-3
       (stored in CheckpointStore)
Every checkpoint contains:
  • The full state at that point
  • runId, stepIndex, stepName, createdAt
  • An incremental id (the same runId:step-N shape used to look it up)

WorkflowCheckpointStore interface

interface WorkflowCheckpointStore<TState> {
  save(checkpoint: WorkflowCheckpoint<TState>): Promise<void>;
  load(checkpointId: string): Promise<WorkflowCheckpoint<TState> | null>;
  list(runId: string): Promise<WorkflowCheckpoint<TState>[]>;
  latest(runId: string): Promise<WorkflowCheckpoint<TState> | null>;
  clear(runId: string): Promise<void>;
}

interface WorkflowCheckpoint<TState> {
  id: string;            // "<runId>:step-<N>" or "<runId>:initial"
  runId: string;
  stepIndex: number;     // -1 for the pre-run "initial" checkpoint
  stepName: string;
  state: TState;
  createdAt: number;
}
Implement this interface against any backend (Redis, S3, your own DB).

StorageBackedCheckpointStore

Built-in implementation that uses any StorageDriver:
import {
  InMemoryStorage,
  StorageBackedCheckpointStore,
  Workflow,
} from "@agentium/core";

interface State {
  count: number;
  log: string[];
}

const store = new StorageBackedCheckpointStore<State>(new InMemoryStorage(), {
  keepLastN: 50, // optional cap on retained checkpoints per run
});
keepLastN is best-effort: after each save(), if more than keepLastN checkpoints exist for the run, the oldest are deleted. Set to Infinity (or omit) to retain everything.

Define a workflow with checkpointing

const wf = new Workflow<State>({
  name: "counter",
  initialState: { count: 0, log: [] },
  checkpointStore: store,
  steps: [
    {
      name: "increment-once",
      run: async (state) => ({ ...state, count: state.count + 1, log: [...state.log, "+1"] }),
    },
    {
      name: "increment-twice",
      run: async (state) => ({ ...state, count: state.count + 2, log: [...state.log, "+2"] }),
    },
    {
      name: "finalize",
      run: async (state) => ({ ...state, log: [...state.log, `total=${state.count}`] }),
    },
  ],
});

runWithCheckpoints()

Execute the workflow and persist a checkpoint after every step:
const initial = await wf.runWithCheckpoints();
console.log(initial.runId);        // "run-1762..."
console.log(initial.state.count);  // 3 after all steps
Returns:
interface WorkflowRunOutput<TState> {
  runId: string;        // generated on each call; pass as input.runId to resume
  state: TState;        // final state
}
You can also pass an explicit runId to overwrite checkpoints under that ID:
const out = await wf.runWithCheckpoints({ runId: "user-42-run-1" });

listCheckpoints(runId)

const checkpoints = await wf.listCheckpoints(initial.runId);
for (const c of checkpoints) {
  console.log(`#${c.stepIndex}: ${c.stepName}  state=${JSON.stringify(c.state)}`);
}
Sorted by stepIndex ascending. The first entry has stepIndex: -1, stepName: "initial" and contains the state BEFORE any step ran.

replay(checkpointId)

Re-execute the workflow from a specific checkpoint:
const replayed = await wf.replay(checkpoints[0].id);
// Replays from after the "initial" checkpoint - re-runs all steps with the same starting state.
// state matches the original final state for deterministic workflows.
Determinism caveat: if your step run functions are non-deterministic (calling LLMs, hitting external APIs), replays will produce different outputs. You can:
  • Mock side effects: override side-effecting calls during replay.
  • Cache step outputs: wrap each step’s run in RetryEnvelope + a cache keyed by (stepIndex, JSON.stringify(state)).
  • Treat replay as “from here, with the live world”: acceptable for debugging.

fork(checkpointId, mutator?)

Branch a new run from a checkpoint with optionally mutated state:
const forked = await wf.fork(checkpoints[0].id, (state) => ({
  ...state,
  count: 100, // start the fork with count: 100 instead of 0
}));
console.log(forked.runId);        // NEW run id - distinct from the parent
console.log(forked.state.count);  // 100 + 3 = 103
The mutator is called synchronously and must return the new state. If omitted, the fork starts with the exact state from the checkpoint (useful when you just want a “what-if” continuation under a different runId). Forks have completely independent checkpoint chains. Listing checkpoints on the parent and forked runIds shows two disjoint histories.

Use cases

Debugging a flaky agent

A workflow that orchestrates four agents fails at step 3. Instead of rerunning the entire pipeline:
const cps = await wf.listCheckpoints(badRunId);
// Replay just the failing step in isolation:
const replayed = await wf.replay(cps[2].id);

A/B exploration

After running an analysis workflow, fork from step 2 with different parameters to see how the final answer changes:
const forkA = await wf.fork(cps[2].id, (s) => ({ ...s, model: "gpt-4o" }));
const forkB = await wf.fork(cps[2].id, (s) => ({ ...s, model: "claude-sonnet-4" }));
// Compare forkA.state vs forkB.state

Resumable long workflows

A 10-step workflow gets killed by SIGTERM after step 6. The next pod picks it up:
const cps = await wf.listCheckpoints(savedRunId);
const last = cps[cps.length - 1]; // the latest persisted checkpoint
const resumed = await wf.replay(last.id);

Audit / time travel UI

Render the checkpoint list as a timeline. Clicking step N shows the state right before step N+1 ran. Clicking “fork from here” creates a sandbox run.

Combining with DrainController

Graceful K8s rollouts are the canonical “save a checkpoint and exit” scenario:
import { DrainController, RunDrainedError } from "@agentium/core";

const drain = new DrainController();
process.on("SIGTERM", () => drain.requestDrain());

async function runWithDrain(input: { runId?: string }) {
  try {
    return await Promise.race([
      wf.runWithCheckpoints(input),
      drain.waitForDrain().then(() => {
        throw new RunDrainedError(input.runId ?? "unknown");
      }),
    ]);
  } catch (err) {
    if (err instanceof RunDrainedError) {
      // The latest checkpoint is already on disk. Next pod picks up via replay.
      return;
    }
    throw err;
  }
}

Performance

  • Each checkpoint write is one StorageDriver.set call. For SQLite that’s ~1ms; for Postgres ~5ms.
  • Checkpoint size = state size. Keep state lean by storing references (IDs) rather than full LLM messages.
  • keepLastN: 50 is a reasonable default. For tight memory budgets, drop to 10.

See also