Documentation Index
Fetch the complete documentation index at: https://docs.xhipai.com/llms.txt
Use this file to discover all available pages before exploring further.
The AgentQueue class is the producer side of the queue system. It enqueues jobs for agents and workflows to be processed asynchronously by workers.
Setup
import { AgentQueue } from "@agentium/queue";
const queue = new AgentQueue({
connection: { host: "localhost", port: 6379 },
queueName: "agentium:jobs", // optional, this is the default
});
connection
{ host: string; port: number }
required
Redis connection details.
queueName
string
default:"agentium:jobs"
BullMQ queue name.
Default options for all jobs (priority, attempts, backoff, etc.).
Enqueue Agent Runs
const { jobId } = await queue.enqueueAgentRun({
agentName: "assistant",
input: "Analyze this quarterly report",
sessionId: "user-123",
userId: "user-123",
});
console.log("Job enqueued:", jobId);
With Priority and Delay
const { jobId } = await queue.enqueueAgentRun({
agentName: "processor",
input: "High priority task",
priority: 1, // Lower number = higher priority
delay: 5000, // Delay 5 seconds before processing
});
Enqueue Workflows
const { jobId } = await queue.enqueueWorkflow({
workflowName: "content-pipeline",
initialState: {
topic: "AI in healthcare",
format: "blog-post",
},
sessionId: "session-abc",
});
Check Job Status
const status = await queue.getJobStatus(jobId);
console.log(status.state); // "waiting" | "active" | "completed" | "failed" | "delayed"
console.log(status.progress); // 0-100
console.log(status.result); // RunOutput (when completed)
console.log(status.error); // Error message (when failed)
JobStatus Fields
| Field | Type | Description |
|---|
jobId | string | Unique job identifier |
state | string | Current state |
progress | number | Completion percentage |
result | RunOutput | Agent output (when completed) |
error | string | Error message (when failed) |
createdAt | Date | When the job was enqueued |
processedAt | Date | When processing started |
finishedAt | Date | When processing finished |
Cancel Jobs
await queue.cancelJob(jobId);
Event Handlers
Listen for job completion or failure:
queue.onCompleted((jobId, result) => {
console.log(`Job ${jobId} completed:`, result.text);
// Send notification, update database, etc.
});
queue.onFailed((jobId, error) => {
console.error(`Job ${jobId} failed:`, error.message);
// Alert, retry logic, etc.
});
Cleanup
Full Example
import { Agent, openai } from "@agentium/core";
import { AgentQueue } from "@agentium/queue";
const queue = new AgentQueue({
connection: { host: "localhost", port: 6379 },
});
// Enqueue multiple jobs
const jobs = await Promise.all([
queue.enqueueAgentRun({ agentName: "summarizer", input: "Long article text..." }),
queue.enqueueAgentRun({ agentName: "translator", input: "Translate this to Spanish" }),
queue.enqueueAgentRun({ agentName: "classifier", input: "Classify this email" }),
]);
// Monitor all jobs
for (const { jobId } of jobs) {
const status = await queue.getJobStatus(jobId);
console.log(`${jobId}: ${status.state}`);
}
// Wait for completion
queue.onCompleted((id, result) => {
console.log(`Done: ${id} -> ${result.text.slice(0, 100)}`);
});