Documentation Index
Fetch the complete documentation index at: https://docs.xhipai.com/llms.txt
Use this file to discover all available pages before exploring further.
The AgentWorker consumes jobs from the BullMQ queue and executes them using registered agents and workflows. Workers can run in the same process or as separate services.
Setup
import { Agent, openai } from "@agentium/core";
import { AgentWorker } from "@agentium/queue";
const assistant = new Agent({
name: "assistant",
model: openai("gpt-4o"),
instructions: "You are a helpful assistant.",
});
const worker = new AgentWorker({
connection: { host: "localhost", port: 6379 },
agentRegistry: { assistant },
concurrency: 5,
});
worker.start();
connection
{ host: string; port: number }
required
Redis connection details. Must match the producer’s connection.
queueName
string
default:"agentium:jobs"
Queue name to consume from. Must match the producer.
Number of jobs to process simultaneously.
agentRegistry
Record<string, Agent>
required
Map of agent names to Agent instances. Names must match what the producer enqueues.
Map of workflow names to Workflow instances.
How It Works
- Worker connects to Redis and listens for jobs on the configured queue
- When a job arrives, it looks up the agent/workflow by name in the registry
- Executes
agent.run() or workflow.run() with the job’s input
- Reports progress via BullMQ job progress updates
- Stores the result (or error) back to Redis for the producer to retrieve
With Workflows
import { Agent, Workflow, openai } from "@agentium/core";
import { AgentWorker } from "@agentium/queue";
const researcher = new Agent({
name: "researcher",
model: openai("gpt-4o"),
instructions: "Research the given topic thoroughly.",
});
const writer = new Agent({
name: "writer",
model: openai("gpt-4o"),
instructions: "Write a blog post based on the research.",
});
const pipeline = new Workflow({
name: "content-pipeline",
initialState: { topic: "", research: "", article: "" },
steps: [
{ name: "research", agent: researcher, inputFrom: (s) => s.topic },
{ name: "write", agent: writer, inputFrom: (s) => `Write about: ${s.research}` },
],
});
const worker = new AgentWorker({
connection: { host: "localhost", port: 6379 },
agentRegistry: { researcher, writer },
workflowRegistry: { "content-pipeline": pipeline },
concurrency: 3,
});
worker.start();
Graceful Shutdown
process.on("SIGTERM", async () => {
console.log("Shutting down worker...");
await worker.stop();
process.exit(0);
});
The stop() method waits for currently active jobs to complete before shutting down.
Event Bridging
The bridgeEventBusToJob utility connects an agent’s EventBus to BullMQ’s job progress system, enabling real-time progress tracking:
import { bridgeEventBusToJob } from "@agentium/queue";
// This is used internally by AgentWorker, but you can use it
// for custom worker implementations
const cleanup = bridgeEventBusToJob(agent.eventBus, job, runId);
// cleanup() removes all listeners when done
Events bridged:
run.stream.chunk updates job progress
tool.call adds to job logs
tool.result adds to job logs
Scaling
Run multiple worker processes to scale horizontally. BullMQ handles job distribution automatically:
# Terminal 1
node worker.js
# Terminal 2
node worker.js
# Terminal 3
node worker.js
Each worker processes up to concurrency jobs simultaneously. With 3 workers at concurrency 5, you can process 15 jobs in parallel.
Full Producer + Worker Example
// producer.ts
import { AgentQueue } from "@agentium/queue";
const queue = new AgentQueue({
connection: { host: "localhost", port: 6379 },
});
const { jobId } = await queue.enqueueAgentRun({
agentName: "assistant",
input: "Summarize the latest AI research papers",
});
queue.onCompleted((id, result) => {
console.log(`Result: ${result.text}`);
queue.close();
});
// worker.ts
import { Agent, openai } from "@agentium/core";
import { AgentWorker } from "@agentium/queue";
const assistant = new Agent({
name: "assistant",
model: openai("gpt-4o"),
instructions: "Summarize research papers concisely.",
});
const worker = new AgentWorker({
connection: { host: "localhost", port: 6379 },
agentRegistry: { assistant },
});
worker.start();
console.log("Worker listening for jobs...");