Skip to main content
Sign In
Features

Workflows

Build durable, replayable run loops in Rivet Actors with steps, queue waits, timers, and rollback.

Use workflows for durable, multi-step execution with replay safety.

What are workflows?

A workflow is a durable, replayable run handler for a Rivet Actor.

  • Survives restarts: workflow progress is saved automatically.
  • Re-runs safely: replay follows the same recorded steps.
  • Event-driven: workflows can pause for queue messages, then continue.

Getting started

Simple workflow

Use this when you need a short multi-step sequence.

Loops

This is the recommended workflow shape for most actor workloads.

  • Use a queue wait inside the loop to receive the next unit of work.
  • Keep actor state changes in a single workflow loop.
  • This gives you one durable workflow that manages all actor progress.

Setup & teardown

Use this when the workflow should initialize resources, process queued commands, then clean up.

Features

Queue

Use this for fire-and-forget commands where the client does not need a reply.

Use the Loops example above as the baseline pattern.

Request/response (using queue)

Use this when the caller needs a response from queued processing.

Timers

Use queue messages as the trigger source, then sleep durably inside the workflow.

Join

Use join when several independent tasks can run in parallel.

Race

Use race when you need first-winner behavior.

Timeouts

Use step timeouts and retries for slow or flaky dependencies.

import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

async function chargeCard(orderId: string): Promise<string> {
  return `charge-${orderId}`;
}

export const timeoutActor = actor({
  state: {
    lastChargeId: null as string | null,
  },
  queues: {
    charge: queue<{ orderId: string }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.loop("charge-loop", async (loopCtx) => {
        const message = await loopCtx.queue.next("wait-charge");

        const chargeId = await loopCtx.step<string>({
          name: "charge-card",
          timeout: 5_000,
          maxRetries: 5,
          retryBackoffBase: 200,
          retryBackoffMax: 2_000,
          run: async () => await chargeCard(message.body.orderId),
        });

        await loopCtx.step("save-charge", async () => {
          loopCtx.state.lastChargeId = chargeId;
        });

      });
  }),
});

export const registry = setup({ use: { timeoutActor } });

Rollback

Use rollback checkpoints before steps that have compensating actions.

import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const checkoutActor = actor({
  state: { status: "pending" as string },
  queues: {
    orders: queue<{ orderId: string }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.loop("checkout-loop", async (loopCtx) => {
        const message = await loopCtx.queue.next("wait-order");

        await loopCtx.rollbackCheckpoint("checkout-checkpoint");

        await loopCtx.step<string>({
          name: "reserve-inventory",
          run: () => reserveInventory(message.body.orderId),
          rollback: async (_rollbackCtx, id) => {
            await releaseInventory(id as string);
          },
        });

        await loopCtx.step<string>({
          name: "charge-card",
          run: () => chargeCard(message.body.orderId),
          rollback: async (_rollbackCtx, chargeId) => {
            await refundCharge(chargeId as string);
          },
        });

        await loopCtx.step("confirm", async () => {
          loopCtx.state.status = "confirmed";
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function reserveInventory(orderId: string): Promise<string> {
  const res = await fetch("https://api.example.com/inventory/reserve", {
    method: "POST",
    body: JSON.stringify({ orderId }),
  });
  return ((await res.json()) as { reservationId: string }).reservationId;
}

async function releaseInventory(reservationId: string): Promise<void> {
  await fetch(`https://api.example.com/inventory/${reservationId}/release`, {
    method: "POST",
  });
}

async function chargeCard(orderId: string): Promise<string> {
  const res = await fetch("https://api.stripe.com/v1/charges", {
    method: "POST",
    headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
    body: JSON.stringify({ orderId }),
  });
  return ((await res.json()) as { id: string }).id;
}

async function refundCharge(chargeId: string): Promise<void> {
  await fetch("https://api.stripe.com/v1/refunds", {
    method: "POST",
    headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
    body: JSON.stringify({ charge: chargeId }),
  });
}

export const registry = setup({ use: { checkoutActor } });

Patterns

Store workflow progress in state + broadcast

Store progress in state so replay and recovery always restore it. Broadcast state changes so clients can render progress in realtime.

Cron (queue-driven)

Rivet scheduling triggers actions. For cron-like workflows, use a small scheduled action as a bridge that enqueues work, then process that work in the workflow loop.

import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

function nextMinute(timestamp: number): number {
  const minuteMs = 60_000;
  return Math.floor(timestamp / minuteMs) * minuteMs + minuteMs;
}

export const cronActor = actor({
  state: {
    runs: 0,
    lastRunAt: null as number | null,
  },
  queues: {
    "cron-tick": queue<{ scheduledAt: number }>(),
  },
  onCreate: async (c) => {
    const firstTickAt = nextMinute(Date.now());
    await c.schedule.at(firstTickAt, "enqueueCronTick", firstTickAt);
  },
  actions: {
    enqueueCronTick: async (c, scheduledAt: number) => {
      await c.queue.send("cron-tick", { scheduledAt });

      const nextTickAt = nextMinute(scheduledAt + 1);
      await c.schedule.at(nextTickAt, "enqueueCronTick", nextTickAt);
    },
    getState: (c) => c.state,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("cron-loop", async (loopCtx) => {
        const message = await loopCtx.queue.next("wait-cron-tick");

        await loopCtx.step("run-cron-job", async () => {
          loopCtx.state.runs += 1;
          loopCtx.state.lastRunAt = message.body.scheduledAt;
        });

      });
  }),
});

export const registry = setup({ use: { cronActor } });

These are common workflow shapes used in production systems.

Queue-driven worker

Use this when external systems enqueue work and the actor should process each item durably.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type Job = { id: string; amount: number };

export const queueWorkerActor = actor({
  state: {
    processed: 0,
    totalAmount: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("worker-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-job", {
          timeout: 30_000,
        });

        if (!message) return;
        const job = message.body as Job;

        await loopCtx.step("process-job", async () => {
          loopCtx.state.processed += 1;
          loopCtx.state.totalAmount += job.amount;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { queueWorkerActor } });

Setup & teardown

Use this when you need one-time initialization before a long-lived loop, plus cleanup when the actor stops sleeping or is destroyed.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

function openResource(): string {
  return "connected";
}

function closeResource(_resource: string): void {}

export const setupRunTeardownActor = actor({
  vars: {
    resource: null as string | null,
  },
  state: {
    initialized: false,
    ticks: 0,
  },
  onWake: (c) => {
    c.vars.resource = openResource();
  },
  onSleep: (c) => {
    if (!c.vars.resource) return;
    closeResource(c.vars.resource);
    c.vars.resource = null;
  },
  run: workflow(async (ctx) => {
    await ctx.step("setup", async () => {
      if (!ctx.vars.resource) ctx.vars.resource = openResource();
      ctx.state.initialized = true;
    });

    await ctx.loop("main-loop", async (loopCtx) => {
        await loopCtx.sleep("tick", 1_000);
        await loopCtx.step("tick-step", async () => {
          loopCtx.state.ticks += 1;
        });
      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { setupRunTeardownActor } });

Human approval gate

Use this when an operation must pause for a user or system decision before continuing.

import { actor, queue, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const approvalGateActor = actor({
  state: { status: "pending" as string },
  queues: {
    approval: queue<{ approved: boolean }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.step("validate-order", async () => {
      await validateOrder("order-123");
      ctx.state.status = "awaiting_approval";
    });

    const decision = await ctx.queue.next("wait-approval");

    if (decision.body.approved) {
      await ctx.step("fulfill-order", async () => {
        await fulfillOrder("order-123");
        ctx.state.status = "fulfilled";
      });
    } else {
      await ctx.step("cancel-order", async () => {
        await cancelOrder("order-123");
        ctx.state.status = "cancelled";
      });
    }
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function validateOrder(orderId: string): Promise<void> {
  const res = await fetch(
    `https://api.example.com/orders/${orderId}/validate`,
    { method: "POST" },
  );
  if (!res.ok) throw new Error("Order validation failed");
}

async function fulfillOrder(orderId: string): Promise<void> {
  await fetch(`https://api.example.com/orders/${orderId}/fulfill`, {
    method: "POST",
  });
}

async function cancelOrder(orderId: string): Promise<void> {
  await fetch(`https://api.example.com/orders/${orderId}/cancel`, {
    method: "POST",
  });
}

export const registry = setup({ use: { approvalGateActor } });

Fan-out / fan-in (join)

Use this when independent work items can run in parallel and you need a single merged result.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const fanInOutActor = actor({
  state: {
    total: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("join-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-refresh", {
          timeout: 30_000,
        });

        if (!message) return;

        const joined = await loopCtx.join("parallel-work", {
          users: {
            run: async (branchCtx) =>
              await branchCtx.step("fetch-users", () => fetchCount("/users")),
          },
          orders: {
            run: async (branchCtx) =>
              await branchCtx.step("fetch-orders", () => fetchCount("/orders")),
          },
          invoices: {
            run: async (branchCtx) =>
              await branchCtx.step("fetch-invoices", () => fetchCount("/invoices")),
          },
        });

        await loopCtx.step("merge-results", async () => {
          loopCtx.state.total =
            joined.users + joined.orders + joined.invoices;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function fetchCount(path: string): Promise<number> {
  const res = await fetch(`https://api.example.com${path}`);
  if (!res.ok) throw new Error(`fetch ${path} failed: ${res.status}`);
  return ((await res.json()) as { count: number }).count;
}

export const registry = setup({ use: { fanInOutActor } });

Batch drainer

Use this when throughput matters and handling one message at a time is too expensive.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type MetricMessage = { value: number };

export const batchDrainerActor = actor({
  state: {
    pending: [] as number[],
    flushedBatches: 0,
    lastBatchTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("drain-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-metric", {
          timeout: 5_000,
        });

        if (message) {
          const metric = message.body as MetricMessage;
          await loopCtx.step("buffer-message", async () => {
            loopCtx.state.pending.push(metric.value);
          });
        }

        if (loopCtx.state.pending.length < 5) return;

        await loopCtx.step("flush-batch", async () => flushBatch(loopCtx));

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

function flushBatch(ctx: WorkflowLoopContextOf<typeof batchDrainerActor>): void {
  const total = ctx.state.pending.reduce(
    (sum: number, value: number) => sum + value,
    0,
  );
  ctx.state.lastBatchTotal = total;
  ctx.state.flushedBatches += 1;
  ctx.state.pending = [];
}

export const registry = setup({ use: { batchDrainerActor } });

Coordinator -> worker RPC

Use this when one actor orchestrates work by calling actions on other actors.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type TaskMessage = {
  taskId: string;
  workerId: string;
  value: number;
};

export const workerActor = actor({
  actions: {
    runTask: async (_c, value: number) => value * 2,
  },
});

export const coordinatorActor = actor({
  state: {
    lastTaskId: null as string | null,
    lastResult: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("orchestrator-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-task", {
          timeout: 30_000,
        });

        if (!message) return;
        const task = message.body as TaskMessage;

        const result = await loopCtx.step("dispatch-rpc", async () =>
          dispatchTask(loopCtx, task),
        );

        await loopCtx.step("record-result", async () => {
          loopCtx.state.lastTaskId = task.taskId;
          loopCtx.state.lastResult = result as number;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function dispatchTask(
  ctx: WorkflowLoopContextOf<typeof coordinatorActor>,
  task: TaskMessage,
): Promise<number> {
  const client = ctx.client();
  const worker = client.workerActor.getOrCreate([task.workerId]);
  return await worker.runTask(task.value);
}

export const registry = setup({ use: { coordinatorActor, workerActor } });

Request/response over queue (async RPC)

Use this when you want decoupled actor-to-actor communication with durable waits and explicit completion.

Scatter-gather across actors

Use this when multiple actors can process independent parts of a request in parallel, then return a merged response.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type ScatterMessage = { input: number };

export const shardActor = actor({
  actions: {
    compute: async (_c, input: number) => input * 10,
  },
});

export const scatterGatherActor = actor({
  state: {
    lastSum: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("scatter-gather-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-scatter", {
          timeout: 30_000,
        });

        if (!message) return;
        const scatter = message.body as ScatterMessage;

        const gathered = await loopCtx.join("gather", {
          shardA: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-a", async () =>
                callShard(joinCtx, "a", scatter.input),
              ),
          },
          shardB: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-b", async () =>
                callShard(joinCtx, "b", scatter.input),
              ),
          },
          shardC: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-c", async () =>
                callShard(joinCtx, "c", scatter.input),
              ),
          },
        });

        await loopCtx.step("aggregate", async () => {
          loopCtx.state.lastSum = gathered.shardA + gathered.shardB + gathered.shardC;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function callShard(
  ctx: WorkflowLoopContextOf<typeof scatterGatherActor>,
  shardId: "a" | "b" | "c",
  input: number,
): Promise<number> {
  const client = ctx.client();
  const handle = client.shardActor.getOrCreate([shardId]);
  return await handle.compute(input);
}

export const registry = setup({ use: { scatterGatherActor, shardActor } });

Timeout + fallback actor

Use this when a primary actor call might be slow or unavailable and you need a deterministic fallback path.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const primaryServiceActor = actor({
  actions: {
    fetchValue: async () => {
      await new Promise((resolve) => setTimeout(resolve, 500));
      return "primary";
    },
  },
});

export const fallbackServiceActor = actor({
  actions: {
    fetchValue: async () => "fallback",
  },
});

export const timeoutFallbackActor = actor({
  state: {
    lastSource: "none" as "none" | "primary" | "fallback",
    lastValue: "",
  },
  run: workflow(async (ctx) => {
    await ctx.loop("timeout-loop", async (loopCtx) => {
        await loopCtx.queue.nextBatch("wait-request", {
          timeout: 30_000,
        });

        const winner = await loopCtx.race("primary-vs-timeout", [
          {
            name: "primary",
            run: async (raceCtx) =>
              await raceCtx.step("call-primary", async () =>
                callPrimaryValue(raceCtx),
              ),
          },
          {
            name: "timeout",
            run: async (raceCtx) => {
              await raceCtx.sleep("primary-timeout", 200);
              return "timeout";
            },
          },
        ]);

        let value = winner.value as string;
        let source: "primary" | "fallback" = "primary";

        if (winner.winner === "timeout") {
          value = (await loopCtx.step("fallback-call", async () =>
            callFallbackValue(loopCtx),
          )) as string;
          source = "fallback";
        }

        await loopCtx.step("record-choice", async () => {
          loopCtx.state.lastSource = source;
          loopCtx.state.lastValue = value;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function callPrimaryValue(
  ctx: WorkflowLoopContextOf<typeof timeoutFallbackActor>,
): Promise<string> {
  const client = ctx.client();
  const primary = client.primaryServiceActor.getOrCreate(["main"]);
  return await primary.fetchValue();
}

async function callFallbackValue(
  ctx: WorkflowLoopContextOf<typeof timeoutFallbackActor>,
): Promise<string> {
  const client = ctx.client();
  const fallback = client.fallbackServiceActor.getOrCreate(["main"]);
  return await fallback.fetchValue();
}

export const registry = setup({
  use: { timeoutFallbackActor, primaryServiceActor, fallbackServiceActor },
});

Cross-actor saga (compensating actions)

Use this when a workflow spans multiple actors and each side effect may need compensation.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type CheckoutMessage = {
  orderId: string;
  amount: number;
};

export const inventoryActor = actor({
  actions: {
    reserve: async (_c, orderId: string) => `reserve-${orderId}`,
    release: async (_c, reservationId: string) => reservationId,
  },
});

export const billingActor = actor({
  actions: {
    charge: async (_c, amount: number) => `charge-${amount}`,
    refund: async (_c, chargeId: string) => chargeId,
  },
});

export const checkoutSagaActor = actor({
  state: {
    completedOrders: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("checkout-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-order", {
          timeout: 30_000,
        });

        if (!message) return;
        const checkout = message.body as CheckoutMessage;

        await loopCtx.rollbackCheckpoint("checkout-saga");

        await loopCtx.step({
          name: "reserve-inventory",
          run: async () => reserveInventoryForCheckout(loopCtx, checkout.orderId),
          rollback: async (_rollbackCtx, output) => {
            await releaseInventoryForCheckout(loopCtx, output as string);
          },
        });

        await loopCtx.step({
          name: "charge-card",
          run: async () => chargeCheckout(loopCtx, checkout.amount),
          rollback: async (_rollbackCtx, output) => {
            await refundCheckout(loopCtx, output as string);
          },
        });

        await loopCtx.step("mark-complete", async () => markOrderComplete(loopCtx));

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function reserveInventoryForCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  orderId: string,
): Promise<string> {
  const client = ctx.client();
  const inventory = client.inventoryActor.getOrCreate(["main"]);
  return await inventory.reserve(orderId);
}

async function releaseInventoryForCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  reservationId: string,
): Promise<void> {
  const client = ctx.client();
  const inventory = client.inventoryActor.getOrCreate(["main"]);
  await inventory.release(reservationId);
}

async function chargeCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  amount: number,
): Promise<string> {
  const client = ctx.client();
  const billing = client.billingActor.getOrCreate(["main"]);
  return await billing.charge(amount);
}

async function refundCheckout(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
  chargeId: string,
): Promise<void> {
  const client = ctx.client();
  const billing = client.billingActor.getOrCreate(["main"]);
  await billing.refund(chargeId);
}

function markOrderComplete(
  ctx: WorkflowLoopContextOf<typeof checkoutSagaActor>,
): void {
  ctx.state.completedOrders += 1;
}

export const registry = setup({
  use: { checkoutSagaActor, inventoryActor, billingActor },
});

Signal-driven control loop

Use this when workflow progress should be triggered by commands/events instead of fixed polling intervals.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type ControlSignal = { kind: "pause" | "resume" | "stop" };

export const controlLoopActor = actor({
  state: {
    mode: "running" as "running" | "paused" | "stopped",
    handledSignals: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("control-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-signal", {
          timeout: 30_000,
        });

        if (!message) return;
        const signal = message.body as ControlSignal;

        await loopCtx.step("apply-signal", async () =>
          applyControlSignal(loopCtx, signal.kind),
        );

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

function applyControlSignal(
  ctx: WorkflowLoopContextOf<typeof controlLoopActor>,
  kind: ControlSignal["kind"],
): void {
  ctx.state.handledSignals += 1;
  if (kind === "pause") ctx.state.mode = "paused";
  if (kind === "resume") ctx.state.mode = "running";
  if (kind === "stop") ctx.state.mode = "stopped";
}

export const registry = setup({ use: { controlLoopActor } });

Poll + backoff loop

Use this when an external dependency has variable availability and retries should slow down after failures.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

async function pollExternal(attempt: number): Promise<boolean> {
  return attempt % 3 === 0;
}

export const pollBackoffActor = actor({
  state: {
    attempts: 0,
    backoffMs: 100,
    status: "unknown" as "unknown" | "healthy" | "retrying",
  },
  run: workflow(async (ctx) => {
    await ctx.loop("poll-loop", async (loopCtx) => {
        const success = await loopCtx.step("poll-target", async () => {
          loopCtx.state.attempts += 1;
          return pollExternal(loopCtx.state.attempts);
        });

        if (success) {
          await loopCtx.step("reset-backoff", async () => {
            loopCtx.state.status = "healthy";
            loopCtx.state.backoffMs = 100;
          });
          await loopCtx.sleep("healthy-interval", 1_000);
          return;
        }

        await loopCtx.step("grow-backoff", async () => {
          loopCtx.state.status = "retrying";
          loopCtx.state.backoffMs = Math.min(loopCtx.state.backoffMs * 2, 5_000);
        });

        await loopCtx.sleep("retry-delay", loopCtx.state.backoffMs);
      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { pollBackoffActor } });

Child worker orchestration

Use this when one workflow coordinates many child workers (actors or worker workflows) and manages their lifecycle.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type BatchMessage = { payload: number };

export const childWorkerActor = actor({
  actions: {
    process: async (_c, payload: number) => payload * 3,
  },
});

export const orchestratorActor = actor({
  state: {
    lastTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.step("start-children", async () => startChildren(ctx));

    await ctx.loop("orchestrate-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-batch", {
          timeout: 30_000,
        });

        if (!message) return;
        const batch = message.body as BatchMessage;

        const results = await loopCtx.join("collect-updates", {
          a: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-a", async () =>
                runChildWorker(joinCtx, "child-a", batch.payload),
              ),
          },
          b: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-b", async () =>
                runChildWorker(joinCtx, "child-b", batch.payload),
              ),
          },
          c: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-c", async () =>
                runChildWorker(joinCtx, "child-c", batch.payload),
              ),
          },
        });

        await loopCtx.step("reconcile", async () => {
          loopCtx.state.lastTotal = results.a + results.b + results.c;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function startChildren(
  ctx: WorkflowContextOf<typeof orchestratorActor>,
): Promise<void> {
  const client = ctx.client();
  await client.childWorkerActor.getOrCreate(["child-a"]).process(0);
  await client.childWorkerActor.getOrCreate(["child-b"]).process(0);
  await client.childWorkerActor.getOrCreate(["child-c"]).process(0);
}

async function runChildWorker(
  ctx: WorkflowBranchContextOf<typeof orchestratorActor>,
  workerId: "child-a" | "child-b" | "child-c",
  payload: number,
): Promise<number> {
  const client = ctx.client();
  return await client.childWorkerActor.getOrCreate([workerId]).process(payload);
}

export const registry = setup({ use: { orchestratorActor, childWorkerActor } });

Bounded drain + concurrency cap

Use this when inbound work can spike and you need predictable per-iteration limits.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type WorkMessage = { id: string; value: number };

const MAX_PER_ITERATION = 10;
const CONCURRENCY_LIMIT = 3;

async function processWork(value: number): Promise<number> {
  return value * 2;
}

async function runWithLimit<T>(
  limit: number,
  items: T[],
  fn: (item: T) => Promise<void>,
): Promise<void> {
  let nextIndex = 0;
  const workers = Array.from({ length: limit }, async () => {
    while (nextIndex < items.length) {
      const current = items[nextIndex];
      nextIndex += 1;
      await fn(current);
    }
  });
  await Promise.all(workers);
}

export const boundedDrainActor = actor({
  state: {
    processed: 0,
    lastWindowSize: 0,
    lastWindowTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("bounded-drain-loop", async (loopCtx) => {
        const window: WorkMessage[] = [];

        for (let i = 0; i < MAX_PER_ITERATION; i += 1) {
          const [message] = await loopCtx.queue.nextBatch("wait-work", {
            timeout: i === 0 ? 30_000 : 10,
          });
          if (!message) break;
          window.push(message.body as WorkMessage);
        }

        if (window.length === 0) return;

        await loopCtx.step("process-window", async () =>
          processWindow(loopCtx, window),
        );

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

async function processWindow(
  ctx: WorkflowLoopContextOf<typeof boundedDrainActor>,
  window: WorkMessage[],
): Promise<void> {
  let windowTotal = 0;
  await runWithLimit(CONCURRENCY_LIMIT, window, async (work) => {
    const result = await processWork(work.value);
    windowTotal += result;
  });

  ctx.state.processed += window.length;
  ctx.state.lastWindowSize = window.length;
  ctx.state.lastWindowTotal = windowTotal;
}

export const registry = setup({ use: { boundedDrainActor } });

Versioned workflow evolution

Use this when workflow structure changes across deployments and old histories must still replay.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

export const versionedWorkflowActor = actor({
  state: {
    runs: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.step("validate-v2", async () => {
      ctx.state.runs += 1;
    });

    await ctx.removed("validate-v1", "step");

    await ctx.loop("main-loop-v2", async (loopCtx) => {
        await loopCtx.sleep("idle", 500);
        await loopCtx.step("heartbeat-v2", async () => {
          loopCtx.state.runs += 1;
        });
      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { versionedWorkflowActor } });

Checkpoint-friendly loop design

Use this when you need reliable replay and resume semantics across crashes and restarts.

import { actor, setup } from "rivetkit";
import { type WorkflowContextOf, type WorkflowLoopContextOf, type WorkflowBranchContextOf, workflow } from "rivetkit/workflow";

type PaymentMessage = { id: string; amount: number };

export const checkpointFriendlyActor = actor({
  state: {
    appliedCount: 0,
    totalAmount: 0,
    lastPaymentId: null as string | null,
  },
  run: workflow(async (ctx) => {
    await ctx.loop("payment-loop", async (loopCtx) => {
        const [message] = await loopCtx.queue.nextBatch("wait-payment", {
          timeout: 30_000,
        });

        if (!message) return;
        const payment = message.body as PaymentMessage;

        await loopCtx.rollbackCheckpoint("apply-payment-checkpoint");

        const plan = (await loopCtx.step("build-plan", async () =>
          buildPaymentPlan(payment),
        )) as { paymentId: string; amount: number };

        await loopCtx.step("apply-side-effects", async () => {
          loopCtx.state.appliedCount += 1;
          loopCtx.state.totalAmount += plan.amount;
          loopCtx.state.lastPaymentId = plan.paymentId;
        });

      });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

function buildPaymentPlan(payment: PaymentMessage): {
  paymentId: string;
  amount: number;
} {
  return {
    paymentId: payment.id,
    amount: payment.amount,
  };
}

export const registry = setup({ use: { checkpointFriendlyActor } });

Migrations

  • Keep workflow entry names stable once deployed.
  • If an old entry was removed or renamed, call ctx.removed(name, originalType).
  • This keeps replay compatible across deployments.

Step-only access to actor APIs

state, vars, db, client(), and connection/event APIs are only valid inside ctx.step(...) callbacks.

Use non-step workflow code for orchestration only: queue waits, sleeps, loops, joins, races, and rollback boundaries. Keep actor-local side effects in steps.

Debugging

  • GET /inspector/workflow-history returns workflow history status for an actor.
  • Response includes isWorkflowEnabled and history.
  • In non-dev mode, inspector endpoints require authorization.

Recommendations

  • Prefer queue-driven loops for long-lived workflows.
  • Structure long-lived workflows with setup and teardown around the main loop.
  • Keep actor state changes and side effects inside steps.
  • Store workflow progress in state and broadcast updates as progress changes.
  • Use timeouts and rollback for external side effects.