Workflow Agents
Build durable multi-step agents with workflow(), useWorkflow(), and workflows.signal()
workflow() is Gencow's durable execution primitive for long-running AI jobs, human approval checkpoints, and multi-step work that must resume safely after retries or restarts.
When to use workflow()
Use workflow() when:
- the job can exceed normal mutation timeout
- the work must survive crash, sleep, or restart
- a human approval or external event should resume the flow later
- some steps are expensive and should not rerun after partial success
- you want operational visibility in React, CLI, and Dashboard Jobs
Use a normal procedure.mutation when the whole job is short and synchronous.
Scaffold the starter
gencow add AgentThis adds:
gencow/agent.tswith aworkflow("agents.run", ...)startergencow/ai.tswhen AI is not installed yetimport "./agent";ingencow/index.ts
import "./agent";is required. Without it, the workflow will not be registered for codegen/runtime.workflowitself must be imported from@gencow/core. Do not creategencow/workflow.tsor import./workflowinsidegencow/.
Mental model
workflow() is not a background thread. It is a resumable handler that re-enters from the top while completed checkpoints are memoized.
start mutation
-> _gencow_workflows row created
-> scheduler queues resume action
-> runner claims the row
-> handler executes
-> completed wf.step() outputs are memoized in _gencow_workflow_steps
-> retry/sleep/wait resumes the same workflow laterExample
import { workflow, v } from "@gencow/core";
import { ai } from "./ai";
export const runAgent = workflow("agents.run", {
args: {
goal: v.string(),
requireApproval: v.optional(v.boolean()),
},
handler: async (wf, args) => {
const plan = await wf.step("plan", async () => {
const result = await ai.chat({
system: "Return a short actionable plan.",
messages: [{ role: "user", content: args.goal }],
});
return result.text;
});
const [research, critique] = await wf.parallel([
async () => {
const result = await ai.chat({
system: "Gather supporting facts and constraints.",
messages: [{ role: "user", content: `Goal: ${args.goal}\n\nPlan:\n${plan}` }],
});
return result.text;
},
async () => {
const result = await ai.chat({
system: "Find gaps and risks.",
messages: [{ role: "user", content: `Goal: ${args.goal}\n\nPlan:\n${plan}` }],
});
return result.text;
},
] as const);
if (args.requireApproval) {
await wf.waitForEvent<{ approved?: boolean }>("approval", "24h");
}
return wf.step("finalize", async () => {
const result = await ai.chat({
system: "Synthesize research and critique.",
messages: [{
role: "user",
content: `Goal: ${args.goal}\n\nPlan:\n${plan}\n\nResearch:\n${research}\n\nCritique:\n${critique}`,
}],
});
return result.text;
});
},
});Runtime primitives
wf.step(name, fn)— memoized checkpointwf.parallel([...])— branch fan-out with partial progress retentionwf.waitForEvent(name, timeout?)— wait for an external signal with optional timeoutwf.sleep(duration)— pause and resume without consuming retries
Authoring rules
- Keep step names stable across retries and deploys.
wf.step("plan")is safe;wf.step(\plan-${Date.now()}`)` is not. - Import
workflowfrom@gencow/core, never from a localgencow/workflow.tsfork. - Return only JSON-serializable values from args, step outputs, signal payloads, and final results.
- Put side effects inside named steps and make them idempotent when possible.
- Use
wf.waitForEvent(..., timeout)when the business process needs a deadline. - Keep normal synchronous work in
procedure.mutation; useworkflow()when resume semantics matter.
Track from React
import { api } from "@/gencow/api";
import { useMutation, useWorkflow } from "@gencow/react";
const { mutate: start } = useMutation(api.agents.run);
const { mutate: signal } = useMutation(api.workflows.signal);
const run = await start({ goal: "Launch brief", requireApproval: true });
const state = useWorkflow(api.workflows.get, run.id);
await signal({
id: run.id,
event: "approval",
payload: { approved: true, note: "Looks good" },
});useWorkflow() prefers exact-id realtime updates and falls back to safe polling while the run is active.
Useful fields on the workflow snapshot:
status— canonical engine status (pending,running,completed,failed)derivedStatus— display-oriented state (queued,waiting,sleeping,running,completed,failed)currentStep— current checkpoint namesteps— step timeline with output and error snapshots
State model
Canonical workflow status:
pendingrunningcompletedfailed
Display state:
pending + sleep#...->sleepingpending + wait:...->waiting- other
pending->queued running/completed/failed-> unchanged
This display-oriented value is exposed as derivedStatus.
Approval and external events
Use wf.waitForEvent() for human approval, webhook continuation, or any out-of-band signal.
const approval = await wf.waitForEvent<{ approved: boolean; note?: string }>(
"approval",
"30m"
);Then resume it with:
await signal({
id: workflowId,
event: "approval",
payload: { approved: true, note: "Ship it" },
});If the timeout elapses before the signal arrives, the workflow fails terminally without consuming retry backoff.
Ownership and system tables
Workflow runtime rows live in platform-owned tables:
_gencow_workflows_gencow_workflow_steps_gencow_workflow_events
Treat these as internal runtime state, not app schema:
- do not declare them in
schema.ts - do not write migrations that
ALTER,DROP, orTRUNCATEthem - keep
drizzle.config.tsfiltering them out withtablesFilter: ["!_system_*", "!_gencow_*"]
For app UI and dashboards:
- prefer
useWorkflow(api.workflows.get, run.id)for exact-id inspection - if you build a custom workflow list query with
ctx.unsafeDb, require auth and filter byuser_id - never expose a public unfiltered list of workflow rows
Observe and operate
You can inspect workflow runs in:
- React via
useWorkflow(api.workflows.get, run.id) - CLI via
gencow jobs - Cloud Dashboard Jobs tab
Failed runs can be retried, and completed or failed runs can be replayed from the Jobs UI/API.
Pending or running runs can be cancelled as a best-effort stop, and terminal runs can be deleted. Stale runs can be inspected first with gencow jobs workflow cleanup.
Operational semantics:
Retrycontinues the same workflow run and preserves completed checkpoints.Replaycreates a fresh workflow run from the original args and starts from scratch.Cancelmarks the run terminal and removes queued resume jobs when possible, but it does not preempt an already executing handler.Deleteremoves a terminal run and its stored step/event snapshots.
BaaS vs local behavior
- In BaaS, workflow resume uses the Platform's DB-backed scheduler and survives app sleep or restart.
- In local dev, the auto-created scheduler is still in-memory, so restart durability is best-effort.
- If you want to verify crash/sleep recovery semantics, treat the cloud/BaaS path as the source of truth.
Common mistakes
- Forgetting
import "./agent";ingencow/index.ts - Using unstable step names across resumes
- Returning non-JSON-serializable values from steps
- Putting important side effects outside
wf.step() - Assuming local dev restart behavior is identical to BaaS durability
- Leaving
wf.waitForEvent()unbounded when the process needs a deadline