Workflows
Durable, multi-step orchestration with automatic checkpointing and retries
A Workflow is a durable, multi-step program where every step is checkpointed and every call is retried. If the process crashes mid-run, the workflow resumes from the last completed step — not from the start.
Think of a Workflow as a try/catch block that spans hours. Functions are the units of work inside it: each one retries on failure, and each result is saved before the next step runs.
Mental model
You write normal async Python. AGNT5 records every input and every result. When something fails, it replays the record — the function doesn’t re-execute. When a step is new, it runs and its output is checkpointed before the workflow moves on.
Two primitives compose here. Functions are stateless units of computation with automatic retries and optional idempotency. Workflows chain functions into multi-step processes with checkpointing and parallelism.
A minimal workflow
```python
from agnt5 import workflow, function, WorkflowContext

@function(retries=3, backoff="exponential")
async def fetch_papers(topic: str) -> list[dict]:
    return await arxiv_search(topic)

@function()
async def summarize(papers: list[dict]) -> str:
    return await call_llm(f"Summarize: {papers}")

@workflow()
async def research_pipeline(ctx: WorkflowContext, topic: str) -> dict:
    papers = await ctx.task(fetch_papers, topic)
    summary = await ctx.task(summarize, papers)
    return {"topic": topic, "summary": summary}
```

Invoke it:
```python
from agnt5 import Client

client = Client()
result = client.run(
    "research_pipeline",
    {"topic": "quantum computing"},
    component_type="workflow",
)
```

If the process dies after `fetch_papers`, the next run replays that checkpoint and only re-executes `summarize`.
Functions: the unit of work
A Function is a durable, retryable computation. Failures retry with backoff. Every call is traced. Idempotency keys guarantee exactly-once execution.
```python
from agnt5 import function, FunctionContext

@function(retries=3)
async def call_flaky_api(ctx: FunctionContext, query: str) -> dict:
    ctx.log("Searching", query=query, attempt=ctx.attempt)
    return await external_api.search(query)
```

`FunctionContext` is optional. When present, it provides structured logging, a run ID, and the current retry attempt:
| Property | Description |
|---|---|
| `ctx.log(msg, **kv)` | Structured logging with correlation IDs |
| `ctx.run_id` | Unique execution identifier |
| `ctx.attempt` | Current retry attempt (0-indexed) |
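To make the table concrete, here is a minimal plain-Python stand-in that emits structured JSON log lines with the correlation fields attached. It is an illustrative sketch, not AGNT5's actual `FunctionContext` class; the name `FakeFunctionContext` and the `records` list are ours.

```python
import json

class FakeFunctionContext:
    """Illustrative stand-in for agnt5's FunctionContext (not the real class)."""

    def __init__(self, run_id: str, attempt: int = 0):
        self.run_id = run_id      # unique execution identifier
        self.attempt = attempt    # current retry attempt, 0-indexed
        self.records: list[dict] = []

    def log(self, msg: str, **kv) -> None:
        # Attach correlation fields to every log line, as ctx.log does.
        entry = {"msg": msg, "run_id": self.run_id, "attempt": self.attempt, **kv}
        self.records.append(entry)
        print(json.dumps(entry))

ctx = FakeFunctionContext(run_id="run-123")
ctx.log("Searching", query="quantum computing")
```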
Idempotency
Pass an idempotency_key when retries could cause side effects — payments, emails, record creation:
```python
client.run(
    "charge_payment",
    {"amount": 99.99},
    idempotency_key=f"order-{order_id}",
)
```

Same key returns the cached result; a different key executes fresh. Use deterministic keys derived from the logical operation (`payment-{order_id}`, `email-{user_id}-{campaign_id}`).
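The caching behavior can be sketched in plain Python. The `_results` dict stands in for the platform's durable result store, and `run_idempotent` is an illustrative helper, not part of the AGNT5 API:

```python
_results: dict[str, dict] = {}  # stands in for the durable result store

def run_idempotent(key: str, operation) -> dict:
    """Execute operation once per key; repeat calls return the cached result."""
    if key in _results:
        return _results[key]          # same key: replay cached result
    result = operation()              # new key: execute fresh
    _results[key] = result
    return result

charges: list[float] = []

def charge() -> dict:
    charges.append(99.99)  # the side effect we must not repeat
    return {"status": "charged", "amount": 99.99}

first = run_idempotent("order-42", charge)
second = run_idempotent("order-42", charge)  # cached; no second charge
```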
Retry policies
Configure retry behavior on the decorator:
```python
from agnt5 import function, RetryPolicy, BackoffPolicy, BackoffType

@function(
    retries=RetryPolicy(max_attempts=5, initial_interval_ms=1000),
    backoff=BackoffPolicy(type=BackoffType.EXPONENTIAL, multiplier=2.0),
)
async def call_flaky_api(query: str) -> dict:
    return await external_api.search(query)
```

| Strategy | Pattern | Use case |
|---|---|---|
| Exponential | 1s → 2s → 4s → 8s | Rate limits, overloaded services |
| Linear | 1s → 2s → 3s → 4s | Predictable recovery time |
| Constant | 1s → 1s → 1s → 1s | Fast retry for transient errors |
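The three schedules reduce to simple arithmetic. A sketch of the delay sequences in the table; the helper name and signature are ours, not AGNT5's:

```python
def backoff_delays(strategy: str, initial_s: float, attempts: int,
                   multiplier: float = 2.0) -> list[float]:
    """Delay before each retry under the three strategies above."""
    if strategy == "exponential":
        return [initial_s * multiplier ** i for i in range(attempts)]
    if strategy == "linear":
        return [initial_s * (i + 1) for i in range(attempts)]
    return [initial_s] * attempts  # constant

print(backoff_delays("exponential", 1.0, 4))  # [1.0, 2.0, 4.0, 8.0]
```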
Composition
ctx.task() runs a function with checkpointing. ctx.parallel() runs several at once:
```python
@workflow()
async def multi_source(ctx: WorkflowContext, topic: str) -> dict:
    arxiv, wiki, news = await ctx.parallel(
        ctx.task(search_arxiv, topic),
        ctx.task(search_wikipedia, topic),
        ctx.task(search_news, topic),
    )
    return {"arxiv": arxiv, "wiki": wiki, "news": news}
```

`ctx.step(name, awaitable)` names a checkpoint explicitly:
```python
@workflow()
async def research(ctx: WorkflowContext, topic: str) -> dict:
    research = await ctx.step("initial_research", call_llm(f"Research: {topic}"))
    analysis = await ctx.step("deep_analysis", call_llm(f"Analyze: {research}"))
    return {"analysis": analysis}
```

Execution model
```mermaid
sequenceDiagram
    participant C as Client
    participant W as Workflow
    participant S as State Store
    C->>W: run(workflow, input)
    W->>W: Execute step 1
    W->>S: Checkpoint result
    W->>W: Execute step 2
    W->>S: Checkpoint result
    Note over W: Crash
    C->>W: Resume
    W->>S: Load checkpoints
    W->>W: Execute step 3
    W-->>C: Final result
```
Every ctx.task() writes to the checkpoint log before the next instruction runs. On replay, reads from that log short-circuit re-execution.
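The record-then-replay loop can be sketched in a few lines of plain Python. The in-memory `checkpoints` dict stands in for the durable state store, and none of these names come from the AGNT5 API:

```python
import asyncio

checkpoints: dict[str, object] = {}  # durable in a real system; in-memory here

async def task(name: str, fn, *args):
    # Completed step: read the checkpoint and skip re-execution.
    if name in checkpoints:
        return checkpoints[name]
    # New step: run it, then checkpoint the result before moving on.
    result = await fn(*args)
    checkpoints[name] = result
    return result

calls: list[str] = []

async def fetch(topic: str) -> list[str]:
    calls.append(topic)  # count real executions
    return [f"paper about {topic}"]

async def pipeline(topic: str) -> dict:
    papers = await task("fetch", fetch, topic)
    return {"topic": topic, "papers": papers}

asyncio.run(pipeline("qc"))
asyncio.run(pipeline("qc"))  # "resumed" run replays the checkpoint; fetch runs once
```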
When to use workflows
Workflows shine when the shape of the work is known ahead of time:
- Multi-step pipelines where each step is expensive and you don’t want to re-run the early ones on failure
- Processes that must survive crashes mid-execution
- Fan-out / fan-in over N inputs with combined checkpointing
- Anything where the sequence of steps is a property of the code, not a decision made at runtime
If the next step depends on something you only learn at runtime — a tool result, an LLM’s reasoning, a user’s reply — reach for an Agent instead.
Common pitfalls
- Side effects outside `ctx.task()`. Anything called directly (not through a task) won’t be checkpointed and will re-execute on replay.
- Missing idempotency keys. Without one, a retried payment can charge twice. With one, it can’t.
- Mixing durable and in-memory state. Workflow-local variables don’t survive a crash. Use State & Memory for data that must persist.
Learn more
- Agents — when control flow is LLM-decided
- State & Memory — persistent state across runs
- Human in the Loop — pause for user input mid-run
- Durable execution — what checkpoint and replay guarantee