Skip to content
Docs
Build Workflows

Workflows

Define durable orchestrators that sequence functions, agents, and human approvals into reliable pipelines.

A workflow is a durable orchestrator that sequences functions, agents, and human approvals into a reliable pipeline. If a workflow crashes mid-run, it restarts and picks up from the last completed step rather than starting over.


Defining a workflow

Decorate any async function with @workflow. The function is registered automatically at import time.

from agnt5 import workflow, WorkflowContext

@workflow
async def onboarding_workflow(ctx: WorkflowContext, user_email: str) -> dict:
    account = await ctx.step(create_account, user_email)
    await ctx.step(send_welcome_email, account["id"])
    return {"status": "done", "account_id": account["id"]}

The first parameter must be ctx: WorkflowContext. Everything after that is your workflow’s input (whatever the caller passes) when triggering the run.

Parameter Type Default Description
name str function.__name__ Name registered with the platform
triggers list[TriggerSpec] None Event or webhook triggers that start this workflow automatically

Steps: the unit of durable work

Use ctx.step() to call a @function inside a workflow. The result is checkpointed after the first successful run. On replay (after a restart), the cached result is returned immediately. The function is not called again.

from agnt5 import workflow, WorkflowContext, function, FunctionContext

@function
async def fetch_user(ctx: FunctionContext, user_id: str) -> dict:
    # call your database here
    return {"id": user_id, "name": "Ada"}

@function
async def send_email(ctx: FunctionContext, user: dict, subject: str) -> str:
    # call your email provider here
    return f"Sent to {user['name']}"

@workflow
async def notify_workflow(ctx: WorkflowContext, user_id: str) -> str:
    user   = await ctx.step(fetch_user, user_id)
    result = await ctx.step(send_email, user, "Welcome!")
    return result

Calling await fetch_user(ctx, user_id) directly also works, but that result is not checkpointed. The function re-runs every time the workflow replays.

Call style Checkpointed Use when
await ctx.step(fn, *args) Yes Always prefer this inside a workflow
await fn(ctx, *args) No One-off calls where replay is fine

Running in parallel

ctx.parallel(): run tasks concurrently, collect in order

@workflow
async def report_workflow(ctx: WorkflowContext, report_id: str) -> dict:
    sales, inventory, customers = await ctx.parallel(
        ctx.step(fetch_sales, report_id),
        ctx.step(fetch_inventory, report_id),
        ctx.step(fetch_customers, report_id),
    )
    return await ctx.step(compile_report, sales, inventory, customers)

Results come back in the same order as the tasks.

ctx.gather(): run tasks concurrently, collect by name

@workflow
async def dashboard_workflow(ctx: WorkflowContext) -> dict:
    data = await ctx.gather(
        revenue=fetch_revenue(),
        users=fetch_active_users(),
        errors=fetch_error_rate(),
    )
    # data["revenue"], data["users"], data["errors"]
    return data

ctx.batch(): run a function across many inputs

Use batch() when you have a list of items and want to process them in parallel with controlled concurrency.

@function
async def process_document(ctx: FunctionContext, doc_id: str) -> dict:
    # process one document
    return {"doc_id": doc_id, "status": "processed"}

@workflow
async def bulk_processor(ctx: WorkflowContext, doc_ids: list[str]) -> dict:
    result = await ctx.batch(
        process_document,
        [{"doc_id": d} for d in doc_ids],
        max_concurrency=20,
    )
    return {
        "processed": result.stats.completed_items,
        "failed":    result.stats.failed_items,
    }

ctx.map() is a simpler wrapper around batch(). It returns just the outputs and raises if any item fails:

outputs = await ctx.map(process_document, [{"doc_id": d} for d in doc_ids])

Durable sleep

ctx.sleep() pauses the workflow for a set duration. Unlike asyncio.sleep(), it survives restarts. If the worker crashes during the wait, the workflow resumes and only sleeps for the time that remains.

@workflow
async def follow_up_workflow(ctx: WorkflowContext, user_id: str) -> str:
    await ctx.step(send_confirmation, user_id)

    await ctx.sleep(24 * 60 * 60, name="wait_24h")   # wait 24 hours

    await ctx.step(send_follow_up, user_id)
    return "Follow-up sent."

State

Workflows have access to three state scopes through ctx.

@workflow
async def stateful_workflow(ctx: WorkflowContext, user_id: str) -> dict:
    # Run-scoped: lives for this run only
    await ctx.state.set("phase", "started")

    # Session-scoped: persists across turns for the same session
    count = await ctx.session.state.get("visit_count", 0)
    await ctx.session.state.set("visit_count", count + 1)

    return {"visits": count + 1}
Scope Access Persists
Run ctx.state Current run only. Cleared when the run finishes
Session ctx.session.state Across multiple runs with the same session_id

Triggering workflows

Event trigger

Start a workflow automatically when a named event fires.

from agnt5 import workflow, WorkflowContext
from agnt5.types import event

@workflow(triggers=[event("user.signed_up")])
async def welcome_workflow(ctx: WorkflowContext, user_id: str) -> str:
    await ctx.step(send_welcome_email, user_id)
    return "Welcome email sent."

Webhook trigger

Start a workflow from an incoming webhook.

from agnt5.types import webhook

@workflow(triggers=[webhook("stripe", event="payment_intent.succeeded")])
async def payment_workflow(ctx: WorkflowContext, amount: int, currency: str) -> str:
    await ctx.step(record_payment, amount, currency)
    return "Payment recorded."

Human-in-the-loop

Pause a workflow and wait for user input using ctx.wait_for_user(). See Human-in-the-loop for the full reference.

@workflow
async def approval_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    summary = await ctx.step(prepare_order_summary, order_id)

    decision = await ctx.wait_for_user(
        question=f"Approve this order?\n\n{summary}",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "reject",  "label": "Reject"},
        ],
    )

    if decision == "reject":
        return {"status": "rejected"}

    result = await ctx.step(fulfil_order, order_id)
    return result

Real-world example

An order processing pipeline: validate the order, charge the customer, and send a confirmation, all as durable steps. If any step fails, the workflow retries from that point without re-running earlier steps.

from agnt5 import workflow, WorkflowContext, function, FunctionContext


@function
async def validate_order(ctx: FunctionContext, order_id: str) -> dict:
    # check stock, validate address, etc.
    return {"order_id": order_id, "total": 89.99, "valid": True}


@function
async def charge_customer(ctx: FunctionContext, order_id: str, amount: float) -> dict:
    # call your payments API
    return {"order_id": order_id, "charge_id": "ch_abc123", "status": "paid"}


@function
async def send_confirmation(ctx: FunctionContext, order_id: str, charge_id: str) -> str:
    # send confirmation email
    return f"Confirmation sent for order {order_id}"


@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    # Step 1: validate
    order = await ctx.step(validate_order, order_id)

    if not order["valid"]:
        return {"status": "invalid", "order_id": order_id}

    # Step 2: charge (checkpointed, never charged twice even if workflow restarts)
    charge = await ctx.step(charge_customer, order_id, order["total"])

    # Step 3: confirm
    await ctx.step(send_confirmation, order_id, charge["charge_id"])

    return {"status": "complete", "order_id": order_id, "charge_id": charge["charge_id"]}

Each step is checkpointed. If the worker restarts between step 2 and step 3, the charge is not made again. The saved result is used and execution continues at step 3.

© 2026 AGNT5
llms.txt