Orchestration
Workflow Automation
Orchestrate multi-step agent tasks with durable workflows.
- Durable workflows that survive crashes and restarts
- Multi-step orchestration with sessions, file operations, and process execution
- Error handling and retry via
ctx.step()for each operation - Agent chaining where output of one session feeds into the next
Basic workflow
Use the actor workflow() primitive to orchestrate a multi-step agent task. Each step is durable and will resume from where it left off after a restart.
Session creation and prompting must happen within the same step because sessions are ephemeral and won’t survive a replay.
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, setup, workflow } from "rivetkit";
const automator = actor({
workflows: {
fixBug: workflow<{ repo: string; issue: string }>(),
},
run: async (c) => {
for await (const message of c.workflow.iter("fixBug")) {
const { repo, issue } = message.body;
const agentHandle = c.actors.vm.getOrCreate([`fix-${issue}`]);
// Step 1: Clone the repo
await c.step("clone-repo", async () => {
return agentHandle.exec(`git clone ${repo} /home/user/repo`);
});
// Step 2: Agent fixes the bug (session lives within this step)
await c.step("fix-bug", async () => {
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Fix the bug described in issue: ${issue}`,
);
await agentHandle.closeSession(session.sessionId);
});
// Step 3: Run tests
const tests = await c.step("run-tests", async () => {
return agentHandle.exec("cd /home/user/repo && npm test");
});
console.log("Tests exit code:", tests.exitCode);
await message.complete();
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { automator, vm } });
registry.start();
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.automator.getOrCreate(["main"]);
// Trigger the workflow
await handle.send("fixBug", {
repo: "https://github.com/example/repo.git",
issue: "Fix the login redirect bug",
});
Agent chaining
Output of one agent session feeds into the next. Each session is created and completed within its own step.
import { agentOs } from "rivetkit/agent-os";
import common from "@rivet-dev/agent-os-common";
import pi from "@rivet-dev/agent-os-pi";
import { actor, setup, workflow } from "rivetkit";
const pipeline = actor({
workflows: {
codeReview: workflow<{ filePath: string }>(),
},
run: async (c) => {
for await (const message of c.workflow.iter("codeReview")) {
const agentHandle = c.actors.vm.getOrCreate([`review-${Date.now()}`]);
// Step 1: Agent reviews code and writes findings to a file
await c.step("review", async () => {
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Review the code at ${message.body.filePath} and write your findings to /home/user/review.md`,
);
await agentHandle.closeSession(session.sessionId);
});
// Step 2: Read the review from the filesystem
const review = await c.step("read-review", async () => {
const content = await agentHandle.readFile("/home/user/review.md");
return new TextDecoder().decode(content);
});
// Step 3: Second session applies fixes based on the review
await c.step("fix", async () => {
const session = await agentHandle.createSession("pi", {
env: { ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY! },
});
await agentHandle.sendPrompt(
session.sessionId,
`Apply the following review feedback:\n\n${review}`,
);
await agentHandle.closeSession(session.sessionId);
});
await message.complete();
}
},
});
const vm = agentOs({
options: { software: [common, pi] },
});
export const registry = setup({ use: { pipeline, vm } });
registry.start();
import { createClient } from "rivetkit/client";
import type { registry } from "./server";
const client = createClient<typeof registry>("http://localhost:6420");
const handle = client.pipeline.getOrCreate(["main"]);
await handle.send("codeReview", { filePath: "/home/user/src/auth.ts" });
Recommendations
- Create and close sessions within the same step. Sessions are ephemeral and won’t exist after a workflow replays.
- Pass data between steps via the filesystem or step return values, not session state.
- Keep step names stable across code changes. Renaming a step breaks replay for in-progress workflows.
- Use separate actors for the workflow orchestrator and the agentOS VM.
- See Workflows for the full workflow API reference including timers, joins, and races.