Human-in-the-loop

Pause workflows for durable human input, approvals, selections, and follow-up decisions.

Human-in-the-loop (HITL) lets a workflow pause mid-execution, wait for a human to respond, then continue from exactly where it left off. The pause is durable. If the worker restarts while waiting, the workflow resumes correctly when the user eventually replies.

Call ctx.wait_for_user() anywhere inside a workflow to trigger a pause.


How it works

When execution reaches ctx.wait_for_user(), the workflow saves its state, shows the question to the user, and stops. Once the user responds, the call returns their answer and the workflow continues from that point. Because the state is persisted, the pause survives worker restarts and no progress is lost.


ctx.wait_for_user() parameters

Parameter | Type | Default | Description
question | str | required | The text shown to the user
input_type | str | "text" | Input mode (see below)
options | list[dict] | None | Choices for approval, select, and multiselect. Each dict needs "id" and "label"
allow_custom | bool | False | Adds a free-text “Something else” option to select and multiselect
skippable | bool | False | Adds a Skip button. Returns None when skipped
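
Because each entry in options needs an "id" and a "label", and multiselect answers come back as a comma-separated string of ids, it can help to check an options list before passing it to ctx.wait_for_user(). The sketch below is a hypothetical helper, not part of AGNT5. The name validate_options and the duplicate-id check are assumptions made for illustration.

```python
def validate_options(options, input_type):
    """Return a list of problems found in an options list (empty if none).

    Hypothetical pre-flight check; AGNT5 may perform its own validation.
    """
    problems = []
    seen_ids = set()
    for position, option in enumerate(options):
        missing = [key for key in ("id", "label") if key not in option]
        if missing:
            problems.append(f"option {position} is missing {missing}")
            continue
        option_id = option["id"]
        # Assumption: ids should be unique so an answer maps to one option.
        if option_id in seen_ids:
            problems.append(f"duplicate id {option_id!r}")
        seen_ids.add(option_id)
        # A comma inside an id would be ambiguous once multiselect
        # answers are joined into a comma-separated string.
        if input_type == "multiselect" and "," in option_id:
            problems.append(f"id {option_id!r} contains a comma")
    return problems


good = [{"id": "pdf", "label": "PDF"}, {"id": "html", "label": "HTML"}]
bad = [{"id": "a,b", "label": "A and B"}, {"label": "No id"}]
print(validate_options(good, "select"))
print(validate_options(bad, "multiselect"))
```

Running a check like this in a unit test catches malformed option lists before a user ever sees the prompt.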

Input types

text: free text

The user types any response. Use this for open-ended input like names, instructions, or edited content.

name = await ctx.wait_for_user("What should we call this report?")

approval: yes / no

Present a clear action and let the user approve or reject it.

decision = await ctx.wait_for_user(
    question="Deploy to production?",
    input_type="approval",
    options=[
        {"id": "approve", "label": "Approve"},
        {"id": "reject", "label": "Reject"},
    ],
)

if decision == "reject":
    return {"status": "cancelled"}

select: pick one

The user picks a single option from a list.

format = await ctx.wait_for_user(
    question="Which output format do you want?",
    input_type="select",
    options=[
        {"id": "pdf", "label": "PDF"},
        {"id": "markdown", "label": "Markdown"},
        {"id": "html", "label": "HTML"},
    ],
)

multiselect: pick many

The user picks one or more options. The return value is a comma-separated string of the selected ids.

topics = await ctx.wait_for_user(
    question="Which topics should the report cover?",
    input_type="multiselect",
    options=[
        {"id": "market", "label": "Market analysis"},
        {"id": "tech", "label": "Technology trends"},
        {"id": "risk", "label": "Risk factors"},
    ],
)
# topics might be "market,tech"
selected = topics.split(",")
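
Splitting the string directly works when at least one option is always selected. A slightly more defensive sketch is shown below. parse_multiselect is a hypothetical helper, and treating None or an empty string as "nothing selected" is an assumption rather than documented behaviour.

```python
def parse_multiselect(raw):
    """Turn a comma-separated id string into a list of ids.

    Assumes None or "" means nothing was selected (for example, after a skip).
    """
    if not raw:
        return []
    # Strip stray whitespace and drop empty fragments such as "a,,b".
    return [part.strip() for part in raw.split(",") if part.strip()]


print(parse_multiselect("market,tech"))
print(parse_multiselect(None))
```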

Options: allow_custom and skippable

allow_custom=True adds a free-text “Something else” field to select or multiselect.

skippable=True adds a Skip button. The return value is None when the user skips.

preference = await ctx.wait_for_user(
    question="Pick a tone for the report:",
    input_type="select",
    options=[
        {"id": "formal", "label": "Formal"},
        {"id": "casual", "label": "Casual"},
    ],
    allow_custom=True,
    skippable=True,
)

if preference is None:
    preference = "formal"   # default when skipped
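
When both flags are set, the answer can fall into three cases: skipped, one of the listed ids, or custom text. The sketch below separates them. resolve_choice is a hypothetical helper, and it assumes that a custom "Something else" answer is returned as a plain string that does not match any option id; the source does not state the exact return shape for custom answers.

```python
def resolve_choice(answer, known_ids, default):
    """Classify a select answer as skipped, a listed option, or custom text."""
    if answer is None:
        # skippable=True: the user clicked Skip.
        return ("skipped", default)
    if answer in known_ids:
        return ("option", answer)
    # Assumption: anything else is free text typed into "Something else".
    return ("custom", answer.strip())


tone_ids = {"formal", "casual"}
print(resolve_choice(None, tone_ids, "formal"))
print(resolve_choice("casual", tone_ids, "formal"))
print(resolve_choice(" playful but precise ", tone_ids, "formal"))
```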

Multiple pauses in one workflow

You can call ctx.wait_for_user() as many times as you need. Each call gets its own pause index so the right cached answer is returned on replay.

from agnt5 import workflow, WorkflowContext


@workflow
async def review_workflow(ctx: WorkflowContext, draft: str) -> str:
    # First pause: approval
    decision = await ctx.wait_for_user(
        question=f"Approve this draft?\n\n{draft}",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "edit", "label": "Edit"},
            {"id": "reject", "label": "Reject"},
        ],
    )

    if decision == "reject":
        return "Rejected."

    if decision == "edit":
        # Second pause: get edited version
        draft = await ctx.wait_for_user(
            question="Paste your revised draft:",
            input_type="text",
        )

    # Third pause: final confirmation before publishing
    confirm = await ctx.wait_for_user(
        question="Publish now?",
        input_type="approval",
        options=[
            {"id": "yes", "label": "Publish"},
            {"id": "no", "label": "Save as draft"},
        ],
    )

    return "Published." if confirm == "yes" else "Saved as draft."

Real-world example

A customer submits a refund request. An AI agent reviews it and produces a recommendation. A support agent then steps in at up to five points: decide the outcome, adjust the amount if needed, pick the refund method, choose the notification channels, and confirm before money moves.

from agnt5 import workflow, WorkflowContext, function, FunctionContext


# ── Step 1: AI analyses the refund request ──────────────────────────────────

@function
async def analyse_refund(ctx: FunctionContext, order_id: str, reason: str) -> dict:
    """Look up the order and produce a refund recommendation."""
    # In practice: fetch order from your database
    return {
        "order_id": order_id,
        "order_total": 149.99,
        "eligible": True,
        "suggested_amount": 149.99,
        "reason": reason,
        "summary": f"Order {order_id}, £149.99, eligible for full refund. Reason: {reason}",
    }


# ── Step 2: Process the approved refund ─────────────────────────────────────

@function
async def process_refund(
    ctx: FunctionContext,
    order_id: str,
    amount: float,
    method: str,
    notify_channels: list[str],
) -> dict:
    """Issue the refund and send notifications."""
    # In practice: call your payments API and notification service
    ctx.logger.info(f"Refund processed: £{amount} via {method} for order {order_id}")
    return {
        "status": "refunded",
        "order_id": order_id,
        "amount": amount,
        "method": method,
        "notified_via": notify_channels,
    }


# ── Workflow ─────────────────────────────────────────────────────────────────

@workflow
async def refund_approval_workflow(
    ctx: WorkflowContext,
    order_id: str,
    reason: str,
) -> dict:

    # Stage 1: AI reviews the request
    analysis = await ctx.step(analyse_refund, order_id, reason)

    if not analysis["eligible"]:
        return {"status": "ineligible", "order_id": order_id}

    # ── HITL pause 1: approve / modify / reject ──────────────────────────────
    if not ctx._is_replay:
        ctx.logger.info("Waiting for support agent decision...")

    decision = await ctx.wait_for_user(
        question=(
            f"{analysis['summary']}\n\n"
            f"Suggested refund: £{analysis['suggested_amount']}\n\n"
            "What would you like to do?"
        ),
        input_type="select",
        options=[
            {"id": "approve", "label": "Approve full refund"},
            {"id": "modify",  "label": "Approve with a different amount"},
            {"id": "reject",  "label": "Reject refund"},
        ],
    )

    if decision == "reject":
        return {"status": "rejected", "order_id": order_id}

    # ── HITL pause 2: custom amount (only if modifying) ──────────────────────
    refund_amount = analysis["suggested_amount"]

    if decision == "modify":
        raw = await ctx.wait_for_user(
            question=f"Enter the refund amount (order total: £{analysis['order_total']}):",
            input_type="text",
        )
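        # Text input arrives as a plain string, so validate before converting
        try:
            refund_amount = float(raw)
        except ValueError:
            return {"status": "error", "order_id": order_id, "message": f"Invalid amount: {raw}"}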

    # ── HITL pause 3: pick refund method ─────────────────────────────────────
    method = await ctx.wait_for_user(
        question="How should the refund be returned to the customer?",
        input_type="select",
        options=[
            {"id": "original",     "label": "Original payment method"},
            {"id": "store_credit", "label": "Store credit"},
            {"id": "bank",         "label": "Bank transfer"},
        ],
    )

    # ── HITL pause 4: pick notification channels ──────────────────────────────
    channels_raw = await ctx.wait_for_user(
        question="Notify the customer via:",
        input_type="multiselect",
        options=[
            {"id": "email", "label": "Email"},
            {"id": "sms",   "label": "SMS"},
            {"id": "push",  "label": "Push notification"},
        ],
    )
    notify_channels = channels_raw.split(",") if channels_raw else ["email"]

    # ── HITL pause 5: final confirmation before money moves ───────────────────
    confirm = await ctx.wait_for_user(
        question=(
            f"Ready to process:\n"
            f"  Order:   {order_id}\n"
            f"  Amount:  £{refund_amount}\n"
            f"  Method:  {method}\n"
            f"  Notify:  {', '.join(notify_channels)}\n\n"
            "Confirm?"
        ),
        input_type="approval",
        options=[
            {"id": "confirm", "label": "Confirm & Process"},
            {"id": "cancel",  "label": "Cancel"},
        ],
    )

    if confirm == "cancel":
        return {"status": "cancelled", "order_id": order_id}

    # Stage 2: process the refund
    result = await ctx.step(process_refund, order_id, refund_amount, method, notify_channels)
    return result

What each pause does:

Pause | Input type | Purpose
1 | select | Support agent decides: full refund, modified amount, or reject
2 | text | Agent enters a custom amount (only shown when modifying)
3 | select | Agent picks the refund method
4 | multiselect | Agent picks which channels to notify the customer on
5 | approval | Final review before the refund is actually processed

Under the hood: replay behaviour

Understanding this helps when things do not behave the way you expect.

When ctx.wait_for_user() is called for the first time, the workflow saves its state and stops. When the user responds, AGNT5 runs the workflow function again from the very top. All the code before wait_for_user() runs again, but this time wait_for_user() finds the saved answer and returns it immediately without pausing.

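This means anything before the pause fires at least twice: once before the user responds, and once on resume. In a workflow with several pauses, the function re-runs from the top after every response, so code near the top runs once per pass.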

First run:   generate_draft() → wait_for_user() → pauses ⏸
On resume:   generate_draft() → wait_for_user() → returns answer → publish() ✅

Wrap side effects with if not ctx._is_replay: to make them fire only once:

@workflow
async def publish_workflow(ctx: WorkflowContext, topic: str) -> str:
    draft = await generate_draft(topic)

    # Without the guard, this log fires twice (before and after the pause)
    if not ctx._is_replay:
        ctx.logger.info("Draft ready, waiting for approval...")

    decision = await ctx.wait_for_user(   # never guarded, must run on both passes
        question=f"Approve this draft?\n\n{draft}",
        input_type="approval",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "discard", "label": "Discard"},
        ],
    )

    if decision == "discard":
        return "Discarded."

    return await publish(draft)

Tip: Guard side effects, but never guard the pause itself.
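
To make the replay model concrete, here is a toy simulation in plain Python. It is not AGNT5's implementation. ToyContext, Paused, run_until_done, and the is_replay property are hypothetical names, and the sketch assumes that "replay" means a cached answer already exists for the next pause.

```python
import asyncio


class Paused(Exception):
    """Signals that the toy workflow is waiting for a human answer."""


class ToyContext:
    """Toy stand-in for a workflow context; NOT the AGNT5 implementation."""

    def __init__(self, cached_answers):
        self.cached_answers = list(cached_answers)
        self.pause_index = 0  # incremented by each wait_for_user() call

    @property
    def is_replay(self):
        # Assumption: replay means the next pause already has a saved answer.
        return self.pause_index < len(self.cached_answers)

    async def wait_for_user(self, question):
        index = self.pause_index
        self.pause_index += 1
        if index < len(self.cached_answers):
            return self.cached_answers[index]  # replay: return saved answer
        raise Paused(question)  # first time at this pause: stop and wait


side_effects = []  # records each time a side effect fires


async def toy_workflow(ctx):
    side_effects.append("unguarded")  # runs on every pass
    if not ctx.is_replay:
        side_effects.append("guarded")  # runs only on the first pass
    decision = await ctx.wait_for_user("Approve?")
    return f"decision={decision}"


def run_until_done(workflow, human_answers):
    """Re-run the workflow from the top each time a new answer arrives."""
    cached = []
    pending = list(human_answers)
    while True:
        try:
            return asyncio.run(workflow(ToyContext(cached)))
        except Paused:
            cached.append(pending.pop(0))  # the human responds


result = run_until_done(toy_workflow, ["approve"])
print(result)
print(side_effects)
```

In this toy model the unguarded side effect is recorded twice and the guarded one once, which mirrors the two-pass behaviour described above.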

Edge cases

User skips the question

When skippable=True, the return value is None if the user clicks Skip. Always handle None explicitly to avoid a TypeError or AttributeError later, for example from float(None) or None.split(",").

note = await ctx.wait_for_user(
    question="Any special instructions?",
    input_type="text",
    skippable=True,
)

instructions = note or "No special instructions."

User enters unexpected text in a text input

wait_for_user() returns whatever the user typed as a plain string. If you need a number or a specific format, validate and convert it yourself:

raw = await ctx.wait_for_user("Enter the refund amount (numbers only):")

try:
    amount = float(raw)
except ValueError:
    return {"status": "error", "message": f"Invalid amount: {raw}"}
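
For monetary input, a stricter check than float() is often worthwhile. The sketch below is one possible approach, not something AGNT5 provides. parse_amount is a hypothetical helper that uses Decimal to avoid float rounding, rejects non-finite values such as "NaN", and enforces an upper bound. It assumes raw is a string, so handle a skipped (None) answer before calling it.

```python
from decimal import Decimal, InvalidOperation


def parse_amount(raw, order_total):
    """Parse user text into a Decimal amount in the range (0, order_total].

    Raises ValueError with a readable message when the input is unusable.
    """
    try:
        amount = Decimal(raw.strip())
    except InvalidOperation:
        raise ValueError(f"not a number: {raw!r}") from None
    # Decimal accepts "NaN" and "Infinity", which make no sense as amounts.
    if not amount.is_finite():
        raise ValueError(f"not a finite number: {raw!r}")
    if amount <= 0 or amount > order_total:
        raise ValueError(f"amount must be between 0 and {order_total}")
    return amount


total = Decimal("149.99")
print(parse_amount(" 20.5 ", total))
try:
    parse_amount("twenty", total)
except ValueError as error:
    print(f"rejected: {error}")
```

Inside a workflow, the ValueError can be turned into the same error result shown above, or into another wait_for_user() call that asks again.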

Conditional pause: pause only sometimes

wait_for_user() can be inside an if block. The pause index is tracked per call that actually executes, so conditional pauses work correctly across replays.

decision = await ctx.wait_for_user(
    question="Approve or modify?",
    input_type="select",
    options=[
        {"id": "approve", "label": "Approve"},
        {"id": "modify",  "label": "Modify amount"},
    ],
)

if decision == "modify":
    # This pause only runs when the agent chose "modify"
    raw = await ctx.wait_for_user("Enter the new amount:")
    amount = float(raw)
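
Conditional pauses are easy to exercise in a unit test with a scripted stand-in for the context. The sketch below is a hypothetical test double, not an AGNT5 utility. ScriptedContext and amount_flow are illustrative names, and the wait_for_user signature simply mirrors the parameter table in this page.

```python
import asyncio


class ScriptedContext:
    """Hypothetical test double: answers each pause from a fixed script."""

    def __init__(self, script):
        self._script = list(script)
        self.questions = []  # every question asked, in order

    async def wait_for_user(self, question, input_type="text", options=None,
                            allow_custom=False, skippable=False):
        self.questions.append(question)
        return self._script.pop(0)


async def amount_flow(ctx, suggested):
    """Same shape as the conditional-pause snippet above."""
    decision = await ctx.wait_for_user(
        question="Approve or modify?",
        input_type="select",
        options=[
            {"id": "approve", "label": "Approve"},
            {"id": "modify", "label": "Modify amount"},
        ],
    )
    amount = suggested
    if decision == "modify":
        # This pause is only reached on the "modify" path.
        amount = float(await ctx.wait_for_user("Enter the new amount:"))
    return amount


approve_ctx = ScriptedContext(["approve"])
modify_ctx = ScriptedContext(["modify", "20.5"])
approved = asyncio.run(amount_flow(approve_ctx, 149.99))
modified = asyncio.run(amount_flow(modify_ctx, 149.99))
print(approved, len(approve_ctx.questions))
print(modified, len(modify_ctx.questions))
```

The "approve" path asks one question and the "modify" path asks two, so each path consumes exactly the answers for the pauses it actually executes.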

Side effects before a pause that must not repeat

If you need to send a notification or call an external API before the pause, wrap it in if not ctx._is_replay: so it only fires on the first pass:

if not ctx._is_replay:
    await send_slack_alert(f"Refund request {order_id} needs review.")

decision = await ctx.wait_for_user("Approve refund?", input_type="approval", options=[...])
© 2026 AGNT5