
Workflows

Multi-step orchestration with automatic checkpointing

A Workflow is a multi-step process where each step is automatically checkpointed. If a workflow fails, it resumes from the last completed step — not from the beginning.

Think of a Workflow as a try/catch block that spans hours: work that already succeeded is never repeated, no matter where the failure happens.

Why It Exists

When you fetch papers, analyze them with an LLM, then generate a summary — if the summary fails, you don’t want to re-fetch or re-analyze. Each step is expensive. Normal async code doesn’t checkpoint, so failures mean starting over.

Workflows solve this. Each step is checkpointed, and failures resume from the last checkpoint. You write normal async Python; AGNT5 makes it durable.

When to Use

  • Multi-step pipelines with expensive operations (LLM calls, API requests)
  • Processes that must survive crashes mid-execution
  • Orchestration that needs progress tracking
  • Human-in-the-loop flows that pause for user input

Example

from agnt5 import workflow, WorkflowContext

@workflow()
async def research_pipeline(ctx: WorkflowContext, topic: str) -> dict:
    # Step 1: Fetch papers (checkpointed)
    papers = await ctx.task(fetch_papers, topic)

    # Step 2: Analyze with LLM (checkpointed)
    analysis = await ctx.task(analyze_papers, papers)

    # Step 3: Generate synthesis (checkpointed)
    synthesis = await ctx.task(generate_synthesis, analysis)

    return {"topic": topic, "synthesis": synthesis}

Call it via the client:

from agnt5 import Client

client = Client()
result = client.run("research_pipeline", {"topic": "quantum computing"}, component_type="workflow")

If the workflow crashes after Step 2:

  • Step 1 replays from checkpoint (papers not re-fetched)
  • Step 2 replays from checkpoint (LLM not re-called)
  • Step 3 executes fresh
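
The replay behaviour above can be sketched in plain Python. This is not the AGNT5 implementation, just a toy illustration of checkpoint-and-replay: a completed step's result is cached under its name, and a re-run returns the cached value instead of executing again.

```python
import asyncio

# Toy in-memory checkpoint store -- AGNT5 persists checkpoints durably;
# this sketch only illustrates the replay behaviour.
checkpoints: dict[str, object] = {}
executed: list[str] = []  # records which steps actually ran

async def run_step(name, fn, *args):
    """Return the cached result if the step already completed, else run it."""
    if name in checkpoints:
        return checkpoints[name]  # replay from checkpoint, no re-execution
    result = await fn(*args)      # execute fresh
    checkpoints[name] = result    # checkpoint before moving on
    return result

async def fetch_papers(topic):
    executed.append("fetch")
    return [f"paper about {topic}"]

async def analyze_papers(papers):
    executed.append("analyze")
    return f"analysis of {len(papers)} paper(s)"

async def pipeline(topic):
    papers = await run_step("fetch", fetch_papers, topic)
    return await run_step("analyze", analyze_papers, papers)

# Run twice: the second run replays both steps from checkpoints,
# so each step function executes only once.
first = asyncio.run(pipeline("quantum computing"))
second = asyncio.run(pipeline("quantum computing"))
```

In AGNT5 the checkpoint store is persisted durably, so replay works across process crashes, not just within one process as in this sketch.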

How It Works

Workflows provide three guarantees:

  • Automatic Checkpointing — Each ctx.task() or ctx.step() saves its result before proceeding
  • Resume on Failure — Crashes resume from the last checkpoint, not from the beginning
  • Parallel Execution — ctx.parallel() runs multiple tasks concurrently with combined checkpointing

sequenceDiagram
    participant C as Client
    participant W as Workflow
    participant S as State Store

    C->>W: run(workflow, input)
    W->>W: Execute step 1
    W->>S: Checkpoint result
    W->>W: Execute step 2
    W->>S: Checkpoint result
    Note over W: Crash here
    C->>W: Resume workflow
    W->>S: Load checkpoints
    S-->>W: Step 1 & 2 results
    W->>W: Execute step 3 (fresh)
    W->>S: Checkpoint result
    W-->>C: Final result

Configuration

Task Execution

Use ctx.task() to execute functions with checkpointing:

@workflow()
async def pipeline(ctx: WorkflowContext, doc_url: str) -> dict:
    # Each task is checkpointed independently
    content = await ctx.task(fetch_document, doc_url)
    summary = await ctx.task(summarize, content)
    return {"summary": summary}

Parallel Execution

Run multiple tasks concurrently:

@workflow()
async def multi_source(ctx: WorkflowContext, topic: str) -> dict:
    # All three run in parallel, checkpointed together
    arxiv, wiki, news = await ctx.parallel(
        ctx.task(search_arxiv, topic),
        ctx.task(search_wikipedia, topic),
        ctx.task(search_news, topic),
    )
    return {"arxiv": arxiv, "wiki": wiki, "news": news}
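
Conceptually, ctx.parallel() works like asyncio.gather with a single combined checkpoint. A stand-alone sketch of the fan-out, with hypothetical stub search functions standing in for the real ones:

```python
import asyncio

# Stand-ins for the real search functions; each simulates an I/O-bound call.
async def search_arxiv(topic):
    await asyncio.sleep(0.01)
    return f"arxiv:{topic}"

async def search_wikipedia(topic):
    await asyncio.sleep(0.01)
    return f"wiki:{topic}"

async def search_news(topic):
    await asyncio.sleep(0.01)
    return f"news:{topic}"

async def multi_source(topic):
    # Fan out concurrently; results come back in call order.
    return await asyncio.gather(
        search_arxiv(topic),
        search_wikipedia(topic),
        search_news(topic),
    )

results = asyncio.run(multi_source("fusion"))
```

Results are returned in call order, so the tuple unpacking in the example above is stable regardless of which task finishes first.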

Named Steps

Use ctx.step() for explicit checkpoint names:

@workflow()
async def research(ctx: WorkflowContext, topic: str) -> dict:
    findings = await ctx.step("initial_research", call_llm(f"Research: {topic}"))
    analysis = await ctx.step("deep_analysis", call_llm(f"Analyze: {findings}"))
    return {"analysis": analysis}

Workflow State

Track progress with durable state:

@workflow()
async def process_documents(ctx: WorkflowContext, docs: list[str]) -> dict:
    ctx.state.set("total", len(docs))
    ctx.state.set("processed", 0)

    for doc in docs:
        await ctx.task(process_doc, doc)
        ctx.state.set("processed", ctx.state.get("processed") + 1)

    return {"processed": ctx.state.get("processed")}

  • ctx.state.get(key, default) — Get a value from workflow state
  • ctx.state.set(key, value) — Set a value in workflow state
  • ctx.state.delete(key) — Delete a key from state
  • ctx.state.clear() — Clear all workflow state
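
The semantics of these methods can be illustrated with a dict-backed stand-in. This is only a toy sketch: the real ctx.state is durable and survives crashes and restarts, which this class does not.

```python
class StateSketch:
    """Toy in-memory stand-in for ctx.state (the real store is durable)."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)

    def clear(self):
        self._data.clear()

state = StateSketch()
state.set("total", 3)
# Passing a default avoids a missing-key error on the first increment.
state.set("processed", state.get("processed", 0) + 1)
state.delete("total")
```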

Human-in-the-Loop

Pause for user input with ctx.wait_for_user():

@workflow(chat=True)
async def deployment_approval(ctx: WorkflowContext, changes: dict) -> dict:
    validation = await ctx.task(validate_deployment, changes)

    if not validation["safe"]:
        return {"status": "rejected", "reason": validation["issues"]}

    # Pause for human approval
    decision = await ctx.wait_for_user(
        question=f"Approve deployment of {len(changes['files'])} files?",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "reject", "label": "Reject"}
        ]
    )

    if decision == "approve":
        result = await ctx.task(execute_deployment, changes)
        return {"status": "deployed", "result": result}
    else:
        return {"status": "cancelled"}

Input types:

  • approval — Yes/no decisions with options
  • choice — Multiple choice selection
  • text — Free-form text input

The workflow literally pauses mid-execution. When the user responds, it resumes from where it left off.

Guidelines

Common Patterns

Workflow → Function → Function (sequential pipeline)
Workflow → parallel(Function, Function, Function) (fan-out)
Workflow → Entity (persist state beyond workflow)
Workflow → Agent (autonomous reasoning step)
Workflow → wait_for_user (human approval gate)

Common Pitfalls

  • Don’t make LLM calls outside ctx.task() or ctx.step() — They won’t be checkpointed. On retry, they’ll re-execute.

  • Don’t use workflow state for persistent data — Workflow state is temporary. Use Entities for data that should survive beyond the workflow.

  • Don’t forget chat=True for HITL workflows — Human-in-the-loop requires chat mode for session management.

What Workflows Don’t Do

  • Not for single operations — Use Functions for atomic, stateless computation
  • Not for persistent state — Use Entities for state that survives across workflows
  • Not for autonomous reasoning — Use Agents for dynamic tool selection and planning

API Reference

  • @workflow() — Decorator to define a durable workflow
  • WorkflowContext — Context with task, step, parallel, state, and HITL methods
  • ctx.task(fn, *args) — Execute function with checkpointing
  • ctx.step(name, awaitable) — Checkpoint with explicit name
  • ctx.parallel(*tasks) — Run tasks concurrently
  • ctx.state — Durable state for progress tracking
  • ctx.wait_for_user() — Pause for human input (requires chat=True)