Workflow Agents

Build durable multi-step agents with workflow(), useWorkflow(), and workflows.signal()

workflow() is Gencow's durable execution primitive for long-running AI jobs, human approval checkpoints, and multi-step work that must resume safely after retries or restarts.

When to use workflow()

Use workflow() when:

  • the job can exceed normal mutation timeout
  • the work must survive crash, sleep, or restart
  • a human approval or external event should resume the flow later
  • some steps are expensive and should not rerun after partial success
  • you want operational visibility in React, CLI, and Dashboard Jobs

Use a normal procedure.mutation when the whole job is short and synchronous.

Scaffold the starter

gencow add Agent

This adds:

  • gencow/agent.ts with a workflow("agents.run", ...) starter
  • gencow/ai.ts when AI is not installed yet
  • import "./agent"; in gencow/index.ts

import "./agent"; is required. Without it, the workflow will not be registered for codegen/runtime. workflow itself must be imported from @gencow/core. Do not create gencow/workflow.ts or import ./workflow inside gencow/.

Mental model

workflow() is not a background thread. It is a resumable handler that re-enters from the top while completed checkpoints are memoized.

start mutation
  -> _gencow_workflows row created
  -> scheduler queues resume action
  -> runner claims the row
  -> handler executes
  -> completed wf.step() outputs are memoized in _gencow_workflow_steps
  -> retry/sleep/wait resumes the same workflow later

Example

import { workflow, v } from "@gencow/core";
import { ai } from "./ai";

export const runAgent = workflow("agents.run", {
    args: {
        goal: v.string(),
        requireApproval: v.optional(v.boolean()),
    },
    handler: async (wf, args) => {
        const plan = await wf.step("plan", async () => {
            const result = await ai.chat({
                system: "Return a short actionable plan.",
                messages: [{ role: "user", content: args.goal }],
            });
            return result.text;
        });

        const [research, critique] = await wf.parallel([
            async () => {
                const result = await ai.chat({
                    system: "Gather supporting facts and constraints.",
                    messages: [{ role: "user", content: `Goal: ${args.goal}\n\nPlan:\n${plan}` }],
                });
                return result.text;
            },
            async () => {
                const result = await ai.chat({
                    system: "Find gaps and risks.",
                    messages: [{ role: "user", content: `Goal: ${args.goal}\n\nPlan:\n${plan}` }],
                });
                return result.text;
            },
        ] as const);

        if (args.requireApproval) {
            await wf.waitForEvent<{ approved?: boolean }>("approval", "24h");
        }

        return wf.step("finalize", async () => {
            const result = await ai.chat({
                system: "Synthesize research and critique.",
                messages: [{
                    role: "user",
                    content: `Goal: ${args.goal}\n\nPlan:\n${plan}\n\nResearch:\n${research}\n\nCritique:\n${critique}`,
                }],
            });
            return result.text;
        });
    },
});

Runtime primitives

  • wf.step(name, fn) — memoized checkpoint
  • wf.parallel([...]) — branch fan-out with partial progress retention
  • wf.waitForEvent(name, timeout?) — wait for an external signal with optional timeout
  • wf.sleep(duration) — pause and resume without consuming retries

Authoring rules

  • Keep step names stable across retries and deploys. wf.step("plan") is safe; wf.step(\plan-${Date.now()}`)` is not.
  • Import workflow from @gencow/core, never from a local gencow/workflow.ts fork.
  • Return only JSON-serializable values from args, step outputs, signal payloads, and final results.
  • Put side effects inside named steps and make them idempotent when possible.
  • Use wf.waitForEvent(..., timeout) when the business process needs a deadline.
  • Keep normal synchronous work in procedure.mutation; use workflow() when resume semantics matter.

Track from React

import { api } from "@/gencow/api";
import { useMutation, useWorkflow } from "@gencow/react";

const { mutate: start } = useMutation(api.agents.run);
const { mutate: signal } = useMutation(api.workflows.signal);

const run = await start({ goal: "Launch brief", requireApproval: true });
const state = useWorkflow(api.workflows.get, run.id);

await signal({
    id: run.id,
    event: "approval",
    payload: { approved: true, note: "Looks good" },
});

useWorkflow() prefers exact-id realtime updates and falls back to safe polling while the run is active.

Useful fields on the workflow snapshot:

  • status — canonical engine status (pending, running, completed, failed)
  • derivedStatus — display-oriented state (queued, waiting, sleeping, running, completed, failed)
  • currentStep — current checkpoint name
  • steps — step timeline with output and error snapshots

State model

Canonical workflow status:

  • pending
  • running
  • completed
  • failed

Display state:

  • pending + sleep#... -> sleeping
  • pending + wait:... -> waiting
  • other pending -> queued
  • running/completed/failed -> unchanged

This display-oriented value is exposed as derivedStatus.

Approval and external events

Use wf.waitForEvent() for human approval, webhook continuation, or any out-of-band signal.

const approval = await wf.waitForEvent<{ approved: boolean; note?: string }>(
    "approval",
    "30m"
);

Then resume it with:

await signal({
    id: workflowId,
    event: "approval",
    payload: { approved: true, note: "Ship it" },
});

If the timeout elapses before the signal arrives, the workflow fails terminally without consuming retry backoff.

Ownership and system tables

Workflow runtime rows live in platform-owned tables:

  • _gencow_workflows
  • _gencow_workflow_steps
  • _gencow_workflow_events

Treat these as internal runtime state, not app schema:

  • do not declare them in schema.ts
  • do not write migrations that ALTER, DROP, or TRUNCATE them
  • keep drizzle.config.ts filtering them out with tablesFilter: ["!_system_*", "!_gencow_*"]

For app UI and dashboards:

  • prefer useWorkflow(api.workflows.get, run.id) for exact-id inspection
  • if you build a custom workflow list query with ctx.unsafeDb, require auth and filter by user_id
  • never expose a public unfiltered list of workflow rows

Observe and operate

You can inspect workflow runs in:

  • React via useWorkflow(api.workflows.get, run.id)
  • CLI via gencow jobs
  • Cloud Dashboard Jobs tab

Failed runs can be retried, and completed or failed runs can be replayed from the Jobs UI/API. Pending or running runs can be cancelled as a best-effort stop, and terminal runs can be deleted. Stale runs can be inspected first with gencow jobs workflow cleanup.

Operational semantics:

  • Retry continues the same workflow run and preserves completed checkpoints.
  • Replay creates a fresh workflow run from the original args and starts from scratch.
  • Cancel marks the run terminal and removes queued resume jobs when possible, but it does not preempt an already executing handler.
  • Delete removes a terminal run and its stored step/event snapshots.

BaaS vs local behavior

  • In BaaS, workflow resume uses the Platform's DB-backed scheduler and survives app sleep or restart.
  • In local dev, the auto-created scheduler is still in-memory, so restart durability is best-effort.
  • If you want to verify crash/sleep recovery semantics, treat the cloud/BaaS path as the source of truth.

Common mistakes

  • Forgetting import "./agent"; in gencow/index.ts
  • Using unstable step names across resumes
  • Returning non-JSON-serializable values from steps
  • Putting important side effects outside wf.step()
  • Assuming local dev restart behavior is identical to BaaS durability
  • Leaving wf.waitForEvent() unbounded when the process needs a deadline