Workflows
Define durable orchestrators that sequence functions, agents, and human approvals into reliable pipelines.
A workflow is a durable orchestrator that sequences functions, agents, and human approvals into a reliable pipeline. If a workflow crashes mid-run, it restarts and picks up from the last completed step rather than starting over.
Defining a workflow
Decorate any async function with @workflow. The function is registered automatically at import time.
from agnt5 import workflow, WorkflowContext
@workflow
async def onboarding_workflow(ctx: WorkflowContext, user_email: str) -> dict:
account = await ctx.step(create_account, user_email)
await ctx.step(send_welcome_email, account["id"])
return {"status": "done", "account_id": account["id"]}
The first parameter must be ctx: WorkflowContext. Everything after it is your workflow’s input: whatever the caller passes when triggering the run.
The @workflow decorator accepts the following optional parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | function.__name__ | Name registered with the platform |
| triggers | list[TriggerSpec] | None | Event or webhook triggers that start this workflow automatically |
Steps: the unit of durable work
Use ctx.step() to call a @function inside a workflow. The result is checkpointed after the first successful run. On replay (after a restart), the cached result is returned immediately. The function is not called again.
from agnt5 import workflow, WorkflowContext, function, FunctionContext
@function
async def fetch_user(ctx: FunctionContext, user_id: str) -> dict:
# call your database here
return {"id": user_id, "name": "Ada"}
@function
async def send_email(ctx: FunctionContext, user: dict, subject: str) -> str:
# call your email provider here
return f"Sent to {user['name']}"
@workflow
async def notify_workflow(ctx: WorkflowContext, user_id: str) -> str:
user = await ctx.step(fetch_user, user_id)
result = await ctx.step(send_email, user, "Welcome!")
return result
Calling await fetch_user(ctx, user_id) directly also works, but that result is not checkpointed. The function re-runs every time the workflow replays.
| Call style | Checkpointed | Use when |
|---|---|---|
| await ctx.step(fn, *args) | Yes | Always prefer this inside a workflow |
| await fn(ctx, *args) | No | One-off calls where replay is fine |
Running in parallel
ctx.parallel(): run tasks concurrently, collect in order
@workflow
async def report_workflow(ctx: WorkflowContext, report_id: str) -> dict:
sales, inventory, customers = await ctx.parallel(
ctx.step(fetch_sales, report_id),
ctx.step(fetch_inventory, report_id),
ctx.step(fetch_customers, report_id),
)
return await ctx.step(compile_report, sales, inventory, customers)
Results come back in the same order as the tasks.
ctx.gather(): run tasks concurrently, collect by name
@workflow
async def dashboard_workflow(ctx: WorkflowContext) -> dict:
data = await ctx.gather(
revenue=fetch_revenue(),
users=fetch_active_users(),
errors=fetch_error_rate(),
)
# data["revenue"], data["users"], data["errors"]
return data
ctx.batch(): run a function across many inputs
Use batch() when you have a list of items and want to process them in parallel with controlled concurrency.
@function
async def process_document(ctx: FunctionContext, doc_id: str) -> dict:
# process one document
return {"doc_id": doc_id, "status": "processed"}
@workflow
async def bulk_processor(ctx: WorkflowContext, doc_ids: list[str]) -> dict:
result = await ctx.batch(
process_document,
[{"doc_id": d} for d in doc_ids],
max_concurrency=20,
)
return {
"processed": result.stats.completed_items,
"failed": result.stats.failed_items,
}
ctx.map() is a simpler wrapper around batch(). It returns just the outputs and raises if any item fails:
outputs = await ctx.map(process_document, [{"doc_id": d} for d in doc_ids])
Durable sleep
ctx.sleep() pauses the workflow for a set duration. Unlike asyncio.sleep(), it survives restarts. If the worker crashes during the wait, the workflow resumes and only sleeps for the time that remains.
@workflow
async def follow_up_workflow(ctx: WorkflowContext, user_id: str) -> str:
await ctx.step(send_confirmation, user_id)
await ctx.sleep(24 * 60 * 60, name="wait_24h") # wait 24 hours
await ctx.step(send_follow_up, user_id)
return "Follow-up sent."
State
Workflows have access to three state scopes through ctx. The example and table below cover two of them: run-scoped and session-scoped state.
@workflow
async def stateful_workflow(ctx: WorkflowContext, user_id: str) -> dict:
# Run-scoped: lives for this run only
await ctx.state.set("phase", "started")
# Session-scoped: persists across turns for the same session
count = await ctx.session.state.get("visit_count", 0)
await ctx.session.state.set("visit_count", count + 1)
return {"visits": count + 1}
| Scope | Access | Persists |
|---|---|---|
| Run | ctx.state | Current run only. Cleared when the run finishes |
| Session | ctx.session.state | Across multiple runs with the same session_id |
Triggering workflows
Event trigger
Start a workflow automatically when a named event fires.
from agnt5 import workflow, WorkflowContext
from agnt5.types import event
@workflow(triggers=[event("user.signed_up")])
async def welcome_workflow(ctx: WorkflowContext, user_id: str) -> str:
await ctx.step(send_welcome_email, user_id)
return "Welcome email sent."
Webhook trigger
Start a workflow from an incoming webhook.
from agnt5.types import webhook
@workflow(triggers=[webhook("stripe", event="payment_intent.succeeded")])
async def payment_workflow(ctx: WorkflowContext, amount: int, currency: str) -> str:
await ctx.step(record_payment, amount, currency)
return "Payment recorded."
Human-in-the-loop
Pause a workflow and wait for user input using ctx.wait_for_user(). See Human-in-the-loop for the full reference.
@workflow
async def approval_workflow(ctx: WorkflowContext, order_id: str) -> dict:
summary = await ctx.step(prepare_order_summary, order_id)
decision = await ctx.wait_for_user(
question=f"Approve this order?\n\n{summary}",
input_type="approval",
options=[
{"id": "approve", "label": "Approve"},
{"id": "reject", "label": "Reject"},
],
)
if decision == "reject":
return {"status": "rejected"}
result = await ctx.step(fulfil_order, order_id)
return result
Real-world example
An order processing pipeline: validate the order, charge the customer, and send a confirmation, all as durable steps. If any step fails, the workflow retries from that point without re-running earlier steps.
from agnt5 import workflow, WorkflowContext, function, FunctionContext
@function
async def validate_order(ctx: FunctionContext, order_id: str) -> dict:
# check stock, validate address, etc.
return {"order_id": order_id, "total": 89.99, "valid": True}
@function
async def charge_customer(ctx: FunctionContext, order_id: str, amount: float) -> dict:
# call your payments API
return {"order_id": order_id, "charge_id": "ch_abc123", "status": "paid"}
@function
async def send_confirmation(ctx: FunctionContext, order_id: str, charge_id: str) -> str:
# send confirmation email
return f"Confirmation sent for order {order_id}"
@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str) -> dict:
# Step 1: validate
order = await ctx.step(validate_order, order_id)
if not order["valid"]:
return {"status": "invalid", "order_id": order_id}
# Step 2: charge (checkpointed, never charged twice even if workflow restarts)
charge = await ctx.step(charge_customer, order_id, order["total"])
# Step 3: confirm
await ctx.step(send_confirmation, order_id, charge["charge_id"])
return {"status": "complete", "order_id": order_id, "charge_id": charge["charge_id"]}
Each step is checkpointed. If the worker restarts between step 2 and step 3, the customer is not charged again: the saved result of step 2 is reused and execution continues at step 3.