Orchestration
Queues
Serialize agent work with durable queues for backpressure and rate limiting.
- Serial execution ensures agents process one task at a time
- Durable messages survive sleep and restart
- Completable messages for request/response patterns with agents
- Backpressure absorbs bursts and prevents overload
Queue agent commands
Use actor queues to serialize work that an agent processes one task at a time.
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, queue, setup } from "rivetkit";
const taskRunner = actor({
queues: {
tasks: queue<{ prompt: string }>(),
},
run: async (c) => {
const agentHandle = c.actors.vm.getOrCreate(["task-agent"]);
for await (const message of c.queue.iter()) {
// Process one task at a time
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(session.sessionId, message.body.prompt);
await agentHandle.closeSession(session.sessionId);
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { taskRunner, vm } });
registry.start();
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.taskRunner.getOrCreate(["main"]);
// Queue up work. Tasks are processed one at a time.
await handle.send("tasks", { prompt: "Review PR #123" });
await handle.send("tasks", { prompt: "Fix the flaky test in auth.test.ts" });
await handle.send("tasks", { prompt: "Update the README" });
Request/response with completable messages
Use completable messages when the caller needs to wait for the agent to finish.
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, queue, setup } from "rivetkit";
const reviewer = actor({
queues: {
review: queue<{ file: string }, { summary: string }>(),
},
run: async (c) => {
const agentHandle = c.actors.vm.getOrCreate(["reviewer"]);
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
for await (const message of c.queue.iter({ completable: true })) {
const content = await agentHandle.readFile(message.body.file);
const text = new TextDecoder().decode(content);
await agentHandle.sendPrompt(
session.sessionId,
`Review this code and write a summary to /home/user/review.txt:\n\n${text}`,
);
const review = await agentHandle.readFile("/home/user/review.txt");
await message.complete({
summary: new TextDecoder().decode(review),
});
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { reviewer, vm } });
registry.start();
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.reviewer.getOrCreate(["main"]);
// Wait for the agent to complete the review
const result = await handle.send(
"review",
{ file: "/home/user/src/auth.ts" },
{ wait: true, timeout: 120_000 },
);
if (result.status === "completed") {
console.log("Review:", result.response.summary);
}
Ingesting from external systems
Accept tasks from webhooks, APIs, or other services and queue them for agent processing.
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, queue, setup } from "rivetkit";
const issueWorker = actor({
queues: {
issues: queue<{ title: string; body: string }>(),
},
actions: {
// HTTP endpoint to receive webhook payloads
ingestIssue: async (c, title: string, body: string) => {
await c.queue.push("issues", { title, body });
},
},
run: async (c) => {
const agentHandle = c.actors.vm.getOrCreate(["issue-worker"]);
for await (const message of c.queue.iter()) {
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Investigate and fix this issue:\n\nTitle: ${message.body.title}\n\n${message.body.body}`,
);
await agentHandle.closeSession(session.sessionId);
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { issueWorker, vm } });
registry.start();
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.issueWorker.getOrCreate(["main"]);
// Ingest from a webhook or external system
await handle.ingestIssue(
"Login redirect broken",
"Users are redirected to /undefined after login on mobile",
);
Recommendations
- Use queues when you need guaranteed serial execution. Agents process one message at a time, preventing race conditions.
- Use completable messages when the caller needs the result. Set a generous timeout since agent work can take minutes.
- Queues survive actor sleep. Messages are persisted and processed when the actor wakes up.
- See Queues & Run Loops for the full queue API reference.