Workflows
Build durable, replayable run loops in Rivet Actors with steps, queue waits, timers, and rollback.
Use workflows for durable, multi-step execution with replay safety.
What are workflows?
A workflow is a durable, replayable run handler for a Rivet Actor.
- Survives restarts: workflow progress is saved automatically.
- Re-runs safely: replay follows the same recorded steps.
- Event-driven: workflows can pause for queue messages, then continue.
Getting started
Simple workflow
Use this when you need a short multi-step sequence.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const invoiceActor = actor({
state: {
invoiceId: null as string | null,
subtotal: 0,
tax: 0,
total: 0,
status: "idle" as "idle" | "complete",
},
run: workflow(async (ctx) => {
const subtotal = await ctx.step("load-subtotal", async () =>
loadSubtotal(),
);
const tax = await ctx.step("calculate-tax", async () =>
calculateTax(subtotal),
);
await ctx.step("save-invoice", async () =>
saveInvoice(ctx, subtotal, tax),
);
}),
actions: {
getState: (c) => c.state,
},
});
async function loadSubtotal(): Promise<number> {
const response = await fetch("https://api.example.com/carts/main");
if (!response.ok) {
throw new Error(`load subtotal failed: ${response.status}`);
}
const cart = (await response.json()) as { subtotal: number };
return cart.subtotal;
}
async function calculateTax(subtotal: number): Promise<number> {
const response = await fetch("https://api.example.com/tax/quote", {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({ subtotal }),
});
if (!response.ok) {
throw new Error(`tax quote failed: ${response.status}`);
}
const quote = (await response.json()) as { tax: number };
return quote.tax;
}
async function saveInvoice(
ctx: WorkflowContextOf<typeof invoiceActor>,
subtotal: number,
tax: number,
): Promise<void> {
const total = subtotal + tax;
const response = await fetch("https://api.example.com/invoices", {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({ subtotal, tax, total }),
});
if (!response.ok) {
throw new Error(`save invoice failed: ${response.status}`);
}
const invoice = (await response.json()) as { id: string };
ctx.state.invoiceId = invoice.id;
ctx.state.subtotal = subtotal;
ctx.state.tax = tax;
ctx.state.total = total;
ctx.state.status = "complete";
}
export const registry = setup({ use: { invoiceActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.invoiceActor.getOrCreate(["main"]);
const state = await handle.getState();
console.log(state.status, state.total);
Loops
This is the recommended workflow shape for most actor workloads.
- Use a queue wait inside the loop to receive the next unit of work.
- Keep actor state changes in a single workflow loop.
- This gives you one durable workflow that manages all actor progress.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const workflowCounter = actor({
state: {
value: 0,
processed: 0,
lastOperationId: null as string | null,
},
queues: {
counter: queue<{ delta: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("counter-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-counter-command");
await loopCtx.step("apply-counter-command", async () =>
applyCounterCommand(loopCtx, message.body.delta),
);
});
}),
actions: {
getState: (c) => c.state,
},
});
async function applyCounterCommand(
ctx: WorkflowLoopContextOf<typeof workflowCounter>,
delta: number,
): Promise<void> {
const response = await fetch("https://api.example.com/counter/apply", {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({ delta }),
});
if (!response.ok) {
throw new Error(`counter apply failed: ${response.status}`);
}
const result = (await response.json()) as {
nextValue: number;
operationId: string;
};
ctx.state.value = result.nextValue;
ctx.state.lastOperationId = result.operationId;
ctx.state.processed += 1;
}
export const registry = setup({ use: { workflowCounter } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.workflowCounter.getOrCreate(["main"]);
await handle.send("counter", { delta: 1 });
await handle.send("counter", { delta: 2 });
const state = await handle.getState();
console.log(state.value, state.processed);
Setup & teardown
Use this when the workflow should initialize resources, process queued commands, then clean up.
import { actor, queue, setup } from "rivetkit";
import { Loop, type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type WorkMessage = { amount: number };
type ControlMessage = { type: "stop"; reason: string };
export const setupRunTeardownActor = actor({
state: {
phase: "idle" as "idle" | "running" | "stopped",
total: 0,
processed: 0,
stopReason: null as string | null,
workerSessionId: null as string | null,
},
queues: {
work: queue<WorkMessage>(),
control: queue<ControlMessage>(),
},
run: workflow(async (ctx) => {
await ctx.step("setup", async () => setupWorkerSession(ctx));
const stopReason = await ctx.loop("worker-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-command", {
names: ["work", "control"],
});
if (message.name === "work") {
const work = message.body as WorkMessage;
await loopCtx.step("apply-work", async () =>
applyWorkerMessage(loopCtx, work),
);
return;
}
const control = message.body as ControlMessage;
if (control.type === "stop") {
return Loop.break(control.reason);
}
});
await ctx.step("teardown", async () =>
teardownWorkerSession(ctx, stopReason),
);
}),
actions: {
getState: (c) => c.state,
},
});
async function setupWorkerSession(
ctx: WorkflowContextOf<typeof setupRunTeardownActor>,
): Promise<void> {
const response = await fetch("https://api.example.com/workers/session", {
method: "POST",
});
if (!response.ok) {
throw new Error(`worker setup failed: ${response.status}`);
}
const session = (await response.json()) as { sessionId: string };
ctx.state.workerSessionId = session.sessionId;
ctx.state.phase = "running";
ctx.state.stopReason = null;
}
async function applyWorkerMessage(
ctx: WorkflowLoopContextOf<typeof setupRunTeardownActor>,
work: WorkMessage,
): Promise<void> {
const response = await fetch("https://api.example.com/workers/process", {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({
sessionId: ctx.state.workerSessionId,
amount: work.amount,
}),
});
if (!response.ok) {
throw new Error(`worker process failed: ${response.status}`);
}
const result = (await response.json()) as { appliedAmount: number };
ctx.state.total += result.appliedAmount;
ctx.state.processed += 1;
}
async function teardownWorkerSession(
ctx: WorkflowContextOf<typeof setupRunTeardownActor>,
stopReason: string,
): Promise<void> {
if (ctx.state.workerSessionId) {
const response = await fetch(
`https://api.example.com/workers/session/${ctx.state.workerSessionId}`,
{ method: "DELETE" },
);
if (!response.ok) {
throw new Error(`worker teardown failed: ${response.status}`);
}
}
ctx.state.phase = "stopped";
ctx.state.stopReason = stopReason;
}
export const registry = setup({ use: { setupRunTeardownActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.setupRunTeardownActor.getOrCreate(["main"]);
await handle.send("work", { amount: 5 });
await handle.send("work", { amount: 3 });
await handle.send("control", { type: "stop", reason: "maintenance" });
const state = await handle.getState();
console.log(state.phase, state.total, state.stopReason);
Features
Queue
Use this for fire-and-forget commands where the client does not need a reply.
Use the Loops example above as the baseline pattern.
Request/response (using queue)
Use this when the caller needs a response from queued processing.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const requestResponseActor = actor({
state: {
handled: 0,
},
queues: {
requests: queue<{ value: number }, { doubled: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("request-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-request", {
completable: true,
});
if (!message.complete) return;
const doubled = await loopCtx.step("handle-request", async () => {
loopCtx.state.handled += 1;
return message.body.value * 2;
});
await message.complete({ doubled });
});
}),
});
export const registry = setup({ use: { requestResponseActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.requestResponseActor.getOrCreate(["main"]);
const result = await handle.send(
"requests",
{ value: 21 },
{ wait: true, timeout: 1_000 },
);
if (result.status === "completed") {
const response = result.response as { doubled: number };
console.log(response.doubled);
}
Timers
Use queue messages as the trigger source, then sleep durably inside the workflow.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type Reminder = {
text: string;
at: number;
};
export const reminderActor = actor({
state: {
fired: [] as string[],
},
queues: {
reminders: queue<Reminder>(),
},
run: workflow(async (ctx) => {
await ctx.loop("reminder-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-reminder");
const runAt = Math.max(Date.now(), message.body.at);
await loopCtx.sleepUntil("wait-until-reminder", runAt);
await loopCtx.step("record-reminder", async () => {
loopCtx.state.fired.push(message.body.text);
});
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { reminderActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.reminderActor.getOrCreate(["main"]);
await handle.send("reminders", {
text: "send weekly report",
at: Date.now() + 1_000,
});
await new Promise((resolve) => setTimeout(resolve, 1_300));
console.log(await handle.getState());
Join
Use join when several independent tasks can run in parallel.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const dashboardActor = actor({
state: {
summary: null as null | {
users: number;
orders: number;
revenue: number;
},
},
queues: {
refresh: queue<Record<string, never>>(),
},
run: workflow(async (ctx) => {
await ctx.loop("dashboard-loop", async (loopCtx) => {
await loopCtx.queue.next("wait-refresh");
const summary = await loopCtx.join("fetch-summary", {
users: {
run: async (branchCtx) => {
return await branchCtx.step("fetch-users", () => fetchCount("/users"));
},
},
orders: {
run: async (branchCtx) => {
return await branchCtx.step("fetch-orders", () => fetchCount("/orders"));
},
},
revenue: {
run: async (branchCtx) => {
return await branchCtx.step("fetch-revenue", () => fetchCount("/revenue"));
},
},
});
await loopCtx.step("save-summary", async () => {
loopCtx.state.summary = summary;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function fetchCount(path: string): Promise<number> {
const res = await fetch(`https://api.example.com${path}`);
if (!res.ok) throw new Error(`fetch ${path} failed: ${res.status}`);
return ((await res.json()) as { count: number }).count;
}
export const registry = setup({ use: { dashboardActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.dashboardActor.getOrCreate(["main"]);
await handle.send("refresh", {});
console.log(await handle.getState());
Race
Use race when you need first-winner behavior.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const auctionActor = actor({
state: { result: null as "sold" | "expired" | null },
queues: {
bids: queue<{ amount: number }>(),
},
run: workflow(async (ctx) => {
await ctx.step("list-item", () => listItem("item-123"));
const { winner } = await ctx.race("bid-or-expire", [
{
name: "bid",
run: async (branchCtx) => {
const bid = await branchCtx.queue.next("wait-bid");
return bid.body.amount;
},
},
{
name: "expire",
run: async (branchCtx) => {
await branchCtx.sleep("auction-timeout", 24 * 60 * 60 * 1000);
return 0;
},
},
]);
await ctx.step("finalize", async () => {
await finalizeAuction("item-123", winner);
ctx.state.result = winner === "bid" ? "sold" : "expired";
});
}),
actions: {
getState: (c) => c.state,
},
});
async function listItem(itemId: string): Promise<void> {
await fetch(`https://api.example.com/auctions/${itemId}`, {
method: "POST",
});
}
async function finalizeAuction(
itemId: string,
outcome: string,
): Promise<void> {
await fetch(`https://api.example.com/auctions/${itemId}/finalize`, {
method: "POST",
body: JSON.stringify({ outcome }),
});
}
export const registry = setup({ use: { auctionActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.auctionActor.getOrCreate(["item-123"]);
await handle.send("bids", { amount: 100 });
console.log(await handle.getState());
Timeouts
Use step timeouts and retries for slow or flaky dependencies.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
async function chargeCard(orderId: string): Promise<string> {
return `charge-${orderId}`;
}
export const timeoutActor = actor({
state: {
lastChargeId: null as string | null,
},
queues: {
charge: queue<{ orderId: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("charge-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-charge");
const chargeId = await loopCtx.step<string>({
name: "charge-card",
timeout: 5_000,
maxRetries: 5,
retryBackoffBase: 200,
retryBackoffMax: 2_000,
run: async () => await chargeCard(message.body.orderId),
});
await loopCtx.step("save-charge", async () => {
loopCtx.state.lastChargeId = chargeId;
});
});
}),
});
export const registry = setup({ use: { timeoutActor } });
Rollback
Use rollback checkpoints before steps that have compensating actions.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const checkoutActor = actor({
state: { status: "pending" as string },
queues: {
orders: queue<{ orderId: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("checkout-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-order");
await loopCtx.rollbackCheckpoint("checkout-checkpoint");
await loopCtx.step<string>({
name: "reserve-inventory",
run: () => reserveInventory(message.body.orderId),
rollback: async (_rollbackCtx, id) => {
await releaseInventory(id as string);
},
});
await loopCtx.step<string>({
name: "charge-card",
run: () => chargeCard(message.body.orderId),
rollback: async (_rollbackCtx, chargeId) => {
await refundCharge(chargeId as string);
},
});
await loopCtx.step("confirm", async () => {
loopCtx.state.status = "confirmed";
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function reserveInventory(orderId: string): Promise<string> {
const res = await fetch("https://api.example.com/inventory/reserve", {
method: "POST",
body: JSON.stringify({ orderId }),
});
return ((await res.json()) as { reservationId: string }).reservationId;
}
async function releaseInventory(reservationId: string): Promise<void> {
await fetch(`https://api.example.com/inventory/${reservationId}/release`, {
method: "POST",
});
}
async function chargeCard(orderId: string): Promise<string> {
const res = await fetch("https://api.stripe.com/v1/charges", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
body: JSON.stringify({ orderId }),
});
return ((await res.json()) as { id: string }).id;
}
async function refundCharge(chargeId: string): Promise<void> {
await fetch("https://api.stripe.com/v1/refunds", {
method: "POST",
headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
body: JSON.stringify({ charge: chargeId }),
});
}
export const registry = setup({ use: { checkoutActor } });
Patterns
Store workflow progress in state + broadcast
Store progress in state so replay and recovery always restore it. Broadcast state changes so clients can render progress in realtime.
import { actor, event, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type Progress = {
stage: "idle" | "running" | "completed";
completed: number;
total: number;
};
export const progressActor = actor({
state: {
progress: {
stage: "idle",
completed: 0,
total: 0,
} as Progress,
sum: 0,
},
events: {
progressUpdated: event<Progress>(),
},
queues: {
jobs: queue<{ value: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("progress-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-job");
await loopCtx.step("mark-running", async () =>
markProgressRunning(loopCtx),
);
await loopCtx.step("apply-job", async () =>
applyProgressJob(loopCtx, message.body.value),
);
});
}),
actions: {
getState: (c) => c.state,
},
});
function markProgressRunning(ctx: WorkflowLoopContextOf<typeof progressActor>): void {
ctx.state.progress = {
stage: "running",
completed: ctx.state.progress.completed,
total: ctx.state.progress.total + 1,
};
ctx.broadcast("progressUpdated", ctx.state.progress);
}
function applyProgressJob(
ctx: WorkflowLoopContextOf<typeof progressActor>,
value: number,
): void {
ctx.state.sum += value;
ctx.state.progress = {
stage: "completed",
completed: ctx.state.progress.completed + 1,
total: ctx.state.progress.total,
};
ctx.broadcast("progressUpdated", ctx.state.progress);
}
export const registry = setup({ use: { progressActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.progressActor.getOrCreate(["main"]);
const conn = handle.connect();
conn.on("progressUpdated", (progress) => {
console.log("progress", progress);
});
await handle.send("jobs", { value: 5 });
await handle.send("jobs", { value: 7 });
console.log(await handle.getState());
Cron (queue-driven)
Rivet scheduling triggers actions. For cron-like workflows, use a small scheduled action as a bridge that enqueues work, then process that work in the workflow loop.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
function nextMinute(timestamp: number): number {
const minuteMs = 60_000;
return Math.floor(timestamp / minuteMs) * minuteMs + minuteMs;
}
export const cronActor = actor({
state: {
runs: 0,
lastRunAt: null as number | null,
},
queues: {
"cron-tick": queue<{ scheduledAt: number }>(),
},
onCreate: async (c) => {
const firstTickAt = nextMinute(Date.now());
await c.schedule.at(firstTickAt, "enqueueCronTick", firstTickAt);
},
actions: {
enqueueCronTick: async (c, scheduledAt: number) => {
await c.queue.send("cron-tick", { scheduledAt });
const nextTickAt = nextMinute(scheduledAt + 1);
await c.schedule.at(nextTickAt, "enqueueCronTick", nextTickAt);
},
getState: (c) => c.state,
},
run: workflow(async (ctx) => {
await ctx.loop("cron-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-cron-tick");
await loopCtx.step("run-cron-job", async () => {
loopCtx.state.runs += 1;
loopCtx.state.lastRunAt = message.body.scheduledAt;
});
});
}),
});
export const registry = setup({ use: { cronActor } });
These are common workflow shapes used in production systems.
Queue-driven worker
Use this when external systems enqueue work and the actor should process each item durably.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type Job = { id: string; amount: number };
export const queueWorkerActor = actor({
state: {
processed: 0,
totalAmount: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("worker-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-job", {
timeout: 30_000,
});
if (!message) return;
const job = message.body as Job;
await loopCtx.step("process-job", async () => {
loopCtx.state.processed += 1;
loopCtx.state.totalAmount += job.amount;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { queueWorkerActor } });
Setup & teardown
Use this when you need one-time initialization before a long-lived loop, plus cleanup when the actor stops sleeping or is destroyed.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
function openResource(): string {
return "connected";
}
function closeResource(_resource: string): void {}
export const setupRunTeardownActor = actor({
vars: {
resource: null as string | null,
},
state: {
initialized: false,
ticks: 0,
},
onWake: (c) => {
c.vars.resource = openResource();
},
onSleep: (c) => {
if (!c.vars.resource) return;
closeResource(c.vars.resource);
c.vars.resource = null;
},
run: workflow(async (ctx) => {
await ctx.step("setup", async () => {
if (!ctx.vars.resource) ctx.vars.resource = openResource();
ctx.state.initialized = true;
});
await ctx.loop("main-loop", async (loopCtx) => {
await loopCtx.sleep("tick", 1_000);
await loopCtx.step("tick-step", async () => {
loopCtx.state.ticks += 1;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { setupRunTeardownActor } });
Human approval gate
Use this when an operation must pause for a user or system decision before continuing.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const approvalGateActor = actor({
state: { status: "pending" as string },
queues: {
approval: queue<{ approved: boolean }>(),
},
run: workflow(async (ctx) => {
await ctx.step("validate-order", async () => {
await validateOrder("order-123");
ctx.state.status = "awaiting_approval";
});
const decision = await ctx.queue.next("wait-approval");
if (decision.body.approved) {
await ctx.step("fulfill-order", async () => {
await fulfillOrder("order-123");
ctx.state.status = "fulfilled";
});
} else {
await ctx.step("cancel-order", async () => {
await cancelOrder("order-123");
ctx.state.status = "cancelled";
});
}
}),
actions: {
getState: (c) => c.state,
},
});
async function validateOrder(orderId: string): Promise<void> {
const res = await fetch(
`https://api.example.com/orders/${orderId}/validate`,
{ method: "POST" },
);
if (!res.ok) throw new Error("Order validation failed");
}
async function fulfillOrder(orderId: string): Promise<void> {
await fetch(`https://api.example.com/orders/${orderId}/fulfill`, {
method: "POST",
});
}
async function cancelOrder(orderId: string): Promise<void> {
await fetch(`https://api.example.com/orders/${orderId}/cancel`, {
method: "POST",
});
}
export const registry = setup({ use: { approvalGateActor } });
Fan-out / fan-in (join)
Use this when independent work items can run in parallel and you need a single merged result.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const fanInOutActor = actor({
state: {
total: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("join-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-refresh", {
timeout: 30_000,
});
if (!message) return;
const joined = await loopCtx.join("parallel-work", {
users: {
run: async (branchCtx) =>
await branchCtx.step("fetch-users", () => fetchCount("/users")),
},
orders: {
run: async (branchCtx) =>
await branchCtx.step("fetch-orders", () => fetchCount("/orders")),
},
invoices: {
run: async (branchCtx) =>
await branchCtx.step("fetch-invoices", () => fetchCount("/invoices")),
},
});
await loopCtx.step("merge-results", async () => {
loopCtx.state.total =
joined.users + joined.orders + joined.invoices;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function fetchCount(path: string): Promise<number> {
const res = await fetch(`https://api.example.com${path}`);
if (!res.ok) throw new Error(`fetch ${path} failed: ${res.status}`);
return ((await res.json()) as { count: number }).count;
}
export const registry = setup({ use: { fanInOutActor } });
Batch drainer
Use this when throughput matters and handling one message at a time is too expensive.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type MetricMessage = { value: number };
export const batchDrainerActor = actor({
state: {
pending: [] as number[],
flushedBatches: 0,
lastBatchTotal: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("drain-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-metric", {
timeout: 5_000,
});
if (message) {
const metric = message.body as MetricMessage;
await loopCtx.step("buffer-message", async () => {
loopCtx.state.pending.push(metric.value);
});
}
if (loopCtx.state.pending.length < 5) return;
await loopCtx.step("flush-batch", async () => flushBatch(loopCtx));
});
}),
actions: {
getState: (c) => c.state,
},
});
function flushBatch(ctx: WorkflowLoopContextOf<typeof batchDrainerActor>): void {
const total = ctx.state.pending.reduce(
(sum: number, value: number) => sum + value,
0,
);
ctx.state.lastBatchTotal = total;
ctx.state.flushedBatches += 1;
ctx.state.pending = [];
}
export const registry = setup({ use: { batchDrainerActor } });
Coordinator -> worker RPC
Use this when one actor orchestrates work by calling actions on other actors.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type TaskMessage = {
taskId: string;
workerId: string;
value: number;
};
export const workerActor = actor({
actions: {
runTask: async (_c, value: number) => value * 2,
},
});
export const coordinatorActor = actor({
state: {
lastTaskId: null as string | null,
lastResult: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("orchestrator-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-task", {
timeout: 30_000,
});
if (!message) return;
const task = message.body as TaskMessage;
const result = await loopCtx.step("dispatch-rpc", async () =>
dispatchTask(loopCtx, task),
);
await loopCtx.step("record-result", async () => {
loopCtx.state.lastTaskId = task.taskId;
loopCtx.state.lastResult = result as number;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function dispatchTask(
ctx: WorkflowLoopContextOf<typeof coordinatorActor>,
task: TaskMessage,
): Promise<number> {
const client = ctx.client();
const worker = client.workerActor.getOrCreate([task.workerId]);
return await worker.runTask(task.value);
}
export const registry = setup({ use: { coordinatorActor, workerActor } });
Request/response over queue (async RPC)
Use this when you want decoupled actor-to-actor communication with durable waits and explicit completion.
import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type RequestMessage = { value: number };
export const requestResponseActor = actor({
state: {
handled: 0,
},
queues: {
requests: queue<RequestMessage, { doubled: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop("request-response-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-request", {
completable: true,
});
if (!message.complete) return;
const doubled = await loopCtx.step("handle-request", async () => {
loopCtx.state.handled += 1;
return message.body.value * 2;
});
await message.complete({ doubled });
});
}),
});
export const registry = setup({ use: { requestResponseActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.requestResponseActor.getOrCreate(["main"]);
const result = await handle.send("requests", { value: 21 }, { wait: true });
if (result.status === "completed") {
const response = result.response as { doubled: number };
console.log(response.doubled);
}
Scatter-gather across actors
Use this when multiple actors can process independent parts of a request in parallel, then return a merged response.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type ScatterMessage = { input: number };
export const shardActor = actor({
actions: {
compute: async (_c, input: number) => input * 10,
},
});
export const scatterGatherActor = actor({
state: {
lastSum: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("scatter-gather-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-scatter", {
timeout: 30_000,
});
if (!message) return;
const scatter = message.body as ScatterMessage;
const gathered = await loopCtx.join("gather", {
shardA: {
run: async (joinCtx) =>
await joinCtx.step("call-shard-a", async () =>
callShard(joinCtx, "a", scatter.input),
),
},
shardB: {
run: async (joinCtx) =>
await joinCtx.step("call-shard-b", async () =>
callShard(joinCtx, "b", scatter.input),
),
},
shardC: {
run: async (joinCtx) =>
await joinCtx.step("call-shard-c", async () =>
callShard(joinCtx, "c", scatter.input),
),
},
});
await loopCtx.step("aggregate", async () => {
loopCtx.state.lastSum = gathered.shardA + gathered.shardB + gathered.shardC;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function callShard(
ctx: WorkflowLoopContextOf<typeof scatterGatherActor>,
shardId: "a" | "b" | "c",
input: number,
): Promise<number> {
const client = ctx.client();
const handle = client.shardActor.getOrCreate([shardId]);
return await handle.compute(input);
}
export const registry = setup({ use: { scatterGatherActor, shardActor } });
Timeout + fallback actor
Use this when a primary actor call might be slow or unavailable and you need a deterministic fallback path.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const primaryServiceActor = actor({
actions: {
fetchValue: async () => {
await new Promise((resolve) => setTimeout(resolve, 500));
return "primary";
},
},
});
export const fallbackServiceActor = actor({
actions: {
fetchValue: async () => "fallback",
},
});
export const timeoutFallbackActor = actor({
state: {
lastSource: "none" as "none" | "primary" | "fallback",
lastValue: "",
},
run: workflow(async (ctx) => {
await ctx.loop("timeout-loop", async (loopCtx) => {
await loopCtx.queue.nextBatch("wait-request", {
timeout: 30_000,
});
const winner = await loopCtx.race("primary-vs-timeout", [
{
name: "primary",
run: async (raceCtx) =>
await raceCtx.step("call-primary", async () =>
callPrimaryValue(raceCtx),
),
},
{
name: "timeout",
run: async (raceCtx) => {
await raceCtx.sleep("primary-timeout", 200);
return "timeout";
},
},
]);
let value = winner.value as string;
let source: "primary" | "fallback" = "primary";
if (winner.winner === "timeout") {
value = (await loopCtx.step("fallback-call", async () =>
callFallbackValue(loopCtx),
)) as string;
source = "fallback";
}
await loopCtx.step("record-choice", async () => {
loopCtx.state.lastSource = source;
loopCtx.state.lastValue = value;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function callPrimaryValue(
ctx: WorkflowLoopContextOf<typeof timeoutFallbackActor>,
): Promise<string> {
const client = ctx.client();
const primary = client.primaryServiceActor.getOrCreate(["main"]);
return await primary.fetchValue();
}
async function callFallbackValue(
ctx: WorkflowLoopContextOf<typeof timeoutFallbackActor>,
): Promise<string> {
const client = ctx.client();
const fallback = client.fallbackServiceActor.getOrCreate(["main"]);
return await fallback.fetchValue();
}
export const registry = setup({
use: { timeoutFallbackActor, primaryServiceActor, fallbackServiceActor },
});
Cross-actor saga (compensating actions)
Use this when a workflow spans multiple actors and each side effect may need compensation.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type CheckoutMessage = {
orderId: string;
amount: number;
};
export const inventoryActor = actor({
actions: {
reserve: async (_c, orderId: string) => `reserve-${orderId}`,
release: async (_c, reservationId: string) => reservationId,
},
});
export const billingActor = actor({
actions: {
charge: async (_c, amount: number) => `charge-${amount}`,
refund: async (_c, chargeId: string) => chargeId,
},
});
export const checkoutSagaActor = actor({
state: {
completedOrders: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("checkout-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-order", {
timeout: 30_000,
});
if (!message) return;
const checkout = message.body as CheckoutMessage;
await loopCtx.rollbackCheckpoint("checkout-saga");
await loopCtx.step({
name: "reserve-inventory",
run: async () => reserveInventoryForCheckout(loopCtx, checkout.orderId),
rollback: async (_rollbackCtx, output) => {
await releaseInventoryForCheckout(loopCtx, output as string);
},
});
await loopCtx.step({
name: "charge-card",
run: async () => chargeCheckout(loopCtx, checkout.amount),
rollback: async (_rollbackCtx, output) => {
await refundCheckout(loopCtx, output as string);
},
});
await loopCtx.step("mark-complete", async () => markOrderComplete(loopCtx));
});
}),
actions: {
getState: (c) => c.state,
},
});
async function reserveInventoryForCheckout(
ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
orderId: string,
): Promise<string> {
const client = ctx.client();
const inventory = client.inventoryActor.getOrCreate(["main"]);
return await inventory.reserve(orderId);
}
async function releaseInventoryForCheckout(
ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
reservationId: string,
): Promise<void> {
const client = ctx.client();
const inventory = client.inventoryActor.getOrCreate(["main"]);
await inventory.release(reservationId);
}
async function chargeCheckout(
ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
amount: number,
): Promise<string> {
const client = ctx.client();
const billing = client.billingActor.getOrCreate(["main"]);
return await billing.charge(amount);
}
async function refundCheckout(
ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
chargeId: string,
): Promise<void> {
const client = ctx.client();
const billing = client.billingActor.getOrCreate(["main"]);
await billing.refund(chargeId);
}
function markOrderComplete(
ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
): void {
ctx.state.completedOrders += 1;
}
export const registry = setup({
use: { checkoutSagaActor, inventoryActor, billingActor },
});
Signal-driven control loop
Use this when workflow progress should be triggered by commands/events instead of fixed polling intervals.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type ControlSignal = { kind: "pause" | "resume" | "stop" };
export const controlLoopActor = actor({
state: {
mode: "running" as "running" | "paused" | "stopped",
handledSignals: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("control-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-signal", {
timeout: 30_000,
});
if (!message) return;
const signal = message.body as ControlSignal;
await loopCtx.step("apply-signal", async () =>
applyControlSignal(loopCtx, signal.kind),
);
});
}),
actions: {
getState: (c) => c.state,
},
});
function applyControlSignal(
ctx: WorkflowLoopContextOf<typeof controlLoopActor>,
kind: ControlSignal["kind"],
): void {
ctx.state.handledSignals += 1;
if (kind === "pause") ctx.state.mode = "paused";
if (kind === "resume") ctx.state.mode = "running";
if (kind === "stop") ctx.state.mode = "stopped";
}
export const registry = setup({ use: { controlLoopActor } });
Poll + backoff loop
Use this when an external dependency has variable availability and retries should slow down after failures.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
async function pollExternal(attempt: number): Promise<boolean> {
return attempt % 3 === 0;
}
export const pollBackoffActor = actor({
state: {
attempts: 0,
backoffMs: 100,
status: "unknown" as "unknown" | "healthy" | "retrying",
},
run: workflow(async (ctx) => {
await ctx.loop("poll-loop", async (loopCtx) => {
const success = await loopCtx.step("poll-target", async () => {
loopCtx.state.attempts += 1;
return pollExternal(loopCtx.state.attempts);
});
if (success) {
await loopCtx.step("reset-backoff", async () => {
loopCtx.state.status = "healthy";
loopCtx.state.backoffMs = 100;
});
await loopCtx.sleep("healthy-interval", 1_000);
return;
}
await loopCtx.step("grow-backoff", async () => {
loopCtx.state.status = "retrying";
loopCtx.state.backoffMs = Math.min(loopCtx.state.backoffMs * 2, 5_000);
});
await loopCtx.sleep("retry-delay", loopCtx.state.backoffMs);
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { pollBackoffActor } });
Child worker orchestration
Use this when one workflow coordinates many child workers (actors or worker workflows) and manages their lifecycle.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type BatchMessage = { payload: number };
export const childWorkerActor = actor({
actions: {
process: async (_c, payload: number) => payload * 3,
},
});
export const orchestratorActor = actor({
state: {
lastTotal: 0,
},
run: workflow(async (ctx) => {
await ctx.step("start-children", async () => startChildren(ctx));
await ctx.loop("orchestrate-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-batch", {
timeout: 30_000,
});
if (!message) return;
const batch = message.body as BatchMessage;
const results = await loopCtx.join("collect-updates", {
a: {
run: async (joinCtx) =>
await joinCtx.step("run-child-a", async () =>
runChildWorker(joinCtx, "child-a", batch.payload),
),
},
b: {
run: async (joinCtx) =>
await joinCtx.step("run-child-b", async () =>
runChildWorker(joinCtx, "child-b", batch.payload),
),
},
c: {
run: async (joinCtx) =>
await joinCtx.step("run-child-c", async () =>
runChildWorker(joinCtx, "child-c", batch.payload),
),
},
});
await loopCtx.step("reconcile", async () => {
loopCtx.state.lastTotal = results.a + results.b + results.c;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
async function startChildren(
ctx: WorkflowContextOf<typeof orchestratorActor>,
): Promise<void> {
const client = ctx.client();
await client.childWorkerActor.getOrCreate(["child-a"]).process(0);
await client.childWorkerActor.getOrCreate(["child-b"]).process(0);
await client.childWorkerActor.getOrCreate(["child-c"]).process(0);
}
async function runChildWorker(
ctx: WorkflowBranchContextOf<typeof orchestratorActor>,
workerId: "child-a" | "child-b" | "child-c",
payload: number,
): Promise<number> {
const client = ctx.client();
return await client.childWorkerActor.getOrCreate([workerId]).process(payload);
}
export const registry = setup({ use: { orchestratorActor, childWorkerActor } });
Bounded drain + concurrency cap
Use this when inbound work can spike and you need predictable per-iteration limits.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type WorkMessage = { id: string; value: number };
const MAX_PER_ITERATION = 10;
const CONCURRENCY_LIMIT = 3;
async function processWork(value: number): Promise<number> {
return value * 2;
}
async function runWithLimit<T>(
limit: number,
items: T[],
fn: (item: T) => Promise<void>,
): Promise<void> {
let nextIndex = 0;
const workers = Array.from({ length: limit }, async () => {
while (nextIndex < items.length) {
const current = items[nextIndex];
nextIndex += 1;
await fn(current);
}
});
await Promise.all(workers);
}
export const boundedDrainActor = actor({
state: {
processed: 0,
lastWindowSize: 0,
lastWindowTotal: 0,
},
run: workflow(async (ctx) => {
await ctx.loop("bounded-drain-loop", async (loopCtx) => {
const window: WorkMessage[] = [];
for (let i = 0; i < MAX_PER_ITERATION; i += 1) {
const [message] = await loopCtx.queue.nextBatch("wait-work", {
timeout: i === 0 ? 30_000 : 10,
});
if (!message) break;
window.push(message.body as WorkMessage);
}
if (window.length === 0) return;
await loopCtx.step("process-window", async () =>
processWindow(loopCtx, window),
);
});
}),
actions: {
getState: (c) => c.state,
},
});
async function processWindow(
ctx: WorkflowLoopContextOf<typeof boundedDrainActor>,
window: WorkMessage[],
): Promise<void> {
let windowTotal = 0;
await runWithLimit(CONCURRENCY_LIMIT, window, async (work) => {
const result = await processWork(work.value);
windowTotal += result;
});
ctx.state.processed += window.length;
ctx.state.lastWindowSize = window.length;
ctx.state.lastWindowTotal = windowTotal;
}
export const registry = setup({ use: { boundedDrainActor } });
Versioned workflow evolution
Use this when workflow structure changes across deployments and old histories must still replay.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
export const versionedWorkflowActor = actor({
state: {
runs: 0,
},
run: workflow(async (ctx) => {
await ctx.step("validate-v2", async () => {
ctx.state.runs += 1;
});
await ctx.removed("validate-v1", "step");
await ctx.loop("main-loop-v2", async (loopCtx) => {
await loopCtx.sleep("idle", 500);
await loopCtx.step("heartbeat-v2", async () => {
loopCtx.state.runs += 1;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { versionedWorkflowActor } });
Checkpoint-friendly loop design
Use this when you need reliable replay and resume semantics across crashes and restarts.
import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";
type PaymentMessage = { id: string; amount: number };
export const checkpointFriendlyActor = actor({
state: {
appliedCount: 0,
totalAmount: 0,
lastPaymentId: null as string | null,
},
run: workflow(async (ctx) => {
await ctx.loop("payment-loop", async (loopCtx) => {
const [message] = await loopCtx.queue.nextBatch("wait-payment", {
timeout: 30_000,
});
if (!message) return;
const payment = message.body as PaymentMessage;
await loopCtx.rollbackCheckpoint("apply-payment-checkpoint");
const plan = (await loopCtx.step("build-plan", async () =>
buildPaymentPlan(payment),
)) as { paymentId: string; amount: number };
await loopCtx.step("apply-side-effects", async () => {
loopCtx.state.appliedCount += 1;
loopCtx.state.totalAmount += plan.amount;
loopCtx.state.lastPaymentId = plan.paymentId;
});
});
}),
actions: {
getState: (c) => c.state,
},
});
function buildPaymentPlan(payment: PaymentMessage): {
paymentId: string;
amount: number;
} {
return {
paymentId: payment.id,
amount: payment.amount,
};
}
export const registry = setup({ use: { checkpointFriendlyActor } });
Migrations
- Keep workflow entry names stable once deployed.
- If an old entry was removed or renamed, call
ctx.removed(name, originalType). - This keeps replay compatible across deployments.
Step-only access to actor APIs
state, vars, db, client(), and connection/event APIs are only valid inside ctx.step(...) callbacks.
Use non-step workflow code for orchestration only: queue waits, sleeps, loops, joins, races, and rollback boundaries. Keep actor-local side effects in steps.
Debugging
GET /inspector/workflow-historyreturns workflow history status for an actor.- Response includes
isWorkflowEnabledandhistory. - In non-dev mode, inspector endpoints require authorization.
Recommendations
- Prefer queue-driven loops for long-lived workflows.
- Structure long-lived workflows with setup and teardown around the main loop.
- Keep actor state changes and side effects inside steps.
- Store workflow progress in
stateand broadcast updates as progress changes. - Use timeouts and rollback for external side effects.