
Today we’re releasing Queues for Rivet Actors: per-actor durable queues with a programmable run handler, also known as the actor mailbox pattern.
- Durable and ordered: messages persist through sleep, crashes, and deploys, processed one at a time
- Handles traffic spikes: absorbs bursts of messages without dropping any
- Request/response: callers can await a typed response from queued work
- Programmable run handler: `run` is a long-lived async function you control, not a callback. Selectively consume named queues, race messages against each other, cancel work mid-flight.
- Pairs with workflows: use queues as input to durable, replayable workflows
- Built into the actor: queues, state, SQLite, events, and workflows, all in one place. No external broker to provision.
## Show Me The Code
Define typed queues, process them in a run handler, and send messages from a client.
```typescript
import { actor, queue, setup } from "rivetkit";

const counter = actor({
  state: { value: 0 },
  // Define typed queues
  queues: {
    increment: queue<{ amount: number }>(),
  },
  // Process messages in the run handler
  run: async (c) => {
    for await (const message of c.queue.iter()) {
      c.state.value += message.body.amount;
    }
  },
  actions: {
    getValue: (c) => c.state.value,
  },
});

export const registry = setup({ use: { counter } });
```
Request/response, where the second type parameter on `queue` is the reply type:

```typescript
import { actor, queue, setup } from "rivetkit";

const counter = actor({
  state: { value: 0 },
  queues: {
    // Second type parameter is the response type
    increment: queue<{ amount: number }, { value: number }>(),
  },
  run: async (c) => {
    // Enable completable to allow responding to callers
    for await (const message of c.queue.iter({ completable: true })) {
      c.state.value += message.body.amount;
      // Send typed response back to the caller
      await message.complete({ value: c.state.value });
    }
  },
});

export const registry = setup({ use: { counter } });
```
Ack-only, where callers can wait for completion but no data comes back:

```typescript
import { actor, queue, setup } from "rivetkit";

const worker = actor({
  state: { processed: 0 },
  queues: {
    // undefined response type means ack-only, no return data
    process: queue<{ taskId: string }, undefined>(),
  },
  run: async (c) => {
    for await (const message of c.queue.iter({ completable: true })) {
      // Do work
      await processTask(message.body.taskId);
      c.state.processed += 1;
      // Acknowledge completion without returning data
      await message.complete();
    }
  },
});

async function processTask(taskId: string) {
  await fetch(`https://api.example.com/tasks/${taskId}/complete`, {
    method: "POST",
  });
}

export const registry = setup({ use: { worker } });
```
Sending messages from a client:
```typescript
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";

const client = createClient<typeof registry>();
const handle = client.counter.getOrCreate(["main"]);

// Fire-and-forget
await handle.send("increment", { amount: 1 });

// Wait for a typed response
const result = await handle.send(
  "increment",
  { amount: 5 },
  { wait: true, timeout: 5_000 },
);

if (result.status === "completed") {
  console.log(result.response); // { value: 6 }
} else if (result.status === "timedOut") {
  console.log("timed out");
}
```
## The Run Handler
The run handler is the heart of an actor. It’s a long-lived async function that owns the actor’s main processing. Instead of registering callbacks, you write it yourself: iterate queues, sleep between ticks, race signals against each other. You control exactly how and when messages are consumed.
```typescript
import { actor, queue, setup } from "rivetkit";

const worker = actor({
  state: { processed: 0 },
  queues: {
    jobs: queue<{ url: string }>(),
  },
  // Iterate messages as they arrive
  run: async (c) => {
    for await (const message of c.queue.iter()) {
      await fetch(message.body.url, { method: "POST" });
      c.state.processed += 1;
    }
  },
});

export const registry = setup({ use: { worker } });
```
A fixed-interval game loop:

```typescript
import { actor, setup } from "rivetkit";
import { interval } from "rivetkit/utils";

const gameRoom = actor({
  state: {
    tick: 0,
    players: {} as Record<string, { x: number; y: number }>,
  },
  // Fixed-interval game loop
  run: async (c) => {
    const tick = interval(100); // 10 ticks per second
    while (!c.aborted) {
      await tick();
      if (c.aborted) break;
      c.state.tick += 1;
      // Update physics, check collisions, etc.
      for (const player of Object.values(c.state.players)) {
        player.x = Math.max(0, Math.min(1000, player.x));
        player.y = Math.max(0, Math.min(1000, player.y));
      }
      c.broadcast("snapshot", c.state);
    }
  },
  actions: {
    setInput: (c, input: { x: number; y: number }) => {
      c.state.players[c.conn.id] = input;
    },
  },
});

export const registry = setup({ use: { gameRoom } });
```
Racing a stop queue against in-flight generation:

```typescript
import { openai } from "@ai-sdk/openai";
import { generateText } from "ai";
import { actor, queue, setup } from "rivetkit";
import { joinSignals } from "rivetkit/utils";

const agent = actor({
  state: { running: false, messages: [] as string[] },
  queues: {
    // Separate queues for different message types
    prompt: queue<{ prompt: string }, undefined>(),
    stop: queue<{ reason?: string }>(),
  },
  run: async (c) => {
    // Only consume from the prompt queue
    for await (const promptMessage of c.queue.iter({
      names: ["prompt"],
      completable: true,
    })) {
      const stopController = new AbortController();
      const runSignal = joinSignals(c.abortSignal, stopController.signal);
      // Race: watch for stop messages while generating
      c.queue
        .next({ names: ["stop"], signal: runSignal })
        .then(([stopMessage]) => {
          if (stopMessage) stopController.abort();
        })
        .catch(() => {});
      // Generate until complete or cancelled
      c.state.running = true;
      const { text } = await generateText({
        model: openai("gpt-5"),
        prompt: promptMessage.body.prompt,
        abortSignal: runSignal,
      }).finally(() => {
        stopController.abort();
        c.state.running = false;
      });
      c.state.messages.push(text);
      await promptMessage.complete();
    }
  },
});

export const registry = setup({ use: { agent } });
```
Combining the actor's shutdown signal with a custom cancel signal:

```typescript
import { actor, queue, setup } from "rivetkit";
import { joinSignals } from "rivetkit/utils";

const worker = actor({
  state: {},
  createVars: () => ({
    cancelController: new AbortController(),
  }),
  queues: {
    jobs: queue<{ id: string }>(),
  },
  actions: {
    // Cancel processing from outside
    cancelProcessing: async (c) => {
      c.vars.cancelController.abort();
    },
  },
  run: async (c) => {
    while (!c.aborted) {
      // Combine actor shutdown signal with custom cancel signal
      const signal = joinSignals(
        c.abortSignal,
        c.vars.cancelController.signal,
      );
      try {
        const [message] = await c.queue.next({ signal });
        if (!message) continue;
        console.log("Processing job", message.body.id);
      } catch (error) {
        // Reset cancel controller if it was a manual cancel
        if (c.vars.cancelController.signal.aborted && !c.aborted) {
          c.vars.cancelController = new AbortController();
          continue;
        }
        throw error;
      }
    }
  },
});

export const registry = setup({ use: { worker } });
```
## Queues for Agents
Queues are a natural fit for AI agents. Use a prompt queue for incoming messages, a stop queue for cancellation, and SQLite for persistent chat history. The run handler processes messages durably, so the agent survives crashes and picks up where it left off.
```typescript
import { actor, queue, setup } from "rivetkit";
import { db } from "rivetkit/db";
import { generateText, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const agent = actor({
  // SQLite for persistent chat history
  db: db({
    onMigrate: async (db) => {
      await db.execute(`
        CREATE TABLE IF NOT EXISTS messages (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          role TEXT NOT NULL,
          content TEXT NOT NULL
        )
      `);
    },
  }),
  queues: {
    prompt: queue<{ content: string }>(),
  },
  run: async (c) => {
    for await (const message of c.queue.iter()) {
      // Save user message to SQLite
      await c.db.execute(
        "INSERT INTO messages (role, content) VALUES (?, ?)",
        "user",
        message.body.content,
      );
      // Load full chat history
      const history = (await c.db.execute(
        "SELECT role, content FROM messages ORDER BY id",
      )) as { role: string; content: string }[];
      // Generate with tool use
      const result = await generateText({
        model: openai("gpt-5"),
        messages: history.map((row) => ({
          role: row.role as "user" | "assistant",
          content: row.content,
        })),
        tools: {
          getWeather: tool({
            description: "Get the weather for a location",
            parameters: z.object({ location: z.string() }),
            execute: async ({ location }) => `72F in ${location}`,
          }),
        },
        maxSteps: 5,
      });
      // Save assistant response
      await c.db.execute(
        "INSERT INTO messages (role, content) VALUES (?, ?)",
        "assistant",
        result.text,
      );
    }
  },
});

export const registry = setup({ use: { agent } });
```
The same agent, with a stop queue for cancelling generation mid-flight:

```typescript
import { actor, queue, setup } from "rivetkit";
import { db } from "rivetkit/db";
import { generateText, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { joinSignals } from "rivetkit/utils";
import { z } from "zod";

const agent = actor({
  db: db({
    onMigrate: async (db) => {
      await db.execute(`
        CREATE TABLE IF NOT EXISTS messages (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          role TEXT NOT NULL,
          content TEXT NOT NULL
        )
      `);
    },
  }),
  queues: {
    // Separate queues for prompts and stop signals
    prompt: queue<{ content: string }, undefined>(),
    stop: queue<{ reason?: string }>(),
  },
  run: async (c) => {
    for await (const message of c.queue.iter({
      names: ["prompt"],
      completable: true,
    })) {
      const stopController = new AbortController();
      const runSignal = joinSignals(c.abortSignal, stopController.signal);
      // Cancel generation if a stop message arrives
      c.queue
        .next({ names: ["stop"], signal: runSignal })
        .then(([stopMessage]) => {
          if (stopMessage) stopController.abort();
        })
        .catch(() => {});
      // Save user message to SQLite
      await c.db.execute(
        "INSERT INTO messages (role, content) VALUES (?, ?)",
        "user",
        message.body.content,
      );
      // Load full chat history
      const history = (await c.db.execute(
        "SELECT role, content FROM messages ORDER BY id",
      )) as { role: string; content: string }[];
      // Generate with tool use
      const result = await generateText({
        model: openai("gpt-5"),
        messages: history.map((row) => ({
          role: row.role as "user" | "assistant",
          content: row.content,
        })),
        tools: {
          getWeather: tool({
            description: "Get the weather for a location",
            parameters: z.object({ location: z.string() }),
            execute: async ({ location }) => `72F in ${location}`,
          }),
        },
        maxSteps: 5,
        abortSignal: runSignal,
      }).finally(() => {
        stopController.abort();
      });
      // Save assistant response
      await c.db.execute(
        "INSERT INTO messages (role, content) VALUES (?, ?)",
        "assistant",
        result.text,
      );
      await message.complete();
    }
  },
});

export const registry = setup({ use: { agent } });
```
## Request/Response
Three delivery modes depending on what the caller needs:
- Fire-and-forget: send and move on
- Completable: send and wait for acknowledgment
- Request/response: send and await a typed reply
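To make the three modes concrete, here is a minimal in-memory sketch of the mailbox contract. This is not the RivetKit API or implementation; the `Mailbox`, `send`, `sendAndWait`, `next`, and `complete` names are hypothetical, chosen only to mirror the semantics above:

```typescript
// Hypothetical in-memory mailbox illustrating the three delivery modes.
type Pending<B, R> = { body: B; resolve?: (r: R) => void };

class Mailbox<B, R> {
  private items: Pending<B, R>[] = [];
  private waiters: ((m: Pending<B, R>) => void)[] = [];

  // Fire-and-forget: enqueue and return immediately
  send(body: B): void {
    this.push({ body });
  }

  // Request/response: enqueue and await a typed reply, or time out
  sendAndWait(
    body: B,
    timeoutMs: number,
  ): Promise<{ status: "completed"; response: R } | { status: "timedOut" }> {
    return new Promise((outer) => {
      const timer = setTimeout(() => outer({ status: "timedOut" }), timeoutMs);
      this.push({
        body,
        resolve: (response) => {
          clearTimeout(timer);
          outer({ status: "completed", response });
        },
      });
    });
  }

  // Consumer side: take the next message; complete() delivers the reply
  async next(): Promise<{ body: B; complete: (r: R) => void }> {
    const item =
      this.items.shift() ??
      (await new Promise<Pending<B, R>>((res) => this.waiters.push(res)));
    return { body: item.body, complete: (r) => item.resolve?.(r) };
  }

  private push(item: Pending<B, R>) {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);
    else this.items.push(item);
  }
}

// Consumer loop, like a run handler
const mb = new Mailbox<{ amount: number }, { value: number }>();
let value = 0;
void (async () => {
  for (;;) {
    const m = await mb.next();
    value += m.body.amount;
    m.complete({ value });
  }
})();

mb.send({ amount: 1 }); // fire-and-forget
const result = await mb.sendAndWait({ amount: 5 }, 1_000); // request/response
```

Completable (ack-only) is the same round trip with an empty reply. The real thing persists messages instead of holding them in memory, which is what makes all three modes durable.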
## Pairs with Workflows
Feed queue messages into durable workflows. Each workflow step is checkpointed, so crashes pick up where they left off. Combine queues with sleep, join, race, rollback, and human-in-the-loop patterns.
```typescript
import { actor, queue, setup } from "rivetkit";
import { workflow } from "rivetkit/workflow";

const worker = actor({
  state: { processed: 0 },
  queues: {
    orders: queue<{ orderId: string }>(),
  },
  // Workflow replays safely on crash or restart
  run: workflow(async (ctx) => {
    for await (const message of ctx.queue.iter()) {
      await ctx.step("charge", async () => charge(message.body.orderId));
      await ctx.step("fulfill", async () => fulfill(message.body.orderId));
      await ctx.step("notify", async () => notify(message.body.orderId));
    }
  }),
});

async function charge(orderId: string) { /* ... */ }
async function fulfill(orderId: string) { /* ... */ }
async function notify(orderId: string) { /* ... */ }

export const registry = setup({ use: { worker } });
```
## Built into the Actor
Queues are part of the actor, not a separate service. The same actor has state, SQLite, events, and workflows, all built in. No external broker to provision, no connection strings, no infrastructure to manage.
Plus everything else that comes with Rivet Actors: scale to millions of instances, scale to zero, TypeScript-native, deploy on Cloudflare Workers, Vercel, Railway, or your own infra.
## Get Started
Queues are available today in RivetKit.
```bash
npm install rivetkit
```

```typescript
import { queue } from "rivetkit";
```