---
title: Workflows
description: Define durable orchestrators that sequence functions, agents, and human approvals into reliable pipelines.
last_verified: 2026-06-02
---

A **workflow** is a durable orchestrator that sequences functions, agents, and human approvals into a reliable pipeline. If a workflow crashes mid-run, it restarts and picks up from the last completed step rather than starting over.

---

## Defining a workflow

Decorate any async function with `@workflow`. The function is registered automatically at import time.

```python
from agnt5 import workflow, WorkflowContext

@workflow
async def onboarding_workflow(ctx: WorkflowContext, user_email: str) -> dict:
    account = await ctx.step(create_account, user_email)
    await ctx.step(send_welcome_email, account["id"])
    return {"status": "done", "account_id": account["id"]}
```

The first parameter must be `ctx: WorkflowContext`. Every parameter after it is the workflow's input: whatever the caller passes when triggering the run.

The decorator also accepts optional keyword arguments:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | `function.__name__` | Name registered with the platform |
| `triggers` | `list[TriggerSpec]` | `None` | Event or webhook triggers that start this workflow automatically |

---

## Steps: the unit of durable work

Use `ctx.step()` to call a `@function` inside a workflow. The result is checkpointed after the first successful run. On replay (after a restart), the cached result is returned immediately. The function is not called again.

```python
from agnt5 import workflow, WorkflowContext, function, FunctionContext

@function
async def fetch_user(ctx: FunctionContext, user_id: str) -> dict:
    # call your database here
    return {"id": user_id, "name": "Ada"}

@function
async def send_email(ctx: FunctionContext, user: dict, subject: str) -> str:
    # call your email provider here
    return f"Sent to {user['name']}"

@workflow
async def notify_workflow(ctx: WorkflowContext, user_id: str) -> str:
    user   = await ctx.step(fetch_user, user_id)
    result = await ctx.step(send_email, user, "Welcome!")
    return result
```

Calling `await fetch_user(ctx, user_id)` directly also works, but that result is **not** checkpointed. The function re-runs every time the workflow replays.

| Call style | Checkpointed | Use when |
|---|---|---|
| `await ctx.step(fn, *args)` | Yes | Always prefer this inside a workflow |
| `await fn(ctx, *args)` | No | One-off calls where replay is fine |
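To make the difference between the two call styles concrete, here is a toy model of checkpoint-and-replay in plain Python. It is a sketch, not the `agnt5` implementation: `ToyContext`, its in-memory `journal`, and the position-based step keys are all assumptions made for illustration, and a real runtime would persist the journal outside the worker process.

```python
import asyncio


class ToyContext:
    """Illustrative stand-in for a workflow context (hypothetical, not the agnt5 API)."""

    def __init__(self, journal: dict):
        self.journal = journal   # outlives a single run in this toy model
        self._position = 0       # index of the next step within this run

    async def step(self, fn, *args):
        key = f"{self._position}:{fn.__name__}"
        self._position += 1
        if key not in self.journal:
            # First execution: run the function and record its result.
            self.journal[key] = await fn(*args)
        # Replay: the saved result is returned without calling fn again.
        return self.journal[key]


calls = {"fetch_user": 0}


async def fetch_user(user_id: str) -> dict:
    calls["fetch_user"] += 1
    return {"id": user_id, "name": "Ada"}


async def run_once(journal: dict) -> dict:
    ctx = ToyContext(journal)
    return await ctx.step(fetch_user, "u1")


journal: dict = {}
first = asyncio.run(run_once(journal))    # executes fetch_user
second = asyncio.run(run_once(journal))   # replays from the journal
direct = asyncio.run(fetch_user("u1"))    # direct call: executes again
```

In this model `fetch_user` runs twice in total: once for the first `step()` and once for the direct call. The replayed `step()` never reaches it.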

---

## Running in parallel

### `ctx.parallel()`: run tasks concurrently, collect in order

```python
@workflow
async def report_workflow(ctx: WorkflowContext, report_id: str) -> dict:
    sales, inventory, customers = await ctx.parallel(
        ctx.step(fetch_sales, report_id),
        ctx.step(fetch_inventory, report_id),
        ctx.step(fetch_customers, report_id),
    )
    return await ctx.step(compile_report, sales, inventory, customers)
```

Results come back in the same order as the tasks.
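The standard library's `asyncio.gather` offers the same ordering guarantee, which makes it a convenient analogy. The sketch below uses only `asyncio`; whether `ctx.parallel()` is built on it is not stated here, and the `fetch` helper and its delays are made up for the demonstration.

```python
import asyncio


async def fetch(label: str, delay: float) -> str:
    # Simulate work that takes a different amount of time per task.
    await asyncio.sleep(delay)
    return label


async def main() -> list:
    # "sales" has the longest delay, yet it is still the first result,
    # because results follow the order in which the tasks were passed.
    return await asyncio.gather(
        fetch("sales", 0.03),
        fetch("inventory", 0.01),
        fetch("customers", 0.02),
    )


results = asyncio.run(main())
```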

### `ctx.gather()`: run tasks concurrently, collect by name

```python
@workflow
async def dashboard_workflow(ctx: WorkflowContext) -> dict:
    data = await ctx.gather(
        revenue=fetch_revenue(),
        users=fetch_active_users(),
        errors=fetch_error_rate(),
    )
    # data["revenue"], data["users"], data["errors"]
    return data
```
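As a rough mental model, collecting by name can be pictured as a thin layer over positional collection. The helper below is a hypothetical sketch built on `asyncio.gather`, not the library's code, and the two fetch functions return placeholder values.

```python
import asyncio


async def gather_by_name(**awaitables) -> dict:
    """Await every keyword argument concurrently and key the results by name."""
    names = list(awaitables)
    values = await asyncio.gather(*awaitables.values())
    return dict(zip(names, values))


async def fetch_revenue() -> int:
    return 1200   # placeholder value


async def fetch_active_users() -> int:
    return 42     # placeholder value


async def main() -> dict:
    return await gather_by_name(
        revenue=fetch_revenue(),
        users=fetch_active_users(),
    )


data = asyncio.run(main())
```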

### `ctx.batch()`: run a function across many inputs

Use `batch()` when you have a list of items and want to process them in parallel with controlled concurrency.

```python
@function
async def process_document(ctx: FunctionContext, doc_id: str) -> dict:
    # process one document
    return {"doc_id": doc_id, "status": "processed"}

@workflow
async def bulk_processor(ctx: WorkflowContext, doc_ids: list[str]) -> dict:
    result = await ctx.batch(
        process_document,
        [{"doc_id": d} for d in doc_ids],
        max_concurrency=20,
    )
    return {
        "processed": result.stats.completed_items,
        "failed":    result.stats.failed_items,
    }
```

`ctx.map()` is a simpler wrapper around `batch()`. It returns just the outputs and raises if any item fails:

```python
outputs = await ctx.map(process_document, [{"doc_id": d} for d in doc_ids])
```
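The two behaviours described above (bounded concurrency with per-item failure tracking, and a stricter wrapper that raises) can be sketched with an `asyncio.Semaphore`. Everything here is illustrative: `toy_batch`, `toy_map`, and `ToyBatchResult` are hypothetical names, and unpacking each input dict as keyword arguments is an assumption suggested by the example above rather than documented behaviour.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyBatchResult:
    outputs: list = field(default_factory=list)   # successful results, input order
    errors: list = field(default_factory=list)    # exceptions from failed items


async def toy_batch(fn, items: list, max_concurrency: int) -> ToyBatchResult:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: dict):
        async with semaphore:              # at most max_concurrency run at once
            return await fn(**item)

    # return_exceptions=True keeps one failure from cancelling the other items.
    outcomes = await asyncio.gather(
        *(run_one(item) for item in items), return_exceptions=True
    )
    result = ToyBatchResult()
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            result.errors.append(outcome)
        else:
            result.outputs.append(outcome)
    return result


async def toy_map(fn, items: list, max_concurrency: int = 10) -> list:
    result = await toy_batch(fn, items, max_concurrency)
    if result.errors:
        raise result.errors[0]             # stricter wrapper: any failure raises
    return result.outputs


tracker = {"active": 0, "peak": 0}


async def process_document(doc_id: str) -> dict:
    tracker["active"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["active"])
    await asyncio.sleep(0.01)
    tracker["active"] -= 1
    if doc_id == "bad":
        raise ValueError(f"cannot process {doc_id}")
    return {"doc_id": doc_id, "status": "processed"}


items = [{"doc_id": d} for d in ["a", "b", "bad", "c", "d"]]
result = asyncio.run(toy_batch(process_document, items, max_concurrency=2))
```

Here `result.outputs` holds the four successful documents and `result.errors` holds the single `ValueError`. Passing the same items to `toy_map` raises that error instead of returning.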

---

## Durable sleep

`ctx.sleep()` pauses the workflow for a set duration, given in seconds in the example below. Unlike `asyncio.sleep()`, it survives restarts. If the worker crashes during the wait, the workflow resumes and sleeps only for the time that remains.

```python
@workflow
async def follow_up_workflow(ctx: WorkflowContext, user_id: str) -> str:
    await ctx.step(send_confirmation, user_id)

    await ctx.sleep(24 * 60 * 60, name="wait_24h")   # wait 24 hours

    await ctx.step(send_follow_up, user_id)
    return "Follow-up sent."
```
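One way to picture "sleeps only for the time that remains" is a wake-up deadline that is recorded once and consulted on every resume. The sketch below assumes that design; `remaining_sleep` and its `journal` dict are hypothetical and say nothing about how the runtime actually stores timers.

```python
def remaining_sleep(journal: dict, name: str, duration: float, now: float) -> float:
    """Return the seconds still left to sleep for the timer called `name`."""
    # The deadline is stored the first time the sleep is reached and reused
    # on every later resume, so a restart does not reset the clock.
    deadline = journal.setdefault(name, now + duration)
    return max(0.0, deadline - now)


journal: dict = {}
start = 1_000_000.0          # arbitrary timestamp, in seconds
day = 24 * 60 * 60

first = remaining_sleep(journal, "wait_24h", day, now=start)
after_crash = remaining_sleep(journal, "wait_24h", day, now=start + 10 * 60 * 60)
overdue = remaining_sleep(journal, "wait_24h", day, now=start + 30 * 60 * 60)
```

A resume ten hours in leaves fourteen hours (50,400 seconds) to wait, and a resume after the deadline returns zero so the workflow continues at once.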

---

## State

Workflows can read and write scoped state through `ctx`. The two scopes shown here are run and session.

```python
@workflow
async def stateful_workflow(ctx: WorkflowContext, user_id: str) -> dict:
    # Run-scoped: lives for this run only
    await ctx.state.set("phase", "started")

    # Session-scoped: persists across turns for the same session
    count = await ctx.session.state.get("visit_count", 0)
    await ctx.session.state.set("visit_count", count + 1)

    return {"visits": count + 1}
```

| Scope | Access | Persists |
|---|---|---|
| Run | `ctx.state` | Current run only. Cleared when the run finishes |
| Session | `ctx.session.state` | Across multiple runs with the same `session_id` |
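The lifetime difference between the two scopes can be modelled with plain dictionaries. This is a conceptual sketch only: `ToySessionStore` and `run_workflow` are hypothetical, synchronous stand-ins, whereas the real accessors shown above are awaited.

```python
class ToySessionStore:
    """Keeps one state dict per session_id (illustrative only)."""

    def __init__(self):
        self._sessions: dict = {}

    def for_session(self, session_id: str) -> dict:
        return self._sessions.setdefault(session_id, {})


def run_workflow(store: ToySessionStore, session_id: str) -> dict:
    run_state: dict = {}                            # fresh for every run
    session_state = store.for_session(session_id)   # shared across runs

    run_state["phase"] = "started"
    count = session_state.get("visit_count", 0)
    session_state["visit_count"] = count + 1
    return {"visits": count + 1}


store = ToySessionStore()
first = run_workflow(store, "session-a")
second = run_workflow(store, "session-a")   # same session: counter continues
other = run_workflow(store, "session-b")    # new session: counter starts over
```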

---

## Triggering workflows

### Event trigger

Start a workflow automatically when a named event fires.

```python
from agnt5 import workflow, WorkflowContext
from agnt5.types import event

@workflow(triggers=[event("user.signed_up")])
async def welcome_workflow(ctx: WorkflowContext, user_id: str) -> str:
    await ctx.step(send_welcome_email, user_id)
    return "Welcome email sent."
```

### Webhook trigger

Start a workflow from an incoming webhook.

```python
from agnt5.types import webhook

@workflow(triggers=[webhook("stripe", event="payment_intent.succeeded")])
async def payment_workflow(ctx: WorkflowContext, amount: int, currency: str) -> str:
    await ctx.step(record_payment, amount, currency)
    return "Payment recorded."
```
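Conceptually, a trigger is an entry in a lookup table from event name to workflow. The toy registry below illustrates that idea in plain Python. `on_event`, `emit`, and `REGISTRY` are hypothetical names, and mapping payload fields to keyword arguments is an assumption based on the signatures above, not documented behaviour.

```python
import asyncio

REGISTRY: dict = {}   # event name -> list of handlers


def on_event(name: str):
    """Register the decorated coroutine function for the named event."""
    def decorator(fn):
        REGISTRY.setdefault(name, []).append(fn)
        return fn
    return decorator


@on_event("user.signed_up")
async def welcome(user_id: str) -> str:
    return f"Welcome email sent to {user_id}."


async def emit(name: str, payload: dict) -> list:
    # Start every handler registered for this event; unknown events do nothing.
    return [await handler(**payload) for handler in REGISTRY.get(name, [])]


sent = asyncio.run(emit("user.signed_up", {"user_id": "u1"}))
ignored = asyncio.run(emit("user.deleted", {"user_id": "u1"}))
```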

---

## Human-in-the-loop

Pause a workflow and wait for user input using `ctx.wait_for_user()`. See [Human-in-the-loop](/docs/build/human-in-the-loop.md) for the full reference.

```python
@workflow
async def approval_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    summary = await ctx.step(prepare_order_summary, order_id)

    decision = await ctx.wait_for_user(
        question=f"Approve this order?\n\n{summary}",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "reject",  "label": "Reject"},
        ],
    )

    if decision == "reject":
        return {"status": "rejected"}

    result = await ctx.step(fulfil_order, order_id)
    return result
```
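The control flow of a paused approval can be imitated with an `asyncio.Future`: the workflow suspends at the `await` and continues only once someone supplies a decision. This sketch is in-memory and therefore not durable, unlike the real call, and `approval_flow` and `main` are hypothetical names. The comparison against the option `id` follows the example above.

```python
import asyncio


async def approval_flow(decision_future: asyncio.Future) -> dict:
    decision = await decision_future      # suspended until a reviewer answers
    if decision == "reject":
        return {"status": "rejected"}
    return {"status": "fulfilled"}


async def main(choice: str) -> dict:
    pending = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(approval_flow(pending))

    await asyncio.sleep(0)                # let the flow run up to its await
    assert not task.done()                # still waiting for a human

    pending.set_result(choice)            # the reviewer picks an option id
    return await task


approved = asyncio.run(main("approve"))
rejected = asyncio.run(main("reject"))
```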

---

## Real-world example

An order processing pipeline: validate the order, charge the customer, and send a confirmation, all as durable steps. If any step fails, the workflow retries from that point without re-running earlier steps.

```python
from agnt5 import workflow, WorkflowContext, function, FunctionContext


@function
async def validate_order(ctx: FunctionContext, order_id: str) -> dict:
    # check stock, validate address, etc.
    return {"order_id": order_id, "total": 89.99, "valid": True}


@function
async def charge_customer(ctx: FunctionContext, order_id: str, amount: float) -> dict:
    # call your payments API
    return {"order_id": order_id, "charge_id": "ch_abc123", "status": "paid"}


@function
async def send_confirmation(ctx: FunctionContext, order_id: str, charge_id: str) -> str:
    # send confirmation email
    return f"Confirmation sent for order {order_id}"


@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    # Step 1: validate
    order = await ctx.step(validate_order, order_id)

    if not order["valid"]:
        return {"status": "invalid", "order_id": order_id}

    # Step 2: charge (checkpointed, so a restart after this step does not charge again)
    charge = await ctx.step(charge_customer, order_id, order["total"])

    # Step 3: confirm
    await ctx.step(send_confirmation, order_id, charge["charge_id"])

    return {"status": "complete", "order_id": order_id, "charge_id": charge["charge_id"]}
```

Each step is checkpointed. If the worker restarts between step 2 and step 3, the charge is not made again. The saved result is used and execution continues at step 3.
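The restart scenario can be simulated without the platform. In the sketch below a plain dict stands in for the checkpoint store and a raised `RuntimeError` stands in for the worker crash. `durable_step`, the `charges` ledger, and the `crash` flag are hypothetical devices for the demonstration, not part of `agnt5`.

```python
import asyncio

charges = []   # stands in for the payment provider's ledger


async def durable_step(journal: dict, key: str, fn, *args):
    # Run fn only if no result has been saved under this key yet.
    if key not in journal:
        journal[key] = await fn(*args)
    return journal[key]


async def charge_customer(order_id: str, amount: float) -> dict:
    charges.append((order_id, amount))
    return {"charge_id": f"ch_{len(charges)}"}


async def send_confirmation(order_id: str, charge_id: str, crash: bool) -> str:
    if crash:
        raise RuntimeError("worker crashed before confirmation")
    return f"Confirmation sent for order {order_id}"


async def order_flow(journal: dict, order_id: str, crash: bool) -> dict:
    charge = await durable_step(journal, "charge", charge_customer, order_id, 89.99)
    await durable_step(
        journal, "confirm", send_confirmation, order_id, charge["charge_id"], crash
    )
    return {"status": "complete", "charge_id": charge["charge_id"]}


journal: dict = {}
try:
    asyncio.run(order_flow(journal, "o1", crash=True))   # fails after the charge
except RuntimeError:
    pass                                                  # simulated restart

result = asyncio.run(order_flow(journal, "o1", crash=False))
```

After both attempts the ledger holds a single charge: the second run reads the saved charge from the journal and goes straight to the confirmation.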
