Workflows
Multi-step orchestration with automatic checkpointing
A Workflow is a multi-step process where each step is automatically checkpointed. If a workflow fails, it resumes from the last completed step — not from the beginning.
Think of a Workflow as a try/catch block that spans hours: if anything fails partway through, the work already completed is preserved and never repeated.
Why It Exists
Suppose you fetch papers, analyze them with an LLM, and then generate a summary. If the summary step fails, you don't want to re-fetch or re-analyze; each step is expensive. Normal async code doesn't checkpoint, so a failure means starting over.
Workflows solve this. Each step is checkpointed, and failures resume from the last checkpoint. You write normal async Python; AGNT5 makes it durable.
When to Use
- Multi-step pipelines with expensive operations (LLM calls, API requests)
- Processes that must survive crashes mid-execution
- Orchestration that needs progress tracking
- Human-in-the-loop flows that pause for user input
Example
```python
from agnt5 import workflow, WorkflowContext

@workflow()
async def research_pipeline(ctx: WorkflowContext, topic: str) -> dict:
    # Step 1: Fetch papers (checkpointed)
    papers = await ctx.task(fetch_papers, topic)

    # Step 2: Analyze with LLM (checkpointed)
    analysis = await ctx.task(analyze_papers, papers)

    # Step 3: Generate synthesis (checkpointed)
    synthesis = await ctx.task(generate_synthesis, analysis)

    return {"topic": topic, "synthesis": synthesis}
```

Call it via the client:
```python
from agnt5 import Client

client = Client()
result = client.run("research_pipeline", {"topic": "quantum computing"}, component_type="workflow")
```

If the workflow crashes after Step 2:
- Step 1 replays from checkpoint (papers not re-fetched)
- Step 2 replays from checkpoint (LLM not re-called)
- Step 3 executes fresh
How It Works
Workflows provide three guarantees:
- Automatic Checkpointing — Each `ctx.task()` or `ctx.step()` saves its result before proceeding
- Resume on Failure — Crashes resume from the last checkpoint, not from the beginning
- Parallel Execution — `ctx.parallel()` runs multiple tasks concurrently with combined checkpointing
```mermaid
sequenceDiagram
    participant C as Client
    participant W as Workflow
    participant S as State Store
    C->>W: run(workflow, input)
    W->>W: Execute step 1
    W->>S: Checkpoint result
    W->>W: Execute step 2
    W->>S: Checkpoint result
    Note over W: Crash here
    C->>W: Resume workflow
    W->>S: Load checkpoints
    S-->>W: Step 1 & 2 results
    W->>W: Execute step 3 (fresh)
    W->>S: Checkpoint result
    W-->>C: Final result
```
Configuration
Task Execution
Use ctx.task() to execute functions with checkpointing:
```python
@workflow()
async def pipeline(ctx: WorkflowContext, doc_url: str) -> dict:
    # Each task is checkpointed independently
    content = await ctx.task(fetch_document, doc_url)
    summary = await ctx.task(summarize, content)
    return {"summary": summary}
```

Parallel Execution
Run multiple tasks concurrently:
```python
@workflow()
async def multi_source(ctx: WorkflowContext, topic: str) -> dict:
    # All three run in parallel, checkpointed together
    arxiv, wiki, news = await ctx.parallel(
        ctx.task(search_arxiv, topic),
        ctx.task(search_wikipedia, topic),
        ctx.task(search_news, topic),
    )
    return {"arxiv": arxiv, "wiki": wiki, "news": news}
```

Named Steps
Use ctx.step() for explicit checkpoint names:
```python
@workflow()
async def research(ctx: WorkflowContext, topic: str) -> dict:
    research = await ctx.step("initial_research", call_llm(f"Research: {topic}"))
    analysis = await ctx.step("deep_analysis", call_llm(f"Analyze: {research}"))
    return {"analysis": analysis}
```

Workflow State
Track progress with durable state:
```python
@workflow()
async def process_documents(ctx: WorkflowContext, docs: list[str]) -> dict:
    ctx.state.set("total", len(docs))
    ctx.state.set("processed", 0)
    for doc in docs:
        await ctx.task(process_doc, doc)
        ctx.state.set("processed", ctx.state.get("processed") + 1)
    return {"processed": ctx.state.get("processed")}
```

| Method | Description |
|---|---|
| `ctx.state.get(key, default)` | Get value from workflow state |
| `ctx.state.set(key, value)` | Set value in workflow state |
| `ctx.state.delete(key)` | Delete key from state |
| `ctx.state.clear()` | Clear all state |
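The example above only exercises `get` and `set`. Below is a minimal sketch of the remaining methods, assuming the same `ctx.state` interface listed in the table; `tracked_pipeline` and `process_doc` are illustrative names, not part of the SDK.

```python
@workflow()
async def tracked_pipeline(ctx: WorkflowContext, docs: list[str]) -> dict:
    # get() with a default avoids special-casing the first run
    processed = ctx.state.get("processed", 0)
    ctx.state.set("stage", "processing")

    for doc in docs:
        await ctx.task(process_doc, doc)
        processed += 1
        ctx.state.set("processed", processed)

    # clear() wipes every key; ctx.state.delete("stage") would remove just one
    ctx.state.clear()
    return {"processed": processed}
```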
Human-in-the-Loop
Pause for user input with ctx.wait_for_user():
```python
@workflow(chat=True)
async def deployment_approval(ctx: WorkflowContext, changes: dict) -> dict:
    validation = await ctx.task(validate_deployment, changes)
    if not validation["safe"]:
        return {"status": "rejected", "reason": validation["issues"]}

    # Pause for human approval
    decision = await ctx.wait_for_user(
        question=f"Approve deployment of {len(changes['files'])} files?",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "reject", "label": "Reject"}
        ]
    )

    if decision == "approve":
        result = await ctx.task(execute_deployment, changes)
        return {"status": "deployed", "result": result}
    else:
        return {"status": "cancelled"}
```

Input types:
- `approval` — Yes/no decisions with options
- `choice` — Multiple choice selection
- `text` — Free-form text input
The workflow literally pauses mid-execution. When the user responds, it resumes from where it left off.
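The example above covers `approval`. As a hedged sketch of the other two input types, this assumes `choice` and `text` prompts accept the same question/input_type/options parameters shown for `approval`; the workflow and helper names here are hypothetical.

```python
@workflow(chat=True)
async def review_routing(ctx: WorkflowContext, pr: dict) -> dict:
    # choice: the user picks one of the provided options
    reviewer = await ctx.wait_for_user(
        question=f"Who should review {pr['title']}?",
        input_type="choice",
        options=[
            {"id": "alice", "label": "Alice"},
            {"id": "bob", "label": "Bob"},
        ],
    )
    # text: free-form input, no options needed (assumed shape)
    notes = await ctx.wait_for_user(
        question="Any notes for the reviewer?",
        input_type="text",
    )
    await ctx.task(assign_reviewer, pr, reviewer, notes)
    return {"reviewer": reviewer}
```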
Guidelines
Common Patterns
Workflow → Function → Function (sequential pipeline)
Workflow → parallel(Function, Function, Function) (fan-out)
Workflow → Entity (persist state beyond workflow)
Workflow → Agent (autonomous reasoning step)
Workflow → wait_for_user (human approval gate)
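As a sketch of how the sequential, fan-out, and approval-gate patterns compose in one workflow, using only the context methods documented on this page (helper functions such as `build_release` are hypothetical; the Entity and Agent hand-offs are omitted because their APIs are not shown here):

```python
@workflow(chat=True)
async def release_pipeline(ctx: WorkflowContext, version: str) -> dict:
    # Sequential pipeline: each step checkpointed
    build = await ctx.task(build_release, version)

    # Fan-out: independent checks run concurrently, checkpointed together
    unit, integration, lint = await ctx.parallel(
        ctx.task(run_unit_tests, build),
        ctx.task(run_integration_tests, build),
        ctx.task(run_linter, build),
    )

    # Human approval gate before shipping (requires chat=True)
    decision = await ctx.wait_for_user(
        question=f"Ship {version}? Checks: {unit}, {integration}, {lint}",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Ship"},
            {"id": "reject", "label": "Hold"},
        ],
    )
    if decision != "approve":
        return {"status": "held"}

    published = await ctx.task(publish_release, build)
    return {"status": "shipped", "result": published}
```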
Common Pitfalls

- ❌ Don't make LLM calls outside `ctx.task()` or `ctx.step()` — They won't be checkpointed. On retry, they'll re-execute.
- ❌ Don't use workflow state for persistent data — Workflow state is temporary. Use Entities for data that should survive beyond the workflow.
- ❌ Don't forget `chat=True` for HITL workflows — Human-in-the-loop requires chat mode for session management.
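To make the first pitfall concrete, here is a small sketch (`call_llm` and the workflow name are illustrative): the bare call re-executes on every replay, while the wrapped call is restored from its checkpoint.

```python
@workflow()
async def summarize_report(ctx: WorkflowContext, report: str) -> dict:
    # ❌ Not checkpointed: re-runs (and re-bills) every time the workflow replays
    draft = await call_llm(f"Summarize: {report}")

    # ✅ Checkpointed: after a crash, the stored result is reused
    final = await ctx.step("polish_summary", call_llm(f"Polish: {draft}"))
    return {"summary": final}
```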
What Workflows Don’t Do
- Not for single operations — Use Functions for atomic, stateless computation
- Not for persistent state — Use Entities for state that survives across workflows
- Not for autonomous reasoning — Use Agents for dynamic tool selection and planning
API Reference
- `@workflow()` — Decorator to define a durable workflow
- `WorkflowContext` — Context with task, step, parallel, state, and HITL methods
- `ctx.task(fn, *args)` — Execute function with checkpointing
- `ctx.step(name, awaitable)` — Checkpoint with explicit name
- `ctx.parallel(*tasks)` — Run tasks concurrently
- `ctx.state` — Durable state for progress tracking
- `ctx.wait_for_user()` — Pause for human input (requires `chat=True`)