Human-in-the-loop
Pause workflows for durable human input, approvals, selections, and follow-up decisions.
Human-in-the-loop (HITL) lets a workflow pause mid-execution, wait for a human to respond, then continue from exactly where it left off. The pause is durable. If the worker restarts while waiting, the workflow resumes correctly when the user eventually replies.
Call ctx.wait_for_user() anywhere inside a workflow to trigger a pause.
How it works
Call ctx.wait_for_user() at any point in your workflow. The workflow pauses there, shows the question to the user, and resumes from that exact point once they respond. The pause survives worker restarts. The state is saved and nothing is lost.
ctx.wait_for_user() parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
question | str | required | The text shown to the user |
input_type | str | "text" | Input mode (see below) |
options | list[dict] | None | Choices for approval, select, and multiselect. Each dict needs "id" and "label" |
allow_custom | bool | False | Adds a free-text “Something else” option to select and multiselect |
skippable | bool | False | Adds a Skip button. Returns None when skipped |
Input types
text: free text
The user types any response. Use this for open-ended input like names, instructions, or edited content.
name = await ctx.wait_for_user("What should we call this report?") approval: yes / no
Present a clear action and let the user approve or reject it.
decision = await ctx.wait_for_user(
question="Deploy to production?",
input_type="approval",
options=[
{"id": "approve", "label": "Approve"},
{"id": "reject", "label": "Reject"},
],
)
if decision == "reject":
return {"status": "cancelled"} select: pick one
The user picks a single option from a list.
format = await ctx.wait_for_user(
question="Which output format do you want?",
input_type="select",
options=[
{"id": "pdf", "label": "PDF"},
{"id": "markdown", "label": "Markdown"},
{"id": "html", "label": "HTML"},
],
) multiselect: pick many
The user picks one or more options. The return value is a comma-separated string of the selected ids.
topics = await ctx.wait_for_user(
question="Which topics should the report cover?",
input_type="multiselect",
options=[
{"id": "market", "label": "Market analysis"},
{"id": "tech", "label": "Technology trends"},
{"id": "risk", "label": "Risk factors"},
],
)
# topics might be "market,tech"
selected = topics.split(",")Options: allow_custom and skippable
allow_custom=True adds a free-text “Something else” field to select or multiselect.
skippable=True adds a Skip button. The return value is None when the user skips.
preference = await ctx.wait_for_user(
question="Pick a tone for the report:",
input_type="select",
options=[
{"id": "formal", "label": "Formal"},
{"id": "casual", "label": "Casual"},
],
allow_custom=True,
skippable=True,
)
if preference is None:
preference = "formal" # default when skippedMultiple pauses in one workflow
You can call ctx.wait_for_user() as many times as you need. Each call gets its own pause index so the right cached answer is returned on replay.
@workflow
async def review_workflow(ctx: WorkflowContext, draft: str) -> str:
# First pause: approval
decision = await ctx.wait_for_user(
question=f"Approve this draft?\n\n{draft}",
input_type="approval",
options=[
{"id": "approve", "label": "Approve"},
{"id": "edit", "label": "Edit"},
{"id": "reject", "label": "Reject"},
],
)
if decision == "reject":
return "Rejected."
if decision == "edit":
# Second pause: get edited version
draft = await ctx.wait_for_user(
question="Paste your revised draft:",
input_type="text",
)
# Third pause: final confirmation before publishing
confirm = await ctx.wait_for_user(
question="Publish now?",
input_type="approval",
options=[
{"id": "yes", "label": "Publish"},
{"id": "no", "label": "Save as draft"},
],
)
return "Published." if confirm == "yes" else "Saved as draft."Real-world example
A customer submits a refund request. An AI agent reviews it and produces a recommendation. A support agent then steps in at four points: decide the outcome, adjust the amount if needed, pick the refund method, and confirm before money moves.
from agnt5 import Agent, workflow, WorkflowContext, function, FunctionContext
from agnt5.lm import BuiltInTool
# ── Step 1: AI analyses the refund request ──────────────────────────────────
@function
async def analyse_refund(ctx: FunctionContext, order_id: str, reason: str) -> dict:
"""Look up the order and produce a refund recommendation."""
# In practice: fetch order from your database
return {
"order_id": order_id,
"order_total": 149.99,
"eligible": True,
"suggested_amount": 149.99,
"reason": reason,
"summary": f"Order {order_id}, £149.99, eligible for full refund. Reason: {reason}",
}
# ── Step 2: Process the approved refund ─────────────────────────────────────
@function
async def process_refund(
ctx: FunctionContext,
order_id: str,
amount: float,
method: str,
notify_channels: list[str],
) -> dict:
"""Issue the refund and send notifications."""
# In practice: call your payments API and notification service
ctx.logger.info(f"Refund processed: £{amount} via {method} for order {order_id}")
return {
"status": "refunded",
"order_id": order_id,
"amount": amount,
"method": method,
"notified_via": notify_channels,
}
# ── Workflow ─────────────────────────────────────────────────────────────────
@workflow
async def refund_approval_workflow(
ctx: WorkflowContext,
order_id: str,
reason: str,
) -> dict:
# Stage 1: AI reviews the request
analysis = await ctx.step(analyse_refund, order_id, reason)
if not analysis["eligible"]:
return {"status": "ineligible", "order_id": order_id}
# ── HITL pause 1: approve / modify / reject ──────────────────────────────
if not ctx._is_replay:
ctx.logger.info("Waiting for support agent decision...")
decision = await ctx.wait_for_user(
question=(
f"{analysis['summary']}\n\n"
f"Suggested refund: £{analysis['suggested_amount']}\n\n"
"What would you like to do?"
),
input_type="select",
options=[
{"id": "approve", "label": "Approve full refund"},
{"id": "modify", "label": "Approve with a different amount"},
{"id": "reject", "label": "Reject refund"},
],
)
if decision == "reject":
return {"status": "rejected", "order_id": order_id}
# ── HITL pause 2: custom amount (only if modifying) ──────────────────────
refund_amount = analysis["suggested_amount"]
if decision == "modify":
raw = await ctx.wait_for_user(
question=f"Enter the refund amount (order total: £{analysis['order_total']}):",
input_type="text",
)
refund_amount = float(raw)
# ── HITL pause 3: pick refund method ─────────────────────────────────────
method = await ctx.wait_for_user(
question="How should the refund be returned to the customer?",
input_type="select",
options=[
{"id": "original", "label": "Original payment method"},
{"id": "store_credit", "label": "Store credit"},
{"id": "bank", "label": "Bank transfer"},
],
)
# ── HITL pause 4: pick notification channels ──────────────────────────────
channels_raw = await ctx.wait_for_user(
question="Notify the customer via:",
input_type="multiselect",
options=[
{"id": "email", "label": "Email"},
{"id": "sms", "label": "SMS"},
{"id": "push", "label": "Push notification"},
],
)
notify_channels = channels_raw.split(",") if channels_raw else ["email"]
# ── HITL pause 5: final confirmation before money moves ───────────────────
confirm = await ctx.wait_for_user(
question=(
f"Ready to process:\n"
f" Order: {order_id}\n"
f" Amount: £{refund_amount}\n"
f" Method: {method}\n"
f" Notify: {', '.join(notify_channels)}\n\n"
"Confirm?"
),
input_type="approval",
options=[
{"id": "confirm", "label": "Confirm & Process"},
{"id": "cancel", "label": "Cancel"},
],
)
if confirm == "cancel":
return {"status": "cancelled", "order_id": order_id}
# Stage 2: process the refund
result = await ctx.step(process_refund, order_id, refund_amount, method, notify_channels)
return resultWhat each pause does:
| Pause | Input type | Purpose |
|---|---|---|
| 1 | select | Support agent decides: full refund, modified amount, or reject |
| 2 | text | Agent enters a custom amount (only shown when modifying) |
| 3 | select | Agent picks the refund method |
| 4 | multiselect | Agent picks which channels to notify the customer on |
| 5 | approval | Final review before the refund is actually processed |
Under the hood: replay behaviour
Understanding this helps when things do not behave the way you expect.
When ctx.wait_for_user() is called for the first time, the workflow saves its state and stops. When the user responds, AGNT5 runs the workflow function again from the very top. All the code before wait_for_user() runs again, but this time wait_for_user() finds the saved answer and returns it immediately without pausing.
This means anything before the pause fires twice: once before the user responds, and once on resume.
First run: generate_draft() → wait_for_user() → pauses ⏸
On resume: generate_draft() → wait_for_user() → returns answer → publish() ✅Wrap side effects with if not ctx._is_replay: to make them fire only once:
@workflow
async def publish_workflow(ctx: WorkflowContext, topic: str) -> str:
draft = await generate_draft(topic)
# Without the guard, this log fires twice (before and after the pause)
if not ctx._is_replay:
ctx.logger.info("Draft ready, waiting for approval...")
decision = await ctx.wait_for_user( # never guarded, must run on both passes
question=f"Approve this draft?\n\n{draft}",
input_type="approval",
options=[
{"id": "approve", "label": "Approve"},
{"id": "discard", "label": "Discard"},
],
)
if decision == "discard":
return "Discarded."
return await publish(draft)Edge cases
User skips the question
When skippable=True, the return value is None if the user clicks Skip. Always handle None explicitly to avoid a TypeError later.
note = await ctx.wait_for_user(
question="Any special instructions?",
input_type="text",
skippable=True,
)
instructions = note or "No special instructions."User enters unexpected text in a text input
wait_for_user() returns whatever the user typed as a plain string. If you need a number or a specific format, validate and convert it yourself:
raw = await ctx.wait_for_user("Enter the refund amount (numbers only):")
try:
amount = float(raw)
except ValueError:
return {"status": "error", "message": f"Invalid amount: {raw}"}Conditional pause: pause only sometimes
wait_for_user() can be inside an if block. The pause index is tracked per call that actually executes, so conditional pauses work correctly across replays.
decision = await ctx.wait_for_user(
question="Approve or modify?",
input_type="select",
options=[
{"id": "approve", "label": "Approve"},
{"id": "modify", "label": "Modify amount"},
],
)
if decision == "modify":
# This pause only runs when the agent chose "modify"
raw = await ctx.wait_for_user("Enter the new amount:")
amount = float(raw)Side effects before a pause that must not repeat
If you need to send a notification or call an external API before the pause, wrap it in if not ctx._is_replay: so it only fires on the first pass:
if not ctx._is_replay:
await send_slack_alert(f"Refund request {order_id} needs review.")
decision = await ctx.wait_for_user("Approve refund?", input_type="approval", options=[...])