
Workflows

Durable, multi-step orchestration with automatic checkpointing and retries

A Workflow is a durable, multi-step program in which every step is checkpointed and every failed step is retried. If the process crashes mid-run, the workflow resumes from the last completed step, not from the start.

Think of a Workflow as a try/catch block that spans hours. Functions are the units of work inside it: each one retries on failure, and each result is saved before the next step runs.

Mental model

You write normal async Python. AGNT5 records every input and every result. When something fails, it replays the record — the function doesn’t re-execute. When a step is new, it runs and its output is checkpointed before the workflow moves on.

Two primitives compose here. Functions are stateless units of computation with automatic retries and optional idempotency. Workflows chain functions into multi-step processes with checkpointing and parallelism.

A minimal workflow

from agnt5 import workflow, function, WorkflowContext

@function(retries=3, backoff="exponential")
async def fetch_papers(topic: str) -> list[dict]:
    return await arxiv_search(topic)

@function()
async def summarize(papers: list[dict]) -> str:
    return await call_llm(f"Summarize: {papers}")

@workflow()
async def research_pipeline(ctx: WorkflowContext, topic: str) -> dict:
    papers = await ctx.task(fetch_papers, topic)
    summary = await ctx.task(summarize, papers)
    return {"topic": topic, "summary": summary}

Invoke it:

from agnt5 import Client

client = Client()
result = client.run("research_pipeline", {"topic": "quantum computing"}, component_type="workflow")

If the process dies after fetch_papers, the next run replays that checkpoint and only re-executes summarize.

Functions: the unit of work

A Function is a durable, retryable computation. Failures retry with backoff. Every call is traced. Idempotency keys guarantee exactly-once execution.

from agnt5 import function, FunctionContext

@function(retries=3)
async def call_flaky_api(ctx: FunctionContext, query: str) -> dict:
    ctx.log("Searching", query=query, attempt=ctx.attempt)
    return await external_api.search(query)

FunctionContext is optional. When present, it provides structured logging, a run ID, and the current retry attempt:

Property            Description
ctx.log(msg, **kv)  Structured logging with correlation IDs
ctx.run_id          Unique execution identifier
ctx.attempt         Current retry attempt (0-indexed)
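The attempt counter is useful for degrading gracefully across retries. A plain-Python sketch (not the agnt5 API) of how a 0-indexed counter like ctx.attempt could gate a cheaper fallback on the final try, assuming three attempts:

```python
# Sketch only: attempts 0..max_attempts-2 use the primary path,
# and the last attempt (max_attempts-1, 0-indexed) falls back.
def choose_model(attempt: int, max_attempts: int = 3) -> str:
    return "primary" if attempt < max_attempts - 1 else "fallback"

print([choose_model(a) for a in range(3)])  # ['primary', 'primary', 'fallback']
```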

Idempotency

Pass an idempotency_key when retries could cause side effects — payments, emails, record creation:

client.run("charge_payment", {"amount": 99.99}, idempotency_key=f"order-{order_id}")

Same key returns the cached result. Different key executes fresh. Use deterministic keys derived from the logical operation (payment-{order_id}, email-{user_id}-{campaign_id}).
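A derived key can be as simple as joining the operation name with its stable identifiers. A minimal sketch (the helper name is hypothetical, not part of agnt5):

```python
# Deterministic key derivation: build the key from the logical
# operation and its identifying parts, never from timestamps or
# random values, so a retry reconstructs the identical key.
def idempotency_key(operation: str, *parts: str) -> str:
    return "-".join([operation, *parts])

key = idempotency_key("payment", "order-4711")
print(key)  # payment-order-4711
# A retried charge rebuilds the same key, so the platform returns
# the cached result instead of charging again.
```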

Retry policies

Configure retry behavior on the decorator:

from agnt5 import function, RetryPolicy, BackoffPolicy, BackoffType

@function(
    retries=RetryPolicy(max_attempts=5, initial_interval_ms=1000),
    backoff=BackoffPolicy(type=BackoffType.EXPONENTIAL, multiplier=2.0),
)
async def call_flaky_api(query: str) -> dict:
    return await external_api.search(query)

Strategy     Pattern              Use case
Exponential  1s → 2s → 4s → 8s    Rate limits, overloaded services
Linear       1s → 2s → 3s → 4s    Predictable recovery time
Constant     1s → 1s → 1s → 1s    Fast retry for transient errors
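The patterns in the table come down to a few lines of arithmetic. A sketch assuming a 1s initial interval and, for the exponential case, a 2.0 multiplier:

```python
# Illustrative delay schedules for the three strategies above
# (not the agnt5 internals, just the arithmetic they imply).
def backoff_delays(strategy: str, attempts: int, initial: float = 1.0,
                   multiplier: float = 2.0) -> list[float]:
    if strategy == "exponential":
        return [initial * multiplier**i for i in range(attempts)]
    if strategy == "linear":
        return [initial * (i + 1) for i in range(attempts)]
    return [initial] * attempts  # constant

print(backoff_delays("exponential", 4))  # [1.0, 2.0, 4.0, 8.0]
```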

Composition

ctx.task() runs a function with checkpointing. ctx.parallel() runs several at once:

@workflow()
async def multi_source(ctx: WorkflowContext, topic: str) -> dict:
    arxiv, wiki, news = await ctx.parallel(
        ctx.task(search_arxiv, topic),
        ctx.task(search_wikipedia, topic),
        ctx.task(search_news, topic),
    )
    return {"arxiv": arxiv, "wiki": wiki, "news": news}

ctx.step(name, awaitable) names a checkpoint explicitly:

@workflow()
async def research(ctx: WorkflowContext, topic: str) -> dict:
    notes = await ctx.step("initial_research", call_llm(f"Research: {topic}"))
    analysis = await ctx.step("deep_analysis", call_llm(f"Analyze: {notes}"))
    return {"analysis": analysis}

Execution model

sequenceDiagram
    participant C as Client
    participant W as Workflow
    participant S as State Store

    C->>W: run(workflow, input)
    W->>W: Execute step 1
    W->>S: Checkpoint result
    W->>W: Execute step 2
    W->>S: Checkpoint result
    Note over W: Crash
    C->>W: Resume
    W->>S: Load checkpoints
    W->>W: Execute step 3
    W-->>C: Final result

Every ctx.task() writes to the checkpoint log before the next instruction runs. On replay, reads from that log short-circuit re-execution.
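The short-circuit behavior can be modeled in a few lines. A toy sketch of a checkpoint log (illustrative only, not the real AGNT5 internals):

```python
# Results are keyed by step index; a replayed step reads its cached
# result from the log instead of re-executing.
checkpoints: dict[int, object] = {}
executions = []

def run_step(index: int, fn):
    if index in checkpoints:            # replay: short-circuit
        return checkpoints[index]
    result = fn()                       # first run: execute...
    executions.append(index)
    checkpoints[index] = result         # ...checkpoint before moving on
    return result

# First run completes steps 0 and 1, then "crashes".
run_step(0, lambda: "papers")
run_step(1, lambda: "summary")

# The resumed run replays steps 0-1 from the log, executes only step 2.
run_step(0, lambda: "papers")
run_step(1, lambda: "summary")
final = run_step(2, lambda: "report")

print(executions)  # [0, 1, 2] -- each step ran exactly once
```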

When to use workflows

Workflows shine when the shape of the work is known ahead of time:

  • Multi-step pipelines where each step is expensive and you don’t want to re-run the early ones on failure
  • Processes that must survive crashes mid-execution
  • Fan-out / fan-in over N inputs with combined checkpointing
  • Anything where the sequence of steps is a property of the code, not a decision made at runtime

If the next step depends on something you only learn at runtime — a tool result, an LLM’s reasoning, a user’s reply — reach for an Agent instead.

Common pitfalls

  • Side effects outside ctx.task(). Anything called directly (not through a task) won’t be checkpointed and will re-execute on replay.
  • Missing idempotency keys. Without one, a retried payment can charge twice. With one, it can’t.
  • Mixing durable and in-memory state. Workflow-local variables don’t survive a crash. Use State & Memory for data that must persist.
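The first pitfall is easy to reproduce in miniature. A toy sketch (not real AGNT5 code) where a direct call sits outside the checkpointed steps:

```python
# Checkpointed steps run once; the direct call runs on every replay.
checkpoints: dict[str, object] = {}
email_sends = 0

def task(name: str, fn):
    if name in checkpoints:
        return checkpoints[name]
    checkpoints[name] = fn()
    return checkpoints[name]

def send_email():
    global email_sends
    email_sends += 1

def pipeline():
    task("fetch", lambda: "data")
    send_email()          # NOT checkpointed: re-executes on replay
    task("store", lambda: "ok")

pipeline()   # first run
pipeline()   # replay after a crash
print(email_sends)  # 2 -- the email went out twice
```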

Learn more