Workflows
Multi-step orchestration with automatic checkpointing
Suppose you fetch research papers, analyze them with an LLM, then generate a summary. If the summary generation fails, you don’t want to re-fetch the papers or re-run the expensive analysis. Each step is costly. Normal async code doesn’t checkpoint progress, so failures mean starting over.
AGNT5 workflows solve this problem. Each step is automatically checkpointed. When failures happen, workflows resume from the last completed step. You write normal async Python, and AGNT5 makes it durable.
Example
from agnt5 import workflow, WorkflowContext

@workflow()
async def research_pipeline(ctx: WorkflowContext, topic: str) -> dict:
    # Step 1: Fetch papers (checkpointed)
    papers = await ctx.task(fetch_research_papers, topic)

    # Step 2: Analyze each paper with LLM (checkpointed)
    analysis = await ctx.task(analyze_papers, papers)

    # Step 3: Generate synthesis (checkpointed)
    synthesis = await ctx.task(generate_synthesis, analysis)

    return {
        "topic": topic,
        "papers_analyzed": len(papers),
        "synthesis": synthesis
    }
When you call it:
from agnt5 import Client
client = Client()
result = client.run("research_pipeline", {"topic": "quantum computing"})
AGNT5:
- Executes Step 1 (fetch papers), checkpoints the result
- Executes Step 2 (analyze with LLM), checkpoints the result
- Executes Step 3 (generate synthesis), checkpoints the result
- Returns the final result
If the workflow crashes after Step 2, on retry:
- Step 1 is replayed from checkpoint (papers not re-fetched)
- Step 2 is replayed from checkpoint (LLM analysis not re-run)
- Step 3 executes fresh (the workflow picks up where it left off)
This is durable orchestration: expensive LLM calls run once, progress is never lost.
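The pipeline assumes three task functions. A minimal sketch of what they could look like, using the @function decorator covered below; fetch_papers_from_arxiv and call_llm are hypothetical helpers, not AGNT5 APIs:

from agnt5 import function, FunctionContext

@function()
async def fetch_research_papers(ctx: FunctionContext, topic: str) -> list[dict]:
    # Hypothetical search helper, stands in for your data source
    return await fetch_papers_from_arxiv(topic)

@function()
async def analyze_papers(ctx: FunctionContext, papers: list[dict]) -> list[str]:
    # One LLM call per paper; call_llm is a placeholder for your LLM client
    return [await call_llm(f"Analyze this paper:\n\n{p['content']}") for p in papers]

@function()
async def generate_synthesis(ctx: FunctionContext, analysis: list[str]) -> str:
    return await call_llm("Synthesize these analyses:\n\n" + "\n".join(analysis))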
Workflow State
Workflows have durable state that survives crashes:
from agnt5 import workflow, WorkflowContext

@workflow()
async def document_analysis(ctx: WorkflowContext, doc_url: str) -> dict:
    # Initialize state
    ctx.state.set("document", doc_url)
    ctx.state.set("status", "analyzing")
    ctx.state.set("chunks_processed", 0)

    # Fetch and chunk document
    chunks = await ctx.task(chunk_document, doc_url)

    # Process each chunk with LLM
    for chunk in chunks:
        insights = await ctx.task(extract_insights, chunk)

        # Update progress in state
        count = ctx.state.get("chunks_processed", 0)
        ctx.state.set("chunks_processed", count + 1)
        ctx.state.set("latest_insights", insights)

    ctx.state.set("status", "completed")
    return {
        "document": doc_url,
        "chunks_analyzed": ctx.state.get("chunks_processed")
    }
State operations:
- ctx.state.set(key, value) — Store value in workflow state
- ctx.state.get(key, default) — Retrieve value from state
- ctx.state.delete(key) — Delete key from state
- ctx.state.clear() — Clear all state
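delete() and clear() aren’t shown in the example above; a brief sketch of where they fit, with illustrative keys and task functions (draft_outline and write_draft are hypothetical):

@workflow()
async def outline_then_draft(ctx: WorkflowContext, topic: str) -> str:
    # Stash an intermediate result in workflow state
    outline = await ctx.task(draft_outline, topic)
    ctx.state.set("outline", outline)

    draft = await ctx.task(write_draft, outline)

    # The outline has been consumed; drop it from state
    ctx.state.delete("outline")
    return draft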
State persists across crashes. If the workflow fails mid-loop, on retry it resumes with chunks_processed intact.
Orchestrating Functions
Workflows orchestrate functions as tasks. Each task is checkpointed independently.
from agnt5 import workflow, WorkflowContext, function, FunctionContext

# Define functions
@function()
async def fetch_paper(ctx: FunctionContext, paper_id: str) -> dict:
    content = await download_arxiv_paper(paper_id)
    return {"paper_id": paper_id, "content": content}

@function()
async def extract_key_points(ctx: FunctionContext, content: str) -> list[str]:
    prompt = f"Extract key points from this paper:\n\n{content}"
    points = await call_llm(prompt)
    return points

@function()
async def summarize_points(ctx: FunctionContext, points: list[str]) -> str:
    prompt = f"Summarize these key points:\n\n{points}"
    summary = await call_llm(prompt)
    return summary

# Orchestrate them
@workflow()
async def paper_analysis(ctx: WorkflowContext, paper_id: str) -> dict:
    # Task 1: Fetch paper
    paper = await ctx.task(fetch_paper, paper_id)

    # Task 2: Extract key points with LLM
    key_points = await ctx.task(extract_key_points, paper["content"])

    # Task 3: Generate summary
    summary = await ctx.task(summarize_points, key_points)

    return {
        "paper_id": paper_id,
        "key_points": key_points,
        "summary": summary
    }
Each ctx.task() call:
- Executes the function
- Checkpoints the result
- Returns the value
On replay, completed tasks return cached results instantly — no re-execution.
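Conceptually, you can think of ctx.task() as a cache keyed by the step’s position in the workflow. A simplified mental model of the replay logic, not the real AGNT5 implementation:

# Simplified mental model of ctx.task(); not the real AGNT5 implementation.
# checkpoints maps a deterministic step index to its recorded result.
async def run_task(checkpoints: dict, step_index: int, fn, *args):
    if step_index in checkpoints:
        # Replay: the result was already recorded, so skip execution entirely
        return checkpoints[step_index]
    result = await fn(*args)          # first execution of this step
    checkpoints[step_index] = result  # persist before advancing to the next step
    return result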
Parallel Execution
Run multiple tasks concurrently with ctx.parallel() or ctx.gather():
from agnt5 import workflow, WorkflowContext

@workflow()
async def multi_source_research(ctx: WorkflowContext, topic: str) -> dict:
    # Query multiple sources in parallel
    arxiv_papers, wiki_content, news_articles = await ctx.parallel(
        ctx.task(search_arxiv, topic),
        ctx.task(search_wikipedia, topic),
        ctx.task(search_news, topic),
    )

    # Combine results
    return {
        "topic": topic,
        "arxiv": arxiv_papers,
        "wikipedia": wiki_content,
        "news": news_articles
    }
Or use ctx.gather() for named results:
@workflow()
async def document_enrichment(ctx: WorkflowContext, doc_url: str) -> dict:
    # Named parallel tasks
    results = await ctx.gather(
        content=ctx.task(extract_text, doc_url),
        entities=ctx.task(extract_entities, doc_url),
        summary=ctx.task(generate_summary, doc_url),
    )

    return {
        "document": doc_url,
        "content": results["content"],
        "entities": results["entities"],
        "summary": results["summary"]
    }
Parallel tasks execute concurrently and are checkpointed together.
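The examples above pass a fixed number of tasks. For a dynamic fan-out you can unpack a generated list of tasks; this assumes ctx.parallel() accepts any number of positional tasks, as the three-argument call above suggests:

@workflow()
async def fetch_all_papers(ctx: WorkflowContext, paper_ids: list[str]) -> list[dict]:
    # One task per paper; results come back in input order
    papers = await ctx.parallel(
        *[ctx.task(fetch_paper, paper_id) for paper_id in paper_ids]
    )
    return list(papers)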
Checkpointing with ctx.step()
For expensive operations that aren’t functions, use ctx.step() to checkpoint explicitly:
from agnt5 import workflow, WorkflowContext

@workflow()
async def deep_research_workflow(ctx: WorkflowContext, topic: str) -> dict:
    # Checkpoint expensive LLM calls
    initial_research = await ctx.step(
        "initial_research",
        call_llm(f"Research this topic: {topic}")
    )

    # Another expensive LLM call
    deep_analysis = await ctx.step(
        "deep_analysis",
        call_llm(f"Provide detailed analysis: {initial_research}")
    )

    # Final synthesis
    synthesis = await ctx.step(
        "synthesis",
        call_llm(f"Synthesize findings: {deep_analysis}")
    )

    return {
        "topic": topic,
        "initial_research": initial_research,
        "deep_analysis": deep_analysis,
        "synthesis": synthesis
    }
If the workflow crashes after “initial_research”, on retry:
- “initial_research” step is replayed from checkpoint (LLM not called again)
- “deep_analysis” and “synthesis” execute fresh
Each ctx.step() creates a durable checkpoint with a unique name.
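Because the checkpoint is identified by its name, steps inside a loop need distinct names. A common approach is to fold the loop index into the name; the scheme below is a suggestion, not an API requirement:

@workflow()
async def draft_report(ctx: WorkflowContext, sections: list[str]) -> list[str]:
    drafts = []
    for i, section in enumerate(sections):
        # Unique name per iteration: a crash in iteration 3 replays
        # iterations 0-2 from their checkpoints
        draft = await ctx.step(f"draft_{i}", call_llm(f"Draft this section: {section}"))
        drafts.append(draft)
    return drafts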
Calling Workflows
From Client SDK
from agnt5 import Client
client = Client()
# Execute workflow and wait for result
result = client.run("research_pipeline", {"topic": "quantum computing"})
From Other Workflows
Workflows can orchestrate other workflows:
@workflow()
async def comprehensive_research(ctx: WorkflowContext, topic: str) -> dict:
    # Call sub-workflow as a task
    literature_review = await ctx.task(research_pipeline, topic)

    # Call another sub-workflow with the results
    expert_analysis = await ctx.task(expert_consultation_workflow, topic, literature_review)

    return {"literature": literature_review, "expert_insights": expert_analysis}
Workflows nest naturally — each sub-workflow execution is checkpointed.
Workflow State vs Entity State
Workflows have temporary state for orchestration. Entities have persistent state for objects.
from agnt5 import workflow, WorkflowContext, Entity

# Entity for persistent conversation state
class Conversation(Entity):
    async def add_message(self, role: str, content: str) -> dict:
        messages = self.state.get("messages", [])
        messages.append({"role": role, "content": content})
        self.state.set("messages", messages)
        return {"message_count": len(messages)}

    async def get_history(self) -> list[dict]:
        return self.state.get("messages", [])

# Workflow for research process (temporary state)
@workflow()
async def research_with_memory(ctx: WorkflowContext, session_id: str, query: str) -> dict:
    # Workflow state: tracks this research's progress
    ctx.state.set("session_id", session_id)
    ctx.state.set("status", "researching")

    # Entity state: persistent conversation history
    conversation = ctx.entity("Conversation", f"session-{session_id}")

    # Add user query to conversation
    await ctx.task(lambda: conversation.add_message("user", query))

    # Perform research
    research_results = await ctx.task(research_topic, query)

    # Generate response with conversation context
    history = await ctx.task(lambda: conversation.get_history())
    response = await ctx.task(generate_contextual_response, query, research_results, history)

    # Save assistant response to conversation
    if response["success"]:
        ctx.state.set("status", "completed")
        await ctx.task(lambda: conversation.add_message("assistant", response["content"]))

    return {"response": response["content"], "status": ctx.state.get("status")}
Workflow state:
- Scoped to this workflow execution
- Used for orchestration progress
- Temporary (cleared after workflow completes)
Entity state:
- Scoped to the entity key
- Used for object state (conversation history, agent memory, research sessions)
- Persistent (survives beyond workflow execution)
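The practical consequence: once research_with_memory completes, its ctx.state is discarded, but the Conversation entity remains readable. A hypothetical client-side read, assuming the Client SDK exposes an entity() accessor mirroring ctx.entity():

import asyncio
from agnt5 import Client

async def main():
    client = Client()
    # Hypothetical accessor: assumes the Client mirrors ctx.entity()
    conversation = client.entity("Conversation", "session-abc123")
    # Conversation history outlives any single workflow execution
    history = await conversation.get_history()
    print(f"{len(history)} messages persisted")

asyncio.run(main())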
Human-in-the-Loop Workflows
Some workflows need human input mid-execution. Approval workflows, data validation, content review — you can’t fully automate these processes. The workflow must pause, wait for a user decision, then continue.
AGNT5 workflows support human-in-the-loop (HITL) patterns through ctx.wait_for_user(). The workflow pauses execution, stores its state, and resumes when the user responds. No polling. No background jobs. The workflow itself handles the pause and resume.
Basic Approval Flow
from agnt5 import workflow, WorkflowContext

@workflow(chat=True)
async def deployment_approval(ctx: WorkflowContext, changes: dict) -> dict:
    # Step 1: Validate changes
    validation = await ctx.task(validate_deployment, changes)
    if not validation["safe"]:
        return {"status": "rejected", "reason": validation["issues"]}

    # Step 2: Pause for human approval
    decision = await ctx.wait_for_user(
        question=f"Approve deployment of {len(changes['files'])} files?",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "reject", "label": "Reject"}
        ]
    )

    # Step 3: Handle decision
    if decision == "approve":
        result = await ctx.task(execute_deployment, changes)
        return {"status": "deployed", "result": result}
    else:
        return {"status": "cancelled", "reason": "user_rejected"}
When you call this workflow:
from agnt5 import Client
client = Client()
result = client.run_workflow("deployment_approval", {
    "changes": {"files": ["api.py", "models.py"], "env": "production"}
})
AGNT5:
- Executes Step 1 (validation), checkpoints result
- Reaches Step 2 (wait_for_user), pauses execution
- Returns HTTP 202 with pause information to frontend
- Frontend displays approval buttons
- User clicks “Approve” or “Reject”
- Frontend calls resume endpoint with user’s choice
- Workflow resumes at Step 3 with user’s decision
- Completes with deployment result or cancellation
The workflow doesn’t poll or check a database. It literally pauses mid-execution and resumes when the user responds.
How Pause and Resume Works
When ctx.wait_for_user() is called:
- Pause detection: Workflow raises a special exception to signal pause
- State persistence: Current workflow state is checkpointed
- Response caching: If this is a replay (after resume), returns cached user response
- Frontend notification: Platform returns pause info to client (HTTP 202)
- User interaction: Frontend displays question and options
- Resume trigger: User’s response triggers workflow resume
- Deterministic replay: Workflow replays from beginning with user response cached
- Continuation: wait_for_user() returns the cached response, workflow continues
This is deterministic replay with injected responses. The workflow doesn’t maintain a running process — it replays from the beginning each time, but completed steps (including user responses) return cached results.
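A sketch of what this means for wait_for_user() itself, again as a mental model rather than the real implementation; WorkflowPaused is a hypothetical signal name:

# Simplified mental model of ctx.wait_for_user(); not the real implementation.
# responses maps a pause ID to the user's recorded answer.
class WorkflowPaused(Exception):
    pass

async def wait_for_user(responses: dict, pause_id: str, question: str):
    if pause_id in responses:
        # Replay after resume: the user's answer is already recorded
        return responses[pause_id]
    # First execution: state has been checkpointed; signal the pause upward
    raise WorkflowPaused(pause_id, question)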
Input Types
Three types of user input are supported:
Approval (yes/no decisions):
decision = await ctx.wait_for_user(
    question="Deploy to production?",
    input_type="approval",
    options=[
        {"id": "yes", "label": "Deploy"},
        {"id": "no", "label": "Cancel"}
    ]
)
# Returns: "yes" or "no"
Choice (multiple options):
priority = await ctx.wait_for_user(
    question="What priority should this task have?",
    input_type="choice",
    options=[
        {"id": "urgent", "label": "Urgent (24h)"},
        {"id": "high", "label": "High (3 days)"},
        {"id": "normal", "label": "Normal (1 week)"},
        {"id": "low", "label": "Low (no deadline)"}
    ]
)
# Returns: selected option ID (e.g., "urgent")
Text (free-form input):
feedback = await ctx.wait_for_user(
    question="Please provide additional context:",
    input_type="text"
)
# Returns: user's text input
Multi-Step Pauses
Workflows can pause multiple times for different inputs:
@workflow(chat=True)
async def content_review(ctx: WorkflowContext, article_id: str) -> dict:
    # Fetch and analyze content
    article = await ctx.task(fetch_article, article_id)
    analysis = await ctx.task(analyze_content, article)

    # First pause: Approval to proceed
    proceed = await ctx.wait_for_user(
        question=f"Article has {len(analysis['issues'])} issues. Review them?",
        input_type="approval",
        options=[
            {"id": "yes", "label": "Review"},
            {"id": "no", "label": "Skip"}
        ]
    )
    if proceed == "no":
        return {"status": "skipped"}

    # Second pause: Choose action
    action = await ctx.wait_for_user(
        question="How should we handle the issues?",
        input_type="choice",
        options=[
            {"id": "auto_fix", "label": "Auto-fix issues"},
            {"id": "manual", "label": "Review manually"},
            {"id": "reject", "label": "Reject article"}
        ]
    )

    # Third pause: If manual review, get feedback
    if action == "manual":
        feedback = await ctx.wait_for_user(
            question="What changes are needed?",
            input_type="text"
        )
        return {
            "status": "needs_revision",
            "feedback": feedback,
            "issues": analysis["issues"]
        }

    # Handle other actions...
    if action == "auto_fix":
        fixed = await ctx.task(auto_fix_issues, article, analysis["issues"])
        return {"status": "fixed", "changes": fixed}
    else:
        return {"status": "rejected"}
Each wait_for_user() call pauses the workflow. When resumed, the workflow replays from the beginning, but each pause point returns its cached response immediately.
Session Management
HITL workflows typically use chat sessions to maintain conversation context:
@workflow(chat=True)
async def research_assistant(ctx: WorkflowContext, message: str, session_id: str | None = None) -> dict:
    # Load conversation history from entity
    conversation = ctx.entity("Conversation", f"session-{session_id}")
    history = await ctx.task(lambda: conversation.get_history())

    # Perform initial research
    research = await ctx.task(search_papers, message, history)

    # Present findings and ask for direction
    direction = await ctx.wait_for_user(
        question=f"Found {len(research['papers'])} papers. What should I focus on?",
        input_type="choice",
        options=[
            {"id": "summarize", "label": "Summarize key findings"},
            {"id": "compare", "label": "Compare methodologies"},
            {"id": "analyze", "label": "Deep dive on specific paper"}
        ]
    )

    # Execute chosen direction
    if direction == "summarize":
        summary = await ctx.task(summarize_papers, research["papers"])
        result = {"type": "summary", "content": summary}
    elif direction == "compare":
        comparison = await ctx.task(compare_methodologies, research["papers"])
        result = {"type": "comparison", "content": comparison}
    else:
        # Ask which paper to analyze
        paper_choice = await ctx.wait_for_user(
            question="Which paper should I analyze in depth?",
            input_type="choice",
            options=[
                {"id": paper["id"], "label": paper["title"]}
                for paper in research["papers"][:5]
            ]
        )
        analysis = await ctx.task(deep_analyze_paper, paper_choice)
        result = {"type": "analysis", "content": analysis}

    # Save to conversation history
    await ctx.task(lambda: conversation.add_message("assistant", result["content"]))
    return result
The session_id parameter is automatically managed by the platform when chat=True is set. Each session maintains its own conversation state through entities.
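From the client side, a chat turn might look like the sketch below; run_workflow is shown earlier on this page, and passing session_id explicitly is illustrative, since with chat=True the platform can supply it:

from agnt5 import Client

client = Client()

# One chat turn; the explicit session_id here is illustrative
result = client.run_workflow("research_assistant", {
    "message": "Find recent papers on retrieval-augmented generation",
    "session_id": "session-abc123",
})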
Crash Recovery
HITL workflows survive crashes just like regular workflows:
Scenario 1: Crash before user responds
- Workflow pauses at wait_for_user()
- State is saved to database
- Worker process crashes
- User clicks a button to respond
- Resume endpoint starts a fresh worker
- Workflow replays from beginning
- Reaches wait_for_user(), finds cached response
- Continues execution
Scenario 2: Crash after user responds
- Workflow pauses, user responds
- Workflow resumes and continues executing
- Crashes before completion
- Retry starts fresh worker
- Workflow replays from beginning
- All user responses are cached and returned immediately
- Execution continues from last checkpoint
The user’s responses are treated as completed steps in the checkpoint history. Replay always returns cached responses for completed pause points.
When to Use HITL Workflows
Use human-in-the-loop when you need:
- Approval gates — Deployments, content publishing, financial transactions
- Guided workflows — User chooses direction at decision points
- Data validation — Human verification of extracted or generated content
- Exception handling — Escalate to human when automation can’t decide
- Progressive disclosure — Show results, ask for next steps
Don’t use HITL when:
- Workflow can complete fully automated
- User input is collected upfront (use regular workflow inputs)
- Approval is deterministic (use business logic instead)
- Real-time interaction isn’t needed (use async notifications)
HITL workflows are for situations where the workflow genuinely needs to wait for human judgment during execution.
When to Use Workflows
Use workflows when you need:
- Multi-step orchestration — Coordinate multiple functions in sequence or parallel
- Checkpoint recovery — Expensive operations that shouldn’t re-execute on failure
- Progress tracking — State that tracks orchestration progress
- Durable coordination — Survive crashes mid-execution
- Human-in-the-loop — Pause for user input and resume with their response
Use functions instead when:
- Single operation with no orchestration
- Stateless computation
- No need for step-by-step checkpointing
Use entities instead when:
- Object-oriented state management
- Persistent state beyond single execution
- Single-writer consistency per key
Workflows as Foundation
Workflows provide durable orchestration for multi-step AI processes:
- Functions are the atomic units (individual operations)
- Entities manage persistent state (conversations, agent memory, research sessions)
- Workflows orchestrate functions and entities across multiple steps
What makes workflows reliable is automatic checkpointing: each task’s result is saved before the next begins. Call a function, and the result is checkpointed. Call an LLM, and the result is checkpointed. When the workflow crashes, it resumes from the last checkpoint.
This is different from normal async code in two ways: progress is never lost, and expensive operations never re-execute. You write sequential or parallel async logic, and AGNT5 handles the distributed systems complexity — checkpoint storage, replay detection, crash recovery, and exactly-once execution.
When an LLM call times out after 3 minutes, when a knowledge base query fails, when a process crashes mid-workflow — workflows don’t start over. They resume from checkpoints. This is what makes AI applications reliable: every step you write is durable, every expensive operation runs once, and progress is permanent.