# AGNT5 — full documentation

> One runtime for durable AI agent execution — checkpoint every step, trace every call, replay any run for evals.

> For the structured index, see [llms.txt](/llms.txt). For agent operating guidance, see [skill.md](/skill.md).

Source: https://agnt5.com

This file concatenates every page an AI agent should know about, in one fetch. Pages are separated by `---` rules; each starts with an `## H2` title.

# Core Concepts

---

## Agents

_Source: https://agnt5.com/docs/concepts/agents.md_

> LLM-driven loops — instructions plus model plus tools, hosted inside step boundaries so their non-determinism is contained.

> An **agent** is an `Agent`-class instance — an LLM-driven loop that takes instructions, picks actions, and produces output. Agents are non-deterministic by design and run inside step boundaries so the runtime can journal their result.

```python
from agnt5 import Agent

researcher = Agent(
    name="researcher",
    model="openai/gpt-4o-mini",
    instructions=(
        "Research the topic the user provides. Use the available tools to fetch "
        "facts. Summarize your findings in three sentences."
    ),
    tools=[search_database, fetch_article],
    max_iterations=5,
    temperature=0.3,
)

result = await researcher.run_sync("What is durable execution?")
print(result.output)
```

The `researcher` instance is configured once and called many times. Each call starts a loop: the model proposes an action, the runtime executes it (a tool call, a handoff, or a final answer), and the loop continues until the model produces a final answer or the iteration limit is hit.

## The mental model

An agent is **configuration plus a loop**. Configuration is the constructor: `name`, `model`, `instructions` (the system prompt), `tools` (capabilities the model can invoke), `handoffs` (other agents the model can transfer to), `max_iterations` (the safety limit), `temperature`.
The loop is what `run_sync` (or its async siblings) drives: each iteration, the model sees the conversation state, proposes an action, and the runtime executes it.

There are three kinds of action a model can propose. **A tool call** invokes a `@tool`-decorated callable from the agent's `tools=[...]` list; the tool runs, its output goes back to the model, and the loop continues. **A handoff** transfers control to another agent listed in `handoffs=[...]`; the receiving agent takes over and produces the final answer. **A final answer** ends the loop; the runtime returns an `AgentResult` whose `.output` is the answer.

Non-determinism is the defining property. The same input may produce different outputs across runs, different tool calls within one run, different handoff decisions across versions of the same model. AGNT5 reconciles this with deterministic workflows by hosting the agent's call inside a step boundary. The agent runs once, the step journals the `AgentResult`, and the workflow body sees a deterministic value on replay.

## Why it works this way

LLM agency requires a loop, and a loop requires a host that contains its non-determinism. AGNT5 puts that host at the step boundary: when a workflow calls a `@function` that runs an agent, the function executes inside `ctx.step`, the agent's loop runs inside the function, and the journal records the function's return value. Replay reads the recorded value; the agent does not run again.

The constructor pattern (configure once, call many times) is also intentional. Agent configuration includes the system prompt, tools, and model — all of which influence behavior in subtle ways. Centralizing them in one `Agent` instance means there is one place to audit the agent's capabilities, one place to tune its temperature, one place to swap its model.
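The propose-execute-repeat shape described above can be sketched without the runtime. This is a toy model, not AGNT5 code: a scripted plan stands in for the model's proposals, and the handoff case is omitted, but the loop structure — tool call feeds back, final answer ends, `max_iterations` bounds the whole thing — is the same.

```python
# Toy agent loop: a scripted "plan" stands in for the model's proposals.
# Illustration only — not AGNT5 code; handoffs are omitted for brevity.
def run_agent_loop(plan, tools, max_iterations=5):
    transcript = []
    for _ in range(max_iterations):
        action = plan.pop(0)                      # the "model" proposes an action
        if action["type"] == "tool":              # tool call: run it, feed result back
            transcript.append(tools[action["name"]](**action["args"]))
        elif action["type"] == "final":           # final answer: the loop ends
            return action["answer"], transcript
    raise RuntimeError("max_iterations hit without a final answer")

tools = {"add": lambda a, b: a + b}
plan = [
    {"type": "tool", "name": "add", "args": {"a": 2, "b": 3}},
    {"type": "final", "answer": "the sum is 5"},
]
answer, transcript = run_agent_loop(plan, tools)
assert answer == "the sum is 5" and transcript == [5]
```

The `RuntimeError` branch is why the docs insist on setting `max_iterations` explicitly: it is the only thing standing between a non-converging plan and an infinite loop.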
## Edge cases and gotchas

- **Never call an agent from a workflow body without `ctx.step`.** Calling `agent.run_sync(...)` directly inside a `@workflow`-decorated function is a determinism violation: the agent's output will differ across replays. Wrap it in a `@function` and reach it through `ctx.step`.
- **`max_iterations` is the safety net for runaway loops.** Without it, a model that keeps proposing tool calls without converging will loop indefinitely. Set it explicitly; do not rely on the default.
- **Handoffs run inside the original step.** When agent A hands off to agent B inside one step, the journal records one result — agent B's output. The handoff is invisible to the workflow above.
- **Agents-as-tools follow the same rule.** Pass another `Agent` to `tools=[...]` and the parent agent can invoke it as a tool. The whole composition runs inside the step that started it.
- **`agent` is lowercase in prose.** The Python class is `Agent`; in body text the noun is `agent`, never "AI agent" or "Agent".
- **Streaming changes the API, not the durability model.** Streaming variants (`run_stream`, agent streaming events) deliver tokens as they generate, but the loop and step boundary work the same way; the journal still records the final result.
- **`run_sync` is async.** The `_sync` suffix refers to the agent's loop completing before the call returns, not to blocking the event loop. Always `await` it.

## Related concepts

- [Tools](/docs/concepts/tools.md) — the capabilities agents invoke during their loop.
- [Functions](/docs/concepts/functions.md) — the host an agent runs inside, so workflows can checkpoint it.
- [Determinism — why workflows have rules](/docs/concepts/determinism.md) — why agents must run inside step boundaries.
- [Picking the right primitive](/docs/concepts/picking-the-right-primitive.md) — when to reach for an agent versus a workflow.
**Code primitives**: `Agent` class (Python and TypeScript SDKs); agent loops live inside `ctx.step(...)` boundaries
**Related CLI**: [agnt5 dev](/cli/deploy.md) (local iteration), [agnt5 deploy](/cli/deploy.md) (production)

---

## Architecture overview

_Source: https://agnt5.com/docs/concepts/architecture-overview.md_

> The map — Gateway, Engine, Coordinator, plus your workers — and how they fit together in a single binary.

> AGNT5 is **one runtime binary** with three components — Gateway (ingress), Engine (workflow scheduling + journal), Coordinator (worker dispatch) — plus **your workers**, which are separate processes that connect out to the Coordinator over gRPC.

```
                 ┌─────────────────────────────────────┐
                 │        AGNT5 runtime binary         │
                 │                                     │
client ──HTTP──► │  Gateway ──► Engine ──► Coord.      │ ──gRPC──► worker
                 │      ◄──        ◄──                 │ ◄──────── (your code)
                 │                                     │
                 │    journal · S3 archive · query     │
                 └─────────────────────────────────────┘
```

A single process serves all three components by default. Larger deployments split them with `--target gateway | engine | coordinator | all`. Workers are always separate.

## The mental model

**Gateway** is the front door. It accepts HTTP from clients (REST, SSE for streaming) and forwards run starts, signals, and queries to the Engine. It is stateless: no run progresses through the Gateway, only requests pass through it.

**Engine** is the brain. It owns the journal — every step's input, output, error, and timing — and the lease manager — which worker holds which run. When a workflow calls `ctx.step(...)`, the Engine decides whether to replay from the journal or dispatch the step to a worker; it writes the outcome to the journal regardless.

**Coordinator** is the worker bridge. Workers connect outbound to the Coordinator over gRPC and stay connected. When the Engine needs a step executed, it hands the call to the Coordinator, which routes it over the worker's open stream. Worker output flows back the same way.

**Workers** host your code.
They are separate processes you run — `agnt5 dev` for local, container deployments for managed environments. A worker registers its `@workflow`/`@function`/`@tool`/`Agent` instances at startup, then waits for dispatch from the Coordinator. Multiple workers can serve the same project; the Coordinator routes by `(tenant_id, deployment_id, component_id)`.

The single-binary default makes local development one command — `agnt5 dev` starts one process and you have a working runtime. The split-binary mode (`--target`) lets larger deployments scale Gateway, Engine, and Coordinator independently. The client-facing surface is identical in both modes, so code does not change between them.

## Why it works this way

A single binary makes the runtime fit on a developer's laptop, in a Docker container, or on a small Fly machine — Railway, Render, Fly.io, even a Raspberry Pi can host the whole runtime. Splitting only when you need to scale keeps the operations story clean: one process, one config, one log stream until traffic forces otherwise.

Worker-initiated gRPC connections invert the usual ingress model. Instead of the runtime needing to route inbound to workers (which means knowing every worker's address, opening firewalls, and managing TLS for each), workers dial out to a single coordinator endpoint. That endpoint can sit behind a load balancer, the workers can live anywhere with outbound network access, and TLS terminates once at the LB.

The journal-and-lease pattern in the Engine is the single source of truth for run state. Every other component (Gateway, Coordinator, query layer) reads from or routes around the journal — there is no second source of truth to keep consistent.

## Edge cases and gotchas

- **`--target` flag splits the binary.** `all` (default) runs everything in one process. `gateway`, `engine`, `coordinator` each run that one component. The same binary serves every target — selection is at startup time only.
- **Gateway is stateless; Engine is the stateful one.** Engine holds the journal and lease manager. Scaling Engine is a different problem from scaling Gateway — Engine needs HA-aware storage; Gateway needs only more replicas.
- **Workers connect out, not in.** The runtime never opens a connection to a worker. This means workers can run inside private networks, behind NAT, or in environments that block inbound traffic, as long as they can reach the Coordinator endpoint.
- **The Coordinator endpoint must use the `http://` scheme.** Tonic (the Rust gRPC client) does not normalize bare `host:port` strings. Worker config must include the scheme — the local dev stack pins this in `config.managed.yml`.
- **Standalone and HA modes share the client surface.** A single-node `agnt5 dev` and a three-node Envoy-fronted HA cluster expose the same gRPC services on the same ports. Worker code does not change.
- **The runtime's storage is RocksDB + S3 + DuckDB.** RocksDB holds the active journal (write-ahead log). Sealed segments are uploaded to S3 as Parquet. DuckDB queries the Parquet over S3 for the trace UI and eval reads. Storage choices are visible to operators; user code never touches them.

## Related concepts

- [What the runtime owns vs. your code](/docs/concepts/runtime-vs-your-code.md) — the responsibility boundary across this picture.
- [Durable execution](/docs/concepts/durable-execution.md) — the guarantee the Engine + journal implements.
- [Versioning and deployment model](/docs/concepts/versioning-and-deployment.md) — how code changes propagate through the architecture.
- [Sandbox isolation tiers](/docs/concepts/sandbox-isolation-tiers.md) — how worker execution environments are configured.
**Runtime components** (single binary): Gateway (HTTP/SDK ingress), Engine (workflow execution), Coordinator (worker routing)
**Storage layers**: RocksDB (WAL), S3 (sealed segments, snapshots, Parquet archives), DuckDB (query layer)
**Related CLI**: [agnt5 deploy](/cli/deploy.md)

---

## Determinism — why workflows have rules

_Source: https://agnt5.com/docs/concepts/determinism.md_

> The contract on workflow code — replay must arrive at the same step calls in the same order — and how to keep your code on the right side of it.

> Workflow code is **deterministic by contract**: given the same inputs and journal, it produces the same sequence of `ctx.step(...)` calls. Anything that varies between runs has to live inside a step, where its result is journaled.

```python
from datetime import datetime

from agnt5 import WorkflowContext, workflow

# WRONG — clock read in workflow body
@workflow
async def daily_summary_bad(ctx: WorkflowContext) -> str:
    today = datetime.utcnow().date()  # different value on replay
    rows = await ctx.step(load_rows, today)
    return await ctx.step(summarize, rows)

# RIGHT — clock read inside a step
@workflow
async def daily_summary_good(ctx: WorkflowContext) -> str:
    today = await ctx.step("today", lambda: datetime.utcnow().date())
    rows = await ctx.step(load_rows, today)
    return await ctx.step(summarize, rows)
```

The bad version replays differently on a Tuesday than it did on a Monday — `load_rows` would be journaled with Monday's date, then re-called with Tuesday's, and the runtime sees two different inputs at the same call site. Replay drift error. The good version journals the date as a step result, so replay reads the original Monday value and reaches the same `load_rows` call.

## The mental model

Replay walks the workflow body and matches each `ctx.step(...)` call to a journal entry by **call order**. If your code reaches the same calls in the same order on every run, replay works.
If the code's behavior depends on something that changes between runs — a clock, a random number, a network response, the iteration order of a Python set — replay reaches different calls and the runtime cannot tell which journal entry belongs to which call site.

The fix is always the same: **move the non-deterministic value into a step**. Once it is journaled, replay reads the original value and the workflow body is deterministic again. `ctx.step("name", lambda: ...)` exists for exactly this purpose — it lets you wrap an arbitrary expression so its result is captured.

This contract is **not enforced at compile time**. No Python type system can prove a function is deterministic. Violations show up as replay-drift errors at runtime, often only when a worker crash forces a real replay. Treat the rule as a discipline; tests that simulate replay (worker restart mid-run) are the cheapest way to catch drift before production.

## Why it works this way

Determinism is the price AGNT5 pays for not persisting full process memory at each step. The runtime needs a stable mapping from "where am I in the recipe" to "what should I do next" — and the only sustainable mapping is: walk the recipe deterministically, match calls in order, read journaled outcomes for completed calls, run the next call live.

The alternative — full memory snapshots, distributed transactions, or hash-based call-site identification — is either slower, more fragile, or both. The workflow-body constraint is small in practice (most logic is naturally deterministic) and explicit (you can see exactly which calls would violate it).
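The "walk the recipe, match calls in order, read journaled outcomes" mapping can be modeled in a few lines of plain Python. This is a toy illustration of the mechanism, not the AGNT5 implementation: the first pass executes the lambda and records its value; the replay pass reads the record, so even `uuid4()` yields the same value both times.

```python
import uuid

class ToyJournal:
    """Toy model of order-matched step journaling — not the AGNT5 implementation."""
    def __init__(self, entries=None):
        self.entries = list(entries or [])
        self.cursor = 0

    def step(self, fn):
        if self.cursor < len(self.entries):    # replay: journal hit, fn never runs
            value = self.entries[self.cursor]
        else:                                  # live: execute and record
            value = fn()
            self.entries.append(value)
        self.cursor += 1
        return value

def workflow_body(journal):
    # the non-deterministic value lives "inside a step", so it is journaled
    return journal.step(lambda: str(uuid.uuid4()))

journal = ToyJournal()
original = workflow_body(journal)                       # live run: generates and records
replayed = workflow_body(ToyJournal(journal.entries))   # fresh "worker", same journal
assert original == replayed                             # replay reads the record
```

If `workflow_body` instead called `uuid.uuid4()` outside `step`, the two passes would produce different values — which is exactly the drift the determinism contract rules out.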
## Edge cases and gotchas

- **Common offenders to move into steps:**
  - `time.time()`, `datetime.utcnow()`, `datetime.now()`, any clock read
  - `random.choice(...)`, `random.random()`, `uuid.uuid4()`
  - Network calls, file I/O, database reads
  - Reading environment variables that may change between runs
  - Iterating over a `dict` whose key insertion order differs between runs
- **Loops are fine; their bounds must be deterministic.** `for item in journaled_list: ...` is safe — the loop count comes from a journaled value. `for _ in range(some_random_count)` is not.
- **Conditional `ctx.step(...)` calls are fine if the condition is deterministic.** A branch whose condition reads a journaled value (or the workflow input) takes the same path on replay. A branch whose condition reads a clock or RNG does not.
- **In-process caches are a hidden source of drift.** A module-level `_cache: dict = {}` populated during the original run is empty on a fresh worker. Any code that depends on cache state will reach different call sites. Caches must live inside steps if their values matter.
- **Replay drift errors point at the call site, not the source.** When you see a drift exception, the offending non-determinism is somewhere *upstream* of the named step — the step itself is fine; the inputs reaching it differ from what was journaled.
- **`agnt5 inspect trace` shows the exact step sequence.** When debugging suspected drift, compare the trace from the original run to the trace from replay. The first call site that differs is where the non-determinism lives.

## Related concepts

- [Event sourcing and replay](/docs/concepts/event-sourcing-and-replay.md) — the mechanism that makes determinism necessary.
- [Workflows](/docs/concepts/workflows.md) — where the determinism contract applies.
- [Functions](/docs/concepts/functions.md) — the host for non-determinism (functions are free to be as non-deterministic as you need).
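The conditional-step gotcha above can be demonstrated with the same kind of toy order-matched journal (an illustration, not the AGNT5 runtime): because the branch condition reads a journaled value, replay takes the same path and the step sequence matches.

```python
import random

class ToyJournal:
    """Toy order-matched journal — illustration only, not the AGNT5 runtime."""
    def __init__(self, entries=None):
        self.entries = list(entries or [])
        self.cursor = 0

    def step(self, fn):
        if self.cursor < len(self.entries):
            value = self.entries[self.cursor]  # replay: read the record
        else:
            value = fn()                       # live: run and record
            self.entries.append(value)
        self.cursor += 1
        return value

def moderate(journal):
    score = journal.step(random.random)        # journaled: stable across replays
    if score > 0.5:                            # deterministic: reads a journaled value
        return journal.step(lambda: "escalate")
    return journal.step(lambda: "auto-approve")

live = ToyJournal()
first = moderate(live)
second = moderate(ToyJournal(live.entries))    # replay reaches the same branch
assert first == second
```

Branching on a fresh `random.random()` instead of the journaled `score` would send the replay down the other path on roughly half of retries — the order-matched entries would then belong to the wrong call sites.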
**Code primitives**: `@workflow` decorator (Python), `workflow(...)` factory (TypeScript); external effects go through `ctx.step("name", lambda: ...)`
**Allowed inside workflow body**: `await ctx.step(...)`, deterministic control flow over journaled state
**Forbidden inside workflow body**: direct I/O, wall-clock time, randomness, threading — call these from inside a step instead

---

## Durable execution

_Source: https://agnt5.com/docs/concepts/durable-execution.md_

> The runtime guarantee that a workflow's progress survives crashes — completed steps are not re-run.

> **Durable execution** is a runtime guarantee that a workflow's progress survives process crashes, network failures, and restarts: completed steps are replayed from the journal, not re-run.

```python
from agnt5 import FunctionContext, WorkflowContext, function, workflow

@function
async def charge_card(ctx: FunctionContext, order_id: str) -> str:
    # Real side effect: a charge happens at most once per order_id.
    return await payments.charge(order_id)

@function
async def send_receipt(ctx: FunctionContext, order_id: str, txn: str) -> None:
    await email.send(order_id, txn)

@workflow
async def checkout(ctx: WorkflowContext, order_id: str) -> str:
    txn = await ctx.step(charge_card, order_id)
    # If the worker dies here, the next attempt skips charge_card
    # (its result is in the journal) and runs send_receipt.
    await ctx.step(send_receipt, order_id, txn)
    return txn
```

If the worker crashes between `charge_card` returning and `send_receipt` starting, the next attempt does not charge the card again. The runtime reads the recorded `txn` from the journal, advances past `charge_card`, and runs `send_receipt` against that value.

## The mental model

Think of the workflow body as a **recipe** and the journal as the **cooked-pot history**: a record of what has already been prepared. Replay walks the recipe step by step. At each step, the runtime asks one question: do I have a recorded result for this call in this run?
If yes, replay returns the recorded value and moves on. If no, the runtime executes the step for real, writes the input and output to the journal, then returns the value.

This means your code stays the shape of ordinary `async` Python. There is no `try/except` for transient infrastructure errors at the workflow level, no resumption flags, no manual checkpoint tables. The recovery contract lives in the runtime; you write business logic.

The unit of durability is the **step**, not the line. Anything that happens between two `ctx.step(...)` calls is workflow body code — branches, variable assignments, calls to deterministic helpers — and is re-executed on replay. Anything *inside* a step is a side effect that runs at most once per run, modulo the gotcha below.

## Why it works this way

The alternative is to make every line a checkpoint. That has been tried; it produces unreadable code and unbounded journals. The opposite extreme is to checkpoint only at workflow boundaries, which makes any non-trivial multi-step process unrecoverable without manual cleanup. The step boundary is the compromise: explicit enough that you can see where the durability bargain is being made, coarse enough that the journal stays bounded, fine enough that recovery is automatic.

The cost is a constraint on workflow code: the body must be deterministic. Replay must arrive at the same `ctx.step(...)` calls in the same order, every time. AGNT5 trades this constraint for an automatic recovery model — without it, the system would have no way to tell which journaled result belongs to which call site.

## Edge cases and gotchas

- **Durability is not idempotency at the side effect.** If `charge_card` partially succeeded — the network call left your process, the bank charged the card, but the response never came back — the runtime cannot tell. On retry it will run `charge_card` again. Design side-effecting steps to be idempotent at the external boundary (idempotency keys, conditional inserts, `INSERT ...
ON CONFLICT`).
- **Long-running steps hold a lease.** A step that takes hours blocks the run from progressing past it. Set a `step_timeout` and surface partial progress through smaller steps rather than waiting indefinitely inside one call.
- **The workflow body must stay deterministic.** Wall-clock reads, random numbers, network calls, and in-process caches in the workflow body are replay hazards. Move them inside a step, where their result is recorded. See [Determinism](/docs/concepts/determinism.md) for the full list.
- **Replay reads the journal first.** If the journal entry for a step is missing — the run was started fresh, the step is new code, or the journal was trimmed — the runtime executes the step for real. There is no "fail closed" mode for missing entries: missing means run-fresh.
- **Durability is per-run, not per-input.** Re-invoking the same workflow with the same input creates a new run with a new ID and a new journal. The runtime does not deduplicate on input. If you need at-most-once semantics across submissions, dedupe at the caller.

## Related concepts

- [Event sourcing and replay](/docs/concepts/event-sourcing-and-replay.md) — the journal mechanics that make durable execution work.
- [Determinism — why workflows have rules](/docs/concepts/determinism.md) — the constraint replay imposes on workflow code.
- [What the runtime owns vs. your code](/docs/concepts/runtime-vs-your-code.md) — the responsibility split this concept creates.
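The "idempotent at the external boundary" advice usually means sending a caller-chosen idempotency key with the side effect, so a retried request is deduplicated by the provider. A sketch against a toy in-memory provider — the provider class and its `charge` signature here are hypothetical, standing in for a real payments API that accepts an equivalent key parameter:

```python
class ToyPaymentProvider:
    """Hypothetical provider that deduplicates charges on an idempotency key."""
    def __init__(self):
        self.charges = {}

    def charge(self, order_id, idempotency_key):
        if idempotency_key in self.charges:   # retried request: same txn, no double charge
            return self.charges[idempotency_key]
        txn = f"txn-{order_id}-{len(self.charges)}"
        self.charges[idempotency_key] = txn
        return txn

provider = ToyPaymentProvider()
key = "charge-order-42"   # derived from order_id, so it is stable across retries
first = provider.charge("order-42", idempotency_key=key)
retry = provider.charge("order-42", idempotency_key=key)  # the crash-and-retry path
assert first == retry and len(provider.charges) == 1
```

The key must be derived from something stable across attempts (here the order ID), never generated fresh per attempt — a fresh key per retry defeats the deduplication entirely.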
**Code primitives**: `@workflow` decorator, `ctx.step("name", lambda: ...)`; every step's input and output is journaled
**Guarantee**: completed steps are not re-run after a crash; in-flight steps re-execute from the last checkpoint
**Related CLI**: [agnt5 deploy](/cli/deploy.md), [agnt5 logs](/cli/deployments.md)

---

## Event sourcing and replay

_Source: https://agnt5.com/docs/concepts/event-sourcing-and-replay.md_

> How AGNT5 records every step's input and output to a journal, and how replay reads the journal to skip work that already ran.

> AGNT5 records every step's input, output, error, and timing to an append-only **journal**. Recovery — and idempotent re-execution — works by **replaying** that journal: at each step the runtime asks whether a record already exists, returns it if so, runs the step for real if not.

```
Run start
  ├─ ctx.step(fetch_article, url) ──► journal: { step: 1, in: url,  out: <html> }
  ├─ ctx.step(summarize, html)    ──► journal: { step: 2, in: html, out: <summary> }
  └─ return

Crash + restart, same run

Run resume
  ├─ ctx.step(fetch_article, url) ──► journal HIT → returns <html>, no fetch
  ├─ ctx.step(summarize, html)    ──► journal HIT → returns <summary>, no LLM call
  └─ return
```

The crashed worker did not lose state. The journal is the state. The new worker walks the same recipe and reads each step's outcome from disk.

## The mental model

Think of the journal as a **logbook** the runtime keeps next to your workflow. Every time the workflow body crosses a `ctx.step(...)` call, the runtime opens the logbook to that page and asks: have I already written down what happened here? If yes, replay returns the recorded value and moves on. If no, the runtime executes the step, records what happened, and only then returns control to the workflow.

The unit recorded is **the step**, not the line. Code between two `ctx.step(...)` calls — branches, variable assignments, deterministic helpers — re-executes on every replay; that's why the workflow body must be deterministic.
Anything inside a step is opaque to replay; the runtime sees only the input it received and the output it returned.

The journal is **append-only**. Steps record success and failure outcomes; a failed step that retried until it succeeded leaves a trail of attempts plus the final success. The journal is also the source of every other observability artifact AGNT5 produces — traces, eval datasets, and debug snapshots all read from it.

## Why it works this way

Event sourcing is the cheapest mechanism that gives you exactly the required amount of recovery: the runtime can resume a crashed run without your code knowing it crashed, and without re-running side effects you have already paid for. The alternative — persisting full process memory at every step — is orders of magnitude more expensive and fragile across deploys.

It also makes **observability free**. Because the journal already records every step's inputs and outputs, the trace UI, eval comparisons, and `agnt5 inspect` are all readers of the same data structure. No separate logging path is needed to power them.

The trade is that your workflow body must stay deterministic so replay reaches the same call sites — see [Determinism](/docs/concepts/determinism.md) for the constraint that buys this.

## Edge cases and gotchas

- **Replay reads the journal first.** If the journal entry for a step is missing — the run was started fresh, the step is new code, or the journal was trimmed — the runtime executes the step for real. There is no "fail closed" mode for missing entries: missing means run-fresh.
- **The journal grows unbounded per run.** A workflow with thousands of steps produces a long journal. Long-running workflows that loop should periodically checkpoint a summarized state and resume from it rather than relying on millions of journal entries.
- **Non-deterministic workflow bodies break replay.** If `ctx.step(...)` calls happen in a different order on replay than on the original run, the runtime cannot match journal entries to call sites. The error surfaces as a replay-drift exception. Move the offending non-determinism inside a step so its result is journaled.
- **Side effects can partially succeed.** The journal records the runtime's view of a step (what it sent in, what came back). It cannot tell you whether an HTTP POST committed at the destination before the network failed. Design side-effecting steps to be idempotent at the external boundary.
- **Replay is not a debug feature, it is the recovery mechanism.** Every restart triggers replay. The cost of replay is paid on the happy path too — the runtime walks the journal even when nothing crashed.
- **The journal outlives the worker.** Workers can come and go; the journal lives in the engine's storage. A new worker picking up a paused run reads the same journal the original worker was writing to.

## Related concepts

- [Durable execution](/docs/concepts/durable-execution.md) — the guarantee event sourcing implements.
- [Determinism — why workflows have rules](/docs/concepts/determinism.md) — the constraint that makes replay tractable.
- [What the runtime owns vs. your code](/docs/concepts/runtime-vs-your-code.md) — where the journal sits in the responsibility split.

**Journal layers**: per-run sequence of step input/output records; RocksDB (live, hot) and S3 (sealed segments + Parquet archives)
**Replay**: deterministic re-execution of workflow body against the journal, skipping completed steps
**Code primitive**: every `ctx.step(...)` call appends a journal entry; replay reads it back instead of re-running

---

## Functions

_Source: https://agnt5.com/docs/concepts/functions.md_

> Registered, addressable units of work — the smallest primitive in AGNT5 and the building block steps and tools both reach for.
> A **function** is a `@function`-decorated handler — a registered, addressable unit of work the runtime can call by name. It is AGNT5's smallest primitive.

```python
import httpx

from agnt5 import FunctionContext, function

@function
async def fetch_article(ctx: FunctionContext, url: str) -> str:
    """Fetch the body of a URL."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text
```

The same handler is reachable from two call sites:

```python
# Direct invocation: a client calls the function by name.
result = await client.run("fetch_article", {"url": "https://example.com"})

# Workflow invocation: the same handler is checkpointed inside a step.
@workflow
async def research(ctx: WorkflowContext, url: str) -> str:
    body = await ctx.step(fetch_article, url)
    return summarize(body)
```

## The mental model

A function is **a Python `async def` you have decorated and registered**. Once decorated, the runtime knows the handler exists, knows its name, and can route invocations to it. The decorator does two jobs: it adds the handler to the global registry (`_FUNCTION_REGISTRY` in the SDK source), and it gives the handler the `FunctionContext` that the runtime needs to thread tracing, retries, and logging through.

The same registered function plays two roles. **Standalone**, a client invokes it by name (`client.run("fetch_article", ...)`); the runtime spins up one execution, hands the handler a `FunctionContext`, and returns the result. **Inside a workflow**, the workflow calls `ctx.step(fetch_article, url)`; the workflow's runtime captures the input, runs the function, and writes the output to the journal. Same code, different host.

Durability is **not in the decorator**. A function called standalone runs once: if its process crashes mid-execution, the call fails and there is no automatic resume.
Durability comes from the `ctx.step` boundary in a workflow, which is what causes the input and output to be journaled and the call to be skipped on replay. The `@function` decorator gives you registration; the workflow's `ctx.step` gives you durability.

## Why it works this way

Splitting registration from durability lets one handler serve every role AGNT5 needs from it. A handler can be called by a client, called by a workflow inside a step, used as a tool by an agent (when also decorated with `@tool`), or scheduled by cron — without changing its signature. The runtime treats the handler as a leaf node and the caller decides what guarantees wrap it.

The split also keeps `@function` cheap. Not every callable in your application warrants the cost of journaling. A pure deterministic helper (parsing a string, computing a hash) gains nothing from being checkpointed. Decorating it with `@function` registers it for invocation but does not impose durability overhead unless a workflow opts in.

## Edge cases and gotchas

- **A standalone function is not durable.** If you `client.run("fetch_article", ...)` and the worker crashes, you get an error and no automatic retry. Durable execution requires wrapping the call in a workflow's `ctx.step`.
- **`FunctionContext` is not `WorkflowContext`.** The function context is stateless: it has `log()`, `sleep()` (non-durable), and tracing helpers, but no `step()`. To checkpoint inside business logic, write a workflow and call the function from it.
- **`ctx.sleep()` inside a function is plain `asyncio.sleep`.** It will not survive a crash. Use a workflow if you need durable timers.
- **Names must be unique in the registry.** Two `@function async def fetch_article` declarations in the same worker raise at registration time. Pass `@function(name="fetch_article_v2")` to disambiguate.
- **A `@function` can also be a `@tool`.** Decorating a handler with both makes it externally callable (registry entry) and agent-callable (tool list).
The decorators do different jobs and stack cleanly.
- **Functions return whatever they return.** The runtime serializes the return value when the function is the target of a step or a remote call. Stick to JSON-serializable shapes (primitives, dicts, lists, dataclasses) — opaque Python objects round-trip poorly.

## Related concepts

- [Workflows](/docs/concepts/workflows.md) — the durable orchestrator that wraps function calls in step boundaries.
- [Tools](/docs/concepts/tools.md) — how a function becomes available to an agent.
- [Durable execution](/docs/concepts/durable-execution.md) — what the step boundary buys a function call.
- [Picking the right primitive](/docs/concepts/picking-the-right-primitive.md) — when to reach for a function versus a workflow or agent.

**Code primitives**: `@function` decorator (Python), `function(...)` factory (TypeScript); same callable can wear `@tool` for agent use
**Addressable by**: registered name; invoked via `ctx.step(...)` from workflows or attached as `agent.tools`
**Boundary**: a function call from inside a workflow is journaled (durable); a function call from outside (e.g., script) is plain Python/TS

---

## Picking the right primitive

_Source: https://agnt5.com/docs/concepts/picking-the-right-primitive.md_

> Functions, workflows, agents, and tools — what each one is for, and which one to reach for when.

> AGNT5 has four primitives: **functions** (registered units of work), **workflows** (durable orchestrators that call functions), **agents** (LLM-driven loops), and **tools** (capabilities agents can invoke). Reach for the smallest one that does the job.

```python
import httpx

from agnt5 import Agent, Context, FunctionContext, WorkflowContext, function, tool, workflow

@tool
async def fetch_url(ctx: Context, url: str) -> str:
    # A tool: a capability the agent can call when it decides it needs to.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.text

researcher = Agent(
    name="researcher",
    model="openai/gpt-4o-mini",
    instructions="Use fetch_url to read articles. Summarize in three sentences.",
    tools=[fetch_url],
)

@function
async def summarize(ctx: FunctionContext, url: str) -> str:
    # A function: a registered unit of work the workflow checkpoints.
    result = await researcher.run(f"Summarize {url}")
    return result.output

@workflow
async def research(ctx: WorkflowContext, url: str) -> str:
    # A workflow: the durable orchestrator. Calls functions through ctx.step.
    return await ctx.step(summarize, url)
```

The `research` workflow drives one step. The step runs the `summarize` function. The function runs the `researcher` agent. The agent calls the `fetch_url` tool when its plan requires reading a page. Four primitives, one chain of responsibility.

## The mental model

The shortest path through the decision is a question: **what is the smallest primitive that does this job?**

- **Plain Python.** No durability, no checkpointing, no agent loop — write a function and call it. AGNT5 does not need to know about it.
- **A function (`@function`).** A unit of work the runtime can call by name, log, retry, and (when invoked through `ctx.step`) checkpoint. Reach for this when something needs to be addressable from outside the process or callable from a workflow.
- **A workflow (`@workflow`).** A durable orchestrator that strings functions together and survives crashes between them. Reach for this when the multi-step process must be resumable — payment → fulfillment → notification, or research → summarize → publish.
- **An agent (`Agent`).** An LLM-driven loop that picks actions based on a goal and produces output. Reach for this when you cannot enumerate the steps in advance — the model decides what to do.
- **A tool (`@tool`).** A capability you make available to an agent.
Reach for this when an agent needs to read or write something the LLM cannot do on its own (HTTP calls, database queries, calculations, other agents). The four primitives compose along one axis: durability boundaries get coarser as you go up. A workflow's step boundary is the unit of replay. Inside a step, a function runs. If that function runs an agent, the agent's loop fires inside the function. The tools the agent invokes fire inside the loop. The runtime's recovery model sees the step boundary only; everything below it runs fresh on retry. ## Why this split The split exists so each primitive does exactly one job. Workflows orchestrate but do not decide. Agents decide but do not orchestrate. Functions execute but do not loop. Tools provide capabilities but do not own state. When a primitive starts doing two jobs, the durability model breaks: a workflow that calls an LLM directly cannot be replayed without re-billing the prompt, and an agent that orchestrates other agents has no checkpoint between iterations. Stratifying the four primitives also gives you four places to insert observability. Every workflow run produces a trace. Every step records its input and output. Every agent iteration logs its plan and tool calls. Every tool call logs its arguments and return value. The trace UI walks this hierarchy directly. ## Edge cases and gotchas - **A workflow can call an agent directly.** `await ctx.step(some_function_that_runs_an_agent, ...)` is the canonical shape. The agent's non-determinism lives inside the step boundary, where it is journaled and replayed. - **An agent cannot call a workflow as a tool.** Workflows are top-level, addressable units; tools are local capabilities the agent invokes during its loop. Use a `@function` (which can itself trigger a sub-workflow) when you need that shape. - **`@function` and `@tool` are not the same decorator.** `@function` is a registered, externally callable unit; `@tool` marks a callable an agent is allowed to invoke. 
A handler can be both — register a `@function` that wraps a tool, and the same logic is reachable from clients and from agent loops. - **"Step" is a verb, not a primitive.** `ctx.step(handler, ...)` is the call site that creates a checkpoint inside a workflow. The unit being called is a function; the checkpoint is the step. - **Tools that mutate state must be idempotent.** An agent's plan may invoke the same tool multiple times in a single iteration. Tools touching external systems should rely on idempotency keys, conditional updates, or safe-by-design operations. - **`agent` is lowercase in prose.** The Python class is `Agent`; in body text the noun is `agent`, never "AI agent" or "Agent". ## Related concepts - [Functions](/docs/concepts/functions.md) — the registered, callable unit. - [Workflows](/docs/concepts/workflows.md) — the durable orchestrator. - [Agents](/docs/concepts/agents.md) — the LLM-driven loop. - [Tools](/docs/concepts/tools.md) — capabilities agents can invoke. - [Durable execution](/docs/concepts/durable-execution.md) — what the step boundary buys you. **The four primitives**: function (`@function`), tool (`@tool`), workflow (`@workflow`), agent (`Agent`) **Decision shorthand**: function for pure work; tool to expose to an agent; workflow for durable multi-step orchestration; agent for LLM-driven loops **Composition**: workflows call steps; steps wrap functions or agents; agents invoke tools; tools are decorated functions --- ## What the runtime owns vs. your code _Source: https://agnt5.com/docs/concepts/runtime-vs-your-code.md_ > The responsibility boundary — what AGNT5 takes care of for you, and what stays in your code. > The runtime owns **scheduling, journaling, retries, replay, and lease management**. Your code owns **business logic, side-effect implementation, step boundaries, and idempotency at external systems**. 
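The split in the table below can be made concrete with a toy sketch of the one discipline the runtime cannot take off your hands: idempotency at the external system. Everything here is a stand-in (no real AGNT5 or payment API). Your step derives a stable key from run identity, so a runtime-driven retry of the step cannot double-charge.

```python
# Sketch: your half of the contract. The runtime may retry this step after
# a crash; the external system must deduplicate. All names are stand-ins.

class FakePaymentAPI:
    """External system that honors idempotency keys (Stripe-style)."""

    def __init__(self):
        self.charges = {}  # idempotency_key -> charge record

    def charge(self, idempotency_key: str, amount: int) -> dict:
        # A repeated call with the same key returns the original charge
        # instead of billing twice.
        if idempotency_key not in self.charges:
            self.charges[idempotency_key] = {"amount": amount, "status": "ok"}
        return self.charges[idempotency_key]

api = FakePaymentAPI()

def charge_card_step(run_id: str, order_id: str, amount: int) -> dict:
    # Step body: derive the key from run identity so a retry is safe.
    # Raise on transient errors; the runtime (not shown) decides whether
    # to retry -- no try/except needed here.
    return api.charge(idempotency_key=f"{run_id}:{order_id}", amount=amount)

# Simulate a crash-and-retry: same run, same step, called twice.
first = charge_card_step("run_42", "order_7", 1999)
second = charge_card_step("run_42", "order_7", 1999)
assert len(api.charges) == 1  # one real charge despite two attempts
```

The runtime's side of the same contract (the retry itself, the journal write) never appears in your code; that is the point of the split.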
| Concern | AGNT5 runtime owns | Your code owns | |---|---|---| | Scheduling | Picking a worker, dispatching a run | Registering the workflow / function | | Journal | Recording every step's input and output | Choosing where step boundaries go (`ctx.step`) | | Retries on failure | Retrying steps per the configured policy | Marking which exceptions are retryable | | Replay on restart | Reading journal entries and skipping completed steps | Keeping the workflow body deterministic | | Lease management | Tracking which worker holds which run | Returning from steps inside the timeout | | Tracing | Capturing inputs/outputs/errors of every step | Adding domain context (tenant id, user id, span attrs) | | Idempotency at the runtime level | Replay returns recorded results, not duplicate calls | Idempotency at the **external** system (HTTP, DB) | | Worker lifecycle | Health checks, reconnection, graceful drain | The handler implementation inside the worker | Your job ends at the step boundary; AGNT5's begins. ## The mental model Picture two columns. The left column is the runtime — a single process (Gateway + Engine + Coordinator) that knows how to schedule work, write to a journal, and dispatch to a worker. The right column is your code — `@workflow`, `@function`, `@tool`, `Agent` instances. The two columns touch in exactly two places: the runtime calls into your code at a step boundary, and your code calls into the runtime when it invokes `ctx.step(...)`. Everything that depends on the **shape of the run** — what the workflow looks like, when steps fire, what side effects they have — is your code. Everything that depends on **the run surviving over time** — recovery, retry, observability, scaling — is the runtime. The split exists so you can write business logic without weaving infrastructure concerns into every function. The contract goes both ways. Because the runtime owns retries, your code does not need `try/except` around every transient network error in a step. 
Because your code owns step boundaries, the runtime cannot tell on its own which side effects are safe to retry — that is what `@function` and `ctx.step` exist to communicate. ## Why it works this way The split mirrors the split between Kubernetes and your container, or between Postgres and your SQL. Infrastructure that is reused across applications goes in the runtime; logic that varies per application stays in user code. This is the only split that scales — bundling retry logic into every workflow makes every workflow a fragile reimplementation of the same retry strategy; pushing retry into the runtime makes it consistent and audit-friendly. It also keeps the SDK surface small. The Python SDK has roughly five user-facing primitives (`@workflow`, `@function`, `@tool`, `Agent`, `ctx.step`). Everything else — the journal, the lease manager, the reconnect logic — is handled below the surface. The reader of your workflow code does not need to understand any of it to follow the business logic. ## Edge cases and gotchas - **Retries are runtime-driven.** Do not add `try/except` around transient errors at the workflow level — the runtime will retry the step. Your code raises; the runtime decides whether to replay. - **Database connection management lives in your step code.** The runtime does not pool connections for you. A step that opens a connection without closing it leaks resources. - **Observability is provided; enrichment is yours.** The runtime captures inputs, outputs, errors, and timings. Adding tenant ids, user ids, or domain-specific tags happens through your code calling into the trace context. - **The runtime does not enforce idempotency at the external system.** Journaling protects against duplicate *journaled outcomes*; it cannot stop a partially-completed HTTP POST from committing twice across attempts. Use idempotency keys, conditional updates, or safe-by-design operations. 
- **Lease timeouts are a runtime concern; staying inside them is yours.** A long-running step that overruns its lease loses ownership of the run. Either tune `step_timeout` upward or break the work into smaller steps that surface progress. - **Worker code crashes are recoverable; runtime crashes are too.** The runtime is durable across its own restarts (journal in storage, leases reissued). Worker code crashes are also recoverable — replay picks up where the journal left off. Both halves survive independently. ## Related concepts - [Durable execution](/docs/concepts/durable-execution.md) — the guarantee this split enables. - [Architecture overview](/docs/concepts/architecture-overview.md) — what the runtime side actually looks like. - [Event sourcing and replay](/docs/concepts/event-sourcing-and-replay.md) — the mechanism the runtime uses to fulfill its half. - [Determinism — why workflows have rules](/docs/concepts/determinism.md) — the constraint your half must respect. **Runtime owns**: scheduling, journaling, replay, retries, version pinning, sandboxing, trace storage **Your code owns**: workflow body, step bodies, tool bodies — staying inside the determinism contract **SDK surface**: `@workflow`, `@function`, `@tool`, `Agent`, `ctx.step(...)`, `ctx.signal(...)` --- ## Sandbox isolation tiers _Source: https://agnt5.com/docs/concepts/sandbox-isolation-tiers.md_ > How worker code is isolated — process, container, microVM — and the cost, latency, and security tradeoffs at each tier. > AGNT5 supports tiered execution sandboxes — **process**, **container**, **microVM** — with different cost, latency, and isolation tradeoffs. The runtime contract (durability, replay, journaling) is identical across tiers; what changes is the boundary between your worker code and other workers' code. 
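In the manifest, tier selection is a single field. A hypothetical `agnt5.yaml` fragment follows; only the `tier:` field and its three values come from this page, and the surrounding keys are illustrative guesses, not the real schema:

```yaml
# Hypothetical agnt5.yaml fragment -- keys other than tier: are placeholders.
deployment:
  name: code-exec-service
  tier: microVM   # one of: process | container | microVM; applies per-deployment
```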
| Tier | Boundary | Cold start | Cost | Use when | |---|---|---|---|---| | Process | Linux user / namespace | ms | lowest | Trusted code only; same tenant | | Container | OCI container per worker | hundreds of ms | medium | Default for managed deployments | | MicroVM | Firecracker / similar per run | seconds | highest | Untrusted code, agent-generated code, multi-tenant code execution | Pick the **weakest tier that meets your security needs.** The runtime guarantee does not change; only the blast radius of a compromise does. ## The mental model A sandbox tier is the **wall around your worker process**. At the process tier, the wall is OS-level isolation — separate user, separate namespace, but everything still runs on a shared kernel. At the container tier, the wall is an OCI container — separate filesystem, separate process namespace, still a shared kernel but with cgroup-enforced resource limits. At the microVM tier, the wall is a hypervisor — separate kernel, separate memory space, separate device access. Stronger walls cost more — cold start grows from milliseconds (process) to hundreds of milliseconds (container) to seconds (microVM). They also constrain integrations differently. A process-tier worker can share a filesystem mount with another worker; a microVM cannot. A container-tier worker can use host networking; a microVM has its own virtual NIC. Tier selection is **per-deployment**, not per-step. Every step inside a deployment runs in the deployment's tier. A workflow that calls a `@function` with sensitive logic and a `@function` that runs an untrusted agent must put both inside the same tier — typically the stronger of the two, since the runtime cannot mix tiers within a single deployment. ## Why it works this way Per-deployment tier selection keeps the routing model compact — the routing key `(tenant_id, deployment_id, component_id)` already names the deployment, and the deployment's tier is part of its manifest. 
Per-step tier selection would mean every dispatch decision involves a tier lookup, every worker manages multiple sandboxes, and the runtime carries a much larger configuration surface. The tiered model gives you a way to **opt into stronger isolation only where it matters**. A trusted internal workflow runs at process tier with millisecond cold starts. A user-facing service that runs agent-generated code runs at microVM tier and pays the cold-start cost in exchange for hard isolation. Both share the same SDK, same control plane, same observability. ## Edge cases and gotchas - **Tier selection is per-deployment, not per-step.** If one step inside a deployment needs microVM isolation, the entire deployment runs in microVM. Split deployments along tier boundaries when you want different isolation for different workloads. - **Cold-start cost is paid on first dispatch to a fresh sandbox.** Process tier amortizes near-zero. Container tier pays hundreds of milliseconds when scaling up; warm pools mitigate this. MicroVM tier pays seconds; pre-warming is essential for latency-sensitive paths. - **Some integrations only work in lower tiers.** Shared filesystem mounts (the host-mounted path), host networking, and access to specific host devices are typically process or container only. MicroVM workers have their own virtualized stack. - **The runtime contract is identical across tiers.** Durability, replay, journaling, retries — none of these change with tier. A workflow's correctness does not depend on its sandbox; only its blast radius does. - **Agent-generated code runs at the deployment's tier.** When an agent generates and executes code (for example, a Python REPL tool), that code inherits the surrounding sandbox. If you allow agent-generated code, default to microVM unless you have a specific reason not to. - **Multi-tenant code execution belongs at microVM tier.** Process and container tiers share a kernel; a kernel exploit reaches every workload on the host. 
MicroVM is the level that gives you per-run kernel isolation. - **Tier upgrades require redeployment.** You cannot promote a running deployment from container to microVM; create a new deployment with the stronger tier and shift traffic. ## Related concepts - [Versioning and deployment model](/docs/concepts/versioning-and-deployment.md) — tier is part of the deployment manifest. - [What the runtime owns vs. your code](/docs/concepts/runtime-vs-your-code.md) — the runtime provides the sandbox; your code runs inside it. - [Architecture overview](/docs/concepts/architecture-overview.md) — workers are the things being sandboxed. **Tiers**: `process` (lowest overhead, no isolation), `container` (Docker-level, default), `microVM` (kernel-level, highest cost) **Selection**: `tier:` field in the `agnt5.yaml` deployment manifest; per-deployment **Related CLI**: [agnt5 deploy](/cli/deploy.md) --- ## The improvement loop _Source: https://agnt5.com/docs/concepts/the-improvement-loop.md_ > The trace → eval → edit → deploy cycle — the loop that turns durable execution into a product, not only a runtime feature. > The **improvement loop** is the cycle every production agent system needs: every run produces a **trace**, traces feed **evals**, evals expose regressions, edits ship as new **deployments**. Every other concept in AGNT5 exists to make this loop fast. ``` ┌─────────┐ ┌────────┐ ┌─────────┐ ┌──────────┐ │ run │ ────► │ trace │ ────► │ eval │ ────► │ edit │ └─────────┘ └────────┘ └─────────┘ └──────────┘ ▲ │ │ ▼ │ ┌──────────┐ ┌─────────────┐ │ └───────────│ run │ ◄──── │ deployment │ ◄─────┘ └──────────┘ └─────────────┘ ``` The trace is the **system of record**. Without traces, every other step in the loop is impossible — you cannot eval what you cannot inspect, and you cannot tell whether an edit improved or regressed behavior. ## The mental model Treat AGNT5 as a **loop accelerator**, not only a workflow runtime. 
The runtime captures every step's input and output to the journal. The trace UI reads from the journal. Eval frameworks read from the trace. Edits land as new deployments. New runs produce new traces, which feed the next round of evals. The faster you can complete one rotation, the faster your agent system improves. Each stage has a clear input and output: - **Run** produces a trace. Inputs are the run's arguments; outputs are every step's input/output, error, timing, and (for LLM steps) prompts/responses/token counts. - **Trace** is browsed, exported, or piped into an eval. Inputs are the trace IDs you select; outputs are the trace data structures with everything the runtime captured. - **Eval** scores traces against rubrics, references, or LLM judges. Inputs are a dataset of traces; outputs are scores and per-row diffs. - **Edit** changes a workflow, prompt, model, or tool. Inputs are eval signals; outputs are a new deployment. - **Deploy** ships the edit. Inputs are the new code; outputs are a new deployment artifact and (when the environment pointer advances) routing of new runs to it. The loop is **the product**. Durable execution is a means; the trace-as-system-of-record is the bridge that lets evals replay old runs against new code; deployments-as-immutable-versions are what make A/B comparison meaningful. ## Why it works this way Agent systems are not deterministic enough to ship-and-forget. The same prompt produces different outputs across model versions; the same workflow produces different tool calls across runs; the same eval rubric scores differently as the dataset drifts. The only sustainable strategy is to **measure continuously and edit deliberately** — and to do that, you need every run to be inspectable, every edit to be comparable, and every comparison to be auditable. AGNT5 picks one mechanism (event sourcing) that gives you all three at once. The journal makes runs inspectable (it's the trace). 
The journal makes edits comparable (replay an old run against new prompts; the inputs are still on disk). The journal makes comparisons auditable (you can show exactly which calls fired, in what order, with what arguments). A separate sidecar logging path could give you traces. A separate eval database could give you comparisons. A separate audit log could give you accountability. Picking one mechanism that gives you all three is the simplification — and it is what makes the loop fast. ## Edge cases and gotchas - **Replaying old traces against new prompts requires deterministic workflow code.** If the workflow body's call sequence depended on a clock or RNG, replay drifts and the comparison is meaningless. Determinism (see [Determinism](/docs/concepts/determinism.md)) is a precondition for the eval half of the loop. - **Eval datasets drift unless versioned.** The set of traces you eval against today may include traces you would not pick tomorrow. Snapshot the dataset (trace IDs + timestamps) before each eval run; otherwise comparing scores across time is comparing different denominators. - **Comparison across deployments needs stable trace IDs.** The runtime generates trace IDs that are stable per run; reusing the same ID across replays is what lets eval frameworks pair "before edit" and "after edit" results. - **The loop is per-component, not per-system.** A team improving one workflow's prompt should not be blocked on a system-wide eval pipeline. Treat each workflow's loop as independent and run them on their own cadences. - **Skipping traces breaks the loop.** It is tempting to log only "interesting" runs. Every run produces a trace anyway in AGNT5; the cost of saving them all is what makes the loop sustainable. Filtering happens at eval time, not capture time. ## Related concepts - [Durable execution](/docs/concepts/durable-execution.md) — the runtime mechanism that makes traces possible. 
- [Event sourcing and replay](/docs/concepts/event-sourcing-and-replay.md) — the journal that powers every stage of the loop. - [Versioning and deployment model](/docs/concepts/versioning-and-deployment.md) — how edits ship as new deployments. - [Determinism — why workflows have rules](/docs/concepts/determinism.md) — the constraint that lets you replay old traces. **Loop stages**: trace (journal write on every run) → eval (replay against a scorer) → edit (prompt/model change) → deploy (new immutable version) **Related CLI**: [agnt5 deploy](/cli/deploy.md) **Reuses**: evals replay frozen journals against new code; the same journal that makes execution durable powers regression testing --- ## Tools _Source: https://agnt5.com/docs/concepts/tools.md_ > Capabilities agents can invoke during their loop — `@tool`-decorated callables, with idempotency and serializable returns as the disciplines that matter. > A **tool** is a `@tool`-decorated callable an agent can invoke during its loop. Other agents passed to `tools=[...]` are also tools. ```python from agnt5 import Agent, Context, tool @tool async def get_weather(ctx: Context, city: str) -> str: """Get the current weather for a city. Args: city: Name of the city, e.g. "London" or "Tokyo". """ response = await weather_api.fetch(city) return f"Weather in {city}: {response.temp}F, {response.condition}" @tool async def calculate(ctx: Context, expression: str) -> str: """Evaluate a mathematical expression. Args: expression: A math expression, e.g. "2 + 2" or "15 * 23". """ result = eval(expression, {"__builtins__": {}}, {}) return f"{expression} = {result}" assistant = Agent( name="assistant", model="openai/gpt-4o-mini", instructions="Help users plan their day. Use get_weather for weather questions and calculate for math.", tools=[get_weather, calculate], max_iterations=5, ) ``` The `assistant`'s plan picks tools from its `tools=[...]` list. 
The runtime executes the tool, feeds the return value back to the model, and the loop continues. ## The mental model A tool extends an agent **beyond text generation**. The model on its own can produce language; it cannot read live data, perform calculations safely, or affect external systems. Tools fill those gaps. The agent's plan-act-observe loop relies on tools to be its "act" step: each iteration, the model decides whether to call a tool, the runtime executes the call, and the result becomes context for the next iteration. Two halves make a tool work. The **decorator half** registers the callable as agent-invokable: it expects `(ctx: Context, ...)` as its signature, runs as ordinary `async` Python, and returns a value the runtime serializes back to the model. The **docstring half** is the prompt the model sees: the description tells the model what the tool does, and the `Args:` block tells it how to fill in the arguments. A tool with a vague docstring will be called incorrectly or not at all — the LLM treats the docstring as the contract. Other agents can be tools. Passing an `Agent` instance to `tools=[...]` makes it invokable the same way a `@tool` callable would be. This is the **agents-as-tools** pattern: a coordinator agent delegates to specialist agents, picks which one to call based on the question, and synthesizes their outputs. The whole composition runs inside the step boundary that started the coordinator. ## Why it works this way Separating tools from agents lets one capability serve multiple agents. Three different agents — a coordinator, a researcher, an analyst — can all share the same `fetch_article` tool without re-implementing the HTTP call. The agent decides *when* to use a capability; the tool implements *what* the capability does. The decorator-as-registration pattern is also what allows tools to compose with the rest of the runtime. 
A `@tool` is registered the way a `@function` is registered — through the SDK's import-time decorator hooks — so the runtime knows the tool exists and can dispatch to it. A handler decorated with both `@function` and `@tool` is reachable from clients (registry entry) and from agent loops (tool list); the decorators do different jobs and stack cleanly. ## Edge cases and gotchas - **Tools that mutate state must be idempotent.** An agent's plan may invoke the same tool twice in one iteration. A `send_email` tool with no idempotency key will send the email twice. Use idempotency keys, conditional updates, or safe-by-design operations. - **Docstrings are prompts.** Write them clearly, describe what the tool does, document each argument with its expected shape. The LLM uses the docstring to decide whether and how to call the tool. - **Return types must serialize.** The runtime sends the tool's return value back to the model as text. Strings round-trip cleanly. Dicts and lists serialize as JSON. Opaque Python objects do not — keep returns to JSON-compatible shapes. - **`@tool` is not `@function`.** A handler can be both, but the decorators do different jobs. `@function` registers a callable for client invocation and `ctx.step` wrapping; `@tool` registers a callable for agent invocation. Decorate accordingly; if you need both, stack the decorators. - **Tools run inside the agent's host step.** When a workflow calls a function that runs an agent that calls a tool, every layer is inside the workflow's step boundary. The journal records one result for the step — the function's return value — not one per tool call. - **Heavy tools should be made idempotent at the boundary.** A tool that triggers a paid API or a long-running job will be re-invoked if the agent decides it needs to call it again. Push the idempotency check to the API itself, not to the tool wrapper. ## Related concepts - [Agents](/docs/concepts/agents.md) — the loop that invokes tools. 
- [Functions](/docs/concepts/functions.md) — the registered unit; can also be a tool when decorated with `@tool`. - [What the runtime owns vs. your code](/docs/concepts/runtime-vs-your-code.md) — where tool execution sits in the overall responsibility split. - [Picking the right primitive](/docs/concepts/picking-the-right-primitive.md) — when an agent capability should be a tool versus a step. **Code primitive**: `@tool` decorator (Python) / `tool(...)` factory (TypeScript); wraps a callable so an agent can invoke it **Disciplines**: idempotency (replay-safe), JSON-serializable arguments and returns **Relation to functions**: a tool is a function with `@tool` applied — same registration, broader access (callable from inside agent loops) --- ## Versioning and deployment model _Source: https://agnt5.com/docs/concepts/versioning-and-deployment.md_ > How AGNT5 handles workflow versioning — deployments are immutable, in-flight runs stay on their version, environments are pointers. > A **deployment** is an immutable artifact of your code. Workflows are versioned by deployment. **In-flight runs continue on the version they started with**; new runs use the latest deployment. **Environments** (staging, prod) are pointers that resolve to a deployment at run-start time. ``` deployment_v1 ◄── run_42 (in flight, started before v2) deployment_v2 ◄── run_43, run_44 (started after v2) ▲ │ env "prod" ──► deployment_v2 agnt5 deploy ──► creates deployment_v3 env "prod" advances to v3 run_42 still on v1 run_43, run_44 still on v2 new runs go to v3 ``` The routing key for every dispatchable unit is `(tenant_id, deployment_id, component_id)`. Pinning runs to deployment_id is what lets the same workflow code change without breaking inflight executions. ## The mental model Treat a deployment as a **frozen snapshot**: a tarball of your code, a content hash, a container image. Once published, it never changes. 
The control plane records the deployment's manifest — which workflows, functions, tools, and agents it registers — and the runtime keeps the deployment available as long as any run is still using it. An **environment** is a **named pointer** that maps to a deployment. `prod`, `staging`, `dev` are not deployments themselves; they are aliases. `agnt5 deploy --env prod` creates a new deployment and atomically moves the `prod` pointer to it. New runs that target environment `prod` get routed to the deployment the pointer currently names; runs that started under the previous deployment keep running on it. The runtime resolves the environment pointer **at ingress** — the moment the run is created. After that, the run's identity includes its `deployment_id`, and every step routes back to the same deployment for the entire lifetime of the run. A long-running workflow that started on `v2` continues calling `v2`'s `@function` handlers even after `v3` ships, because the routing key it carries is fixed. This is what makes rolling deploys safe: you can ship breaking changes to your workflow body, knowing in-flight runs are not subjected to the new code mid-flight. ## Why it works this way Workflows can run for hours, days, or weeks. A naive "always use the latest code" model would mean a workflow that started Monday could see Wednesday's code partway through, with no compatibility story between the two. Pinning runs to their deployment is the smallest mechanism that gives you a consistent code view per run. Environments-as-pointers separates **deploying code** from **routing traffic to it**. You can publish a deployment without pointing prod at it, then advance the pointer when you are ready. Rollback is the same operation in reverse — point the environment back at the previous deployment; in-flight runs on the buggy deployment finish on it (they would have anyway), and new runs go to the safe one. 
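The pointer-at-ingress behavior can be sketched in a few lines of Python; the dictionaries here are illustrative stand-ins for the control plane's state, not its real data model:

```python
# Sketch: environments are pointers, resolved once at ingress; runs are
# pinned to a deployment id for their whole lifetime. Illustrative only.

environments = {"prod": "deployment_v2"}   # env name -> deployment id (a pointer)
runs = {}                                  # run id -> pinned deployment id

def start_run(run_id: str, env: str) -> str:
    # The pointer is read exactly once, when the run is created.
    runs[run_id] = environments[env]
    return runs[run_id]

start_run("run_42", "prod")                 # pinned to deployment_v2
environments["prod"] = "deployment_v3"      # agnt5 deploy advances the pointer
start_run("run_43", "prod")                 # new run follows the new pointer

assert runs["run_42"] == "deployment_v2"    # in-flight run stays pinned
assert runs["run_43"] == "deployment_v3"
```

Advancing the pointer is the deploy; nothing about `run_42` changes.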
The `(tenant_id, deployment_id, component_id)` routing key is the foundation of multi-tenancy and version coexistence. The runtime never has to ask "which version of this function should I call" — the routing key already encodes it. ## Edge cases and gotchas - **Long-running workflows can outlive several deployments.** A workflow that runs for two weeks may span five deployments. Plan for this when refactoring: keep `@function` signatures backwards-compatible if any in-flight run still calls them. - **Removing a workflow does not orphan its runs.** A deployment that no longer registers a workflow can still serve in-flight runs from earlier deployments — the runtime keeps the older deployment alive while runs reference it. - **Renaming a function is a breaking change for in-flight runs.** Inside a workflow, `ctx.step(handler, ...)` resolves by registered name. If `v3` renames `fetch_article` to `fetch_url`, a `v2` run mid-flight that next calls `ctx.step(fetch_article, ...)` still routes to `v2`'s registry — fine. But if you delete `v2` deployment artifacts before `v2` runs drain, the routing fails. - **Signals and queries must be compatible across versions.** A signal sent to a run uses the run's deployment routing — but the signal's *payload schema* must match what the run's workflow expects. Treat signal/query schemas as a public API. - **Environment promotion is not run promotion.** Promoting `staging` to point at deployment `v5` does not move `staging`'s in-flight runs to `v5`. They stay on whatever deployment they started on. - **Cohorted upgrades require explicit gating.** AGNT5 does not automatically run two versions of a workflow side-by-side and pick the better one. If you want canary or A/B deployments, gate the routing yourself — for example, two environments pointing at two deployments, with traffic split at the caller. 
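A caller-side gate for the canary shape described above might look like this sketch; the environment names and the split mechanism are hypothetical, since AGNT5 leaves traffic splitting to the caller:

```python
# Sketch: two environments point at two deployments; the caller decides
# which one each new run targets. Names and ratios are illustrative.
import random

def pick_env(canary_fraction: float) -> str:
    # Hypothetical environments "prod" and "prod-canary", each a pointer
    # to a different deployment.
    return "prod-canary" if random.random() < canary_fraction else "prod"

random.seed(0)
counts = {"prod": 0, "prod-canary": 0}
for _ in range(1000):
    counts[pick_env(0.1)] += 1
# Roughly 10% of new runs land on the canary environment; in-flight runs
# on either deployment are unaffected, as always.
```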
## Related concepts

- [Architecture overview](/docs/concepts/architecture-overview.md) — where deployments and environments sit in the runtime.
- [What the runtime owns vs. your code](/docs/concepts/runtime-vs-your-code.md) — the runtime owns deployment artifacts and version pinning; your code owns staying compatible.
- [Sandbox isolation tiers](/docs/concepts/sandbox-isolation-tiers.md) — tier selection is per-deployment.

**Deployment model**: deployments are immutable (each push creates a new version); environments are pointers to specific versions
**In-flight runs**: stay on the version they started under; new runs follow the latest pointer
**Related CLI**: [agnt5 deploy](/cli/deploy.md), [agnt5 list](/cli/deployments.md)

---

## Workflows

_Source: https://agnt5.com/docs/concepts/workflows.md_

> Durable orchestrators — async functions whose progress survives crashes through journaled step boundaries.

> A **workflow** is a `@workflow`-decorated `async` function whose body orchestrates steps and whose progress survives crashes. Each `ctx.step(...)` call is the unit of replay.
```python
from agnt5 import WorkflowContext, function, workflow

@function
async def validate_order(ctx, order_id: str, items: list) -> dict:
    return {"valid": len(items) > 0, "item_count": len(items)}

@function
async def charge_card(ctx, order_id: str) -> str:
    return await payments.charge(order_id)

@function
async def create_shipment(ctx, order_id: str, txn: str) -> str:
    return await shipping.create(order_id, txn)

@workflow
async def order_fulfillment(ctx: WorkflowContext, order_id: str, items: list) -> dict:
    validation = await ctx.step(validate_order, order_id, items)
    if not validation["valid"]:
        return {"order_id": order_id, "status": "rejected"}
    txn = await ctx.step(charge_card, order_id)
    tracking = await ctx.step(create_shipment, order_id, txn)
    return {"order_id": order_id, "status": "fulfilled", "txn": txn, "tracking": tracking}
```

If the worker crashes between `charge_card` and `create_shipment`, the next attempt skips `validate_order` and `charge_card` (their results are journaled) and runs `create_shipment` against the recorded `txn`.

## The mental model

A workflow body looks like ordinary `async` Python: variable assignments, branches, loops, exception handlers. The runtime treats the body as a deterministic recipe and the journal as the cooked-pot history. On every replay, the runtime walks the recipe and asks one question at each `ctx.step(...)`: do I have a recorded result for this call in this run? If yes, replay returns the journaled value and continues. If no, the runtime executes the step, writes the input and output to the journal, then returns.

The unit of durability is the **step**, not the line. Code between two `ctx.step(...)` calls — branches, variable assignments, calls to deterministic helpers — re-executes on every replay. Code inside a step is a side effect that runs at most once per run, modulo the [durable-execution gotcha](/docs/concepts/durable-execution.md#edge-cases-and-gotchas) about partial side effects.
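The replay question at each `ctx.step(...)` can be sketched with a toy journal. This is illustrative only, not the AGNT5 runtime; `Journal` and `charge` are invented names:

```python
import asyncio

# Toy journal keyed by step position: execute once, replay thereafter.
class Journal:
    def __init__(self) -> None:
        self.records: dict[int, object] = {}
        self.executions = 0

    async def step(self, index: int, fn, *args):
        if index in self.records:        # replayed: read the recorded value,
            return self.records[index]   # do not re-run the side effect
        self.executions += 1             # first execution: run it...
        result = await fn(*args)
        self.records[index] = result     # ...and record the result
        return result

async def charge(order_id: str) -> str:
    return f"txn-{order_id}"             # stands in for a real side effect

journal = Journal()
first = asyncio.run(journal.step(0, charge, "o1"))     # executes
replayed = asyncio.run(journal.step(0, charge, "o1"))  # replays
assert first == replayed == "txn-o1"
assert journal.executions == 1
```

The second call models recovery after a crash: the recipe re-runs, but the step body does not.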
`WorkflowContext` is richer than `FunctionContext`. It carries the workflow's run identifier, session and user identifiers for memory scoping, an entity for state changes, and the step counter the runtime uses for journaling. The context is your handle on the durability machinery; the body is the recipe.

## Why it works this way

Step boundaries are explicit so you can see where the durability bargain is being made. Implicit checkpointing — at every `await`, every line, every function call — produces unreadable code and unbounded journals. Boundary-only checkpointing makes the journal proportional to your business logic, not your control flow.

The cost is a constraint on workflow code: the body must be deterministic. Replay must arrive at the same `ctx.step(...)` call sites in the same order, every time. AGNT5 trades this constraint for an automatic recovery model. Without it, the system would have no way to tell which journaled result belongs to which call site.

## Edge cases and gotchas

- **The body must be deterministic.** Wall-clock reads, random numbers, network calls, and in-process caches in the workflow body are replay hazards. Move them inside a step. See [Determinism](/docs/concepts/determinism.md).
- **Three forms of `ctx.step`.** `ctx.step(handler, *args)` calls a `@function` (the recommended form). `ctx.step("name", awaitable)` checkpoints arbitrary async work. `ctx.step("name", lambda: ...)` checkpoints a synchronous callable. Pick one form per workflow and stay with it.
- **Long-running steps hold a lease.** A step that takes hours blocks the run from progressing past it. Surface progress through smaller steps instead of waiting indefinitely inside one call.
- **Runs are not deduplicated by input.** Re-invoking the same workflow with the same input creates a new run with a new ID and a new journal. Dedupe at the caller if you need at-most-once semantics across submissions.
- **`ctx.task(...)` still works.** Older code uses `ctx.task` for the same shape. New code uses `ctx.step` everywhere; both currently coexist.
- **In-flight runs stay on their version.** When a new deployment ships, runs that started on the previous version keep running on it. New runs use the new version. See [Versioning and deployment model](/docs/concepts/versioning-and-deployment.md).

## Related concepts

- [Functions](/docs/concepts/functions.md) — the units a workflow calls through `ctx.step`.
- [Durable execution](/docs/concepts/durable-execution.md) — the runtime guarantee a workflow provides.
- [Determinism — why workflows have rules](/docs/concepts/determinism.md) — the constraint replay imposes on the body.
- [Picking the right primitive](/docs/concepts/picking-the-right-primitive.md) — when to reach for a workflow versus a plain function.

**Code primitive**: `@workflow` decorator (Python) / `workflow(...)` factory (TypeScript)
**Anatomy**: async function whose body is a sequence of `await ctx.step(...)` calls; arguments and return values are JSON-serializable
**Related CLI**: [agnt5 deploy](/cli/deploy.md), [agnt5 logs](/cli/deployments.md)

---

## Workflows, steps, and agents

_Source: https://agnt5.com/docs/concepts/workflows-steps-and-agents.md_

> The three primitives in AGNT5 — what they are, how they fit together, and which one you reach for when.

> A **workflow** orchestrates work; a **step** is a checkpointed unit of that work; an **agent** is an LLM-driven loop that runs inside a step.

```python
import httpx

from agnt5 import Agent, FunctionContext, WorkflowContext, function, workflow

researcher = Agent(
    name="researcher",
    model="openai/gpt-4o-mini",
    instructions="Summarize the article in three sentences.",
)

@function
async def fetch_article(ctx: FunctionContext, url: str) -> str:
    # Side effect lives in a step. The workflow body never makes the HTTP call.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    return response.text

@function
async def summarize(ctx: FunctionContext, body: str) -> str:
    # The agent's non-determinism is contained inside this step.
    result = await researcher.run(body)
    return result.output

@workflow
async def research(ctx: WorkflowContext, url: str) -> str:
    article = await ctx.step(fetch_article, url)
    summary = await ctx.step(summarize, article)
    return summary
```

The `research` workflow is the orchestrator. `fetch_article` and `summarize` are steps. The `researcher` Agent is the agent. All three primitives appear in nine lines of orchestration code.

## The mental model

A **workflow** is a function decorated with `@workflow` that drives a sequence of steps to produce a result. Its body looks like ordinary `async` Python — variables, branches, loops, exception handlers — but AGNT5 treats it as a recipe to be executed reliably across crashes. The workflow body must be deterministic: replay must arrive at the same call sites in the same order, every time.

A **step** is the unit of work the workflow delegates. Steps are where side effects happen — HTTP calls, database writes, file I/O, LLM calls. Each call to `ctx.step(...)` checkpoints its input and output to the run's journal. On recovery, replay reads the checkpoint instead of re-running the side effect. You can pass a `@function`-decorated handler (the recommended form, shown above) or a name plus a callable when the step wraps arbitrary async work.

An **agent** is an LLM-driven loop: given instructions, a model, and optional tools, it picks actions and refines its output until it satisfies the goal or hits an iteration limit. Because an agent's output depends on the model's stochastic sampling, it is non-deterministic by definition. The way AGNT5 reconciles that with deterministic workflows is to host the agent's call inside a step.
The agent runs once, the step journals its result, and the workflow body sees a deterministic value on replay.

## Why it works this way

Three primitives, one separation of concerns: **orchestrate, execute, decide**. The split exists so each piece can do exactly one job. The workflow stays deterministic and replay-safe; the step is the single chokepoint where non-determinism is allowed and recorded; the agent is free to be as stochastic as the model permits, because its output is captured the first time and replayed thereafter.

You could imagine an alternative where workflows directly call LLMs without a step boundary. AGNT5 rejects that shape because there would be no way to recover a crashed run without re-billing every prompt — and re-running a tool-using agent against the same input does not in general produce the same tool calls. The step boundary is what makes the durability guarantee tractable.

## Edge cases and gotchas

- **`ctx.step` versus `ctx.task`.** Older code in this repository uses `ctx.task(...)`. New code uses `ctx.step(...)`. Both still work; lead with `ctx.step` everywhere.
- **An agent is not a peer of a workflow.** Agents always run inside a step boundary, even when invoked directly from a `@function`. There is no `ctx.agent(...)`; you call `Agent.run(...)` (or its async variant) from inside a `@function`, and the workflow reaches the agent via `ctx.step`.
- **The word "step" is overloaded.** A *step* in a workflow (this page) is a checkpointed call. A *reasoning step* inside an agent loop is one iteration of the agent's plan-act-observe cycle. They are not the same thing — when ambiguity matters, say "workflow step" or "agent iteration".
- **Agents calling agents are still inside steps.** When one agent uses another agent as a tool, or when one agent hands off to another, the whole chain runs inside the step that invoked the first agent. The journal records one step result, not a sub-tree.
- **`agent` is lowercase in prose.** The Python class is `Agent`; in body text the noun is `agent`, never "AI agent" or "Agent".

## Related concepts

- [Durable execution](/docs/concepts/durable-execution.md) — what the step boundary buys you.
- [Determinism — why workflows have rules](/docs/concepts/determinism.md) — what the workflow body is and is not allowed to do.
- [Event sourcing and replay](/docs/concepts/event-sourcing-and-replay.md) — how the journal turns a crashed run into a resumable one.

**Primitives**: `@workflow` (orchestrator), `ctx.step("name", lambda: ...)` (boundary), `Agent` (LLM loop hosted inside a step)
**Composition**: workflow body calls steps; steps wrap agents, function calls, or other side-effecting work; agents invoke tools
**Determinism boundary**: workflow body deterministic; step bodies free to be non-deterministic

# Get Started

---

## Build

_Source: https://agnt5.com/docs/get-started/build.md_

> Build your first workflow locally — workflows, steps, agents, and tools.

This is stage **1 of 5** of [the AGNT5 loop](/docs/get-started/loop.md). You'll build a workflow on your laptop with checkpointed steps, run it with `agnt5 dev`, and see the trace in Studio.

The fastest path is the [Quickstart](/docs/get-started/quickstart.md) — it walks the build stage end to end on a Hacker News digest workflow. Use the Quickstart first, then come back here for the deeper material on each primitive.

This page is being filled in.

---

## Build locally

_Source: https://agnt5.com/docs/get-started/build-locally.md_

> Use agnt5 dev to iterate on a workflow with hot reload, inspect runs in Studio, and verify durability across worker restarts.

You finished the [quickstart](/docs/get-started/quickstart.md). The `my-investigator` project on your laptop runs end-to-end. This page walks you through the local development loop you'll use to extend it: hot-reload edits, trace inspection in Studio, and a durability test that kills the worker mid-run.
**Time:** about 10 minutes.

**You'll learn:**

- Edit the workflow and see changes apply on the next run
- Read the trace for a paused run in Studio
- Kill the worker mid-pause, restart, and watch the workflow resume

**Prerequisites:**

- Completed the [quickstart](/docs/get-started/quickstart.md). You have `my-investigator/` checked out and `agnt5 dev` in one terminal.

## Step 1: Edit the workflow

Open `src/agnt5_quickstart/workflows.py` and tighten the prompt:

```python
INVESTIGATOR_PROMPT = (
    "You investigate technical and operational questions for an engineering team. "
    "Use the DeepWiki MCP tools to read documentation and ask questions about "
    "GitHub repositories — that's your primary evidence source. If web search is "
    "available, use it sparingly to corroborate community signal. "
    "Separate first-party evidence (docs, source code) from public commentary. "
    "Return a concise brief: answer, evidence, risks, recommendation, open questions. "
    "Cite specific file paths and commit ranges from the source repo when relevant."  # NEW
)
```

Save the file. The terminal running `agnt5 dev` shows the worker reconnect:

```
File changed: src/agnt5_quickstart/workflows.py
Reloading components...
Registered components: investigate_with_review, save_report
Worker connected
```

No restart needed. The next run picks up the new prompt.

## Step 2: Trigger another run and watch the trace

In your second terminal:

```bash
agnt5 run investigate_with_review --input '{
  "question": "Should we adopt Polars to replace Pandas in our analytics pipeline?"
}'
```

Open Studio (default `https://app.agnt5.com`; `agnt5 context show` if your context is custom). The new run shows up at the top of your project's runs list. Click into it. The trace shows:

- The workflow input (`question`).
- The MCP `connect` step against `mcp.deepwiki.com/mcp`.
- Each model call inside the agent loop, with input messages and output.
- Each tool call: DeepWiki `read_wiki_structure`, `ask_question`, plus any `web_search_preview` calls if you're on the provider-hosted path. Built-in tool calls are marked `built_in: true`.
- The `wait_for_user` step, paused with the brief and the three options.

The trace is a record of every checkpointed boundary. There is no "agent black box" — every model call and every tool call is its own step.

## Step 3: Verify durability

The HITL (human-in-the-loop) pause is the hard one. AGNT5 promises the workflow is not held in process memory. Verify it:

1. With the run still paused at review, switch to the terminal running `agnt5 dev` and stop it: `Ctrl-C`.
2. Wait 10 seconds. Confirm the worker is gone (`ps aux | grep agnt5_quickstart` returns nothing).
3. Restart it: `agnt5 dev`. Watch it reconnect and re-register `investigate_with_review` and `save_report`.
4. Open Studio and approve the brief.

The workflow resumes from the `wait_for_user` step. The agent does not re-call the model. The MCP server does not get re-queried. The `save_report` step runs and writes the file.

```bash
cat .agnt5/reports/*.md
```

The report contains the brief that was drafted before you killed the worker, with whatever edits Studio captured.

## What that demonstrated

You exercised three properties that distinguish AGNT5 from a plain agent loop:

- **Hot reload.** Source edits register without a process bounce. The dev session is the development surface; you don't redeploy locally.
- **Glass-box trace.** Every model call, tool call, and human-review pause is a discrete step in Studio. The trace is the artifact you'll come back to when something is wrong.
- **Durable pauses.** A long-running pause (a human review, a webhook callback, a scheduled wait) is not a process. It's a checkpoint. Workers come and go; the workflow does not.

These are the same properties that make the same workflow run unchanged in cloud. That's the next page.
## Next steps

- **[Run in cloud](/docs/get-started/run-in-cloud.md)** — promote the same workflow to a managed environment with `agnt5 deploy`.
- **[Workflows](/docs/concepts/workflows.md)** — the durable-execution model that makes the trace and the resume possible.
- **[Agents](/docs/concepts/agents.md)** — the model→tool→model loop and how `Agent` composes with `@workflow`.

---

## Deploy to AGNT5 Cloud

_Source: https://agnt5.com/docs/get-started/deploy.md_

> Promote the workflow you ran in your dev session to a managed environment and trigger it remotely.

You will take the project from [Your first workflow](/docs/get-started/your-first-workflow.md) and run it on a managed environment. Same code, same workflow — only the session host changes from your laptop to AGNT5's managed runtime.

**Time:** about five minutes.

**You'll learn:**

- Push your API key as a project-scoped secret
- Build and ship the project to a managed environment
- Trigger the deployed workflow and view its trace

**Prerequisites:**

- The `my-quickstart` project from [Your first workflow](/docs/get-started/your-first-workflow.md), authenticated with `agnt5 auth login`.

## Step 1: Push the OpenAI key as a secret

Local `.env` files do not travel to the cloud. From inside the project directory:

```bash
agnt5 secrets set --name OPENAI_API_KEY --type api_key
```

The CLI prompts for the value and stores it scoped to this project. Confirm:

```bash
agnt5 secrets list
```

## Step 2: Deploy

```bash
agnt5 deploy
```

`agnt5 deploy` defaults to the `preview` environment. The command:

1. Builds your project image with a managed BuildKit instance.
2. Pushes the image to the registry.
3. Creates a deployment record.
4. Provisions workers and waits until they report ready.

Output streams progress in real time. When it finishes you get a deployment ID.
```
✓ Build complete
✓ Image pushed
✓ Deployment created: d8f3a2b1-1234-5678-9abc-def012345678
✓ Workers ready (1/1)
```

## Step 3: Verify the deployment

```bash
agnt5 deploy status
```

The output names the environment, the replica count, and the worker health. Use `--watch` to refresh every two seconds. You can also list every deployment in the project:

```bash
agnt5 deployments
```

## Step 4: Trigger the workflow remotely

```bash
agnt5 run research --env preview --input '{"url": "https://en.wikipedia.org/wiki/Durable_function"}'
```

Without `--env preview` the CLI routes to your active dev session; with it the request hits the deployed environment.

## Step 5: Inspect the deployed run

```bash
agnt5 inspect runs ls --env preview
agnt5 inspect trace -r
```

Same trace shape as your dev run, now sourced from the deployed environment.

## What you built

The same workflow you ran in your dev session is now running under managed workers. Updates flow with another `agnt5 deploy`. Promote `preview` to `staging` or `production` by passing `--env`.

What you did **not** write or configure:

- Container build pipeline
- Image registry credentials
- Worker process supervision and health checks
- Secret distribution

## Next steps

- **[What you just built](/docs/get-started/what-you-just-built.md)** — the mental model behind durable workflows and the eval loop.
- **[Where to go next](/docs/get-started/next.md)** — pick a direction based on what you want to build.

---

## Improve

_Source: https://agnt5.com/docs/get-started/improve.md_

> Close the loop — add an eval, fix the failure, see the diff.

This is stage **5 of 5** of [the AGNT5 loop](/docs/get-started/loop.md) — the part that makes the loop a loop. You already see runs in [Observe](/docs/get-started/observe.md); this stage turns observation into action.

**The flow:**

1. Pick a bad run from Studio (a regression, a model that hallucinated, a tool that timed out).
2. Capture its input into a dataset.
3. Write an eval — a function that grades a run's output against expected behavior.
4. Make a change — prompt, model, retry policy, or code.
5. Replay the dataset against the new version. Read the diff in Studio.
6. Gate the deploy on the eval if you want it enforced in CI.

This is how `gpt-5-mini → claude` swaps stop being scary and become measurable.

Deeper material on datasets, eval functions, and CI gating is being filled in.

---

## Improve with evals

_Source: https://agnt5.com/docs/get-started/improve-with-evals.md_

> Score your investigator agent against a custom scorer, change the prompt, and measure whether the score moved.

You ran the [quickstart](/docs/get-started/quickstart.md) and saw a brief. Was it a good brief? "Looks fine" is the answer most teams give to this question. Evals turn that judgment into a measurement you can repeat.

This page walks you through writing a scorer for the quickstart workflow, running it, changing the prompt, and comparing the score. The same pattern is what you'll use when you change the model, swap a tool, or add a step.

**Time:** about 15 minutes.

**You'll learn:**

- Write a custom scorer with the `@scorer` decorator
- Run it against the `investigate_with_review` workflow with `client.eval`
- Change the prompt and compare the score before and after

**Prerequisites:**

- Completed the [quickstart](/docs/get-started/quickstart.md) and the [Build locally](/docs/get-started/build-locally.md) walkthrough.
- `agnt5 dev` is running for the `my-investigator` project.

## Step 1: Decide what "good" means

A brief is good if it has all four sections (`Answer`, `Evidence`, `Risks`, `Recommendation`) and at least one open question. That's a structural property — score it deterministically, no LLM judge needed for the first pass. Sketch the rule out loud:

> A brief passes if every section header is present and the open-questions section has at least one bullet. Score is the fraction of sections found.

That's a scorer.
## Step 2: Write the scorer

Add a new file `src/agnt5_quickstart/scorers.py`:

```python
import re

from agnt5.eval import scorer
from agnt5.eval.types import EvalContext, ScorerResultPy

REQUIRED_SECTIONS = ("Answer:", "Evidence:", "Risks:", "Recommendation:", "Open questions:")

@scorer(name="brief_structure")
def brief_has_required_sections(ctx: EvalContext) -> ScorerResultPy:
    """Score whether the brief contains all required sections plus at least one open question."""
    output = str(ctx.output or "")
    found = [s for s in REQUIRED_SECTIONS if s in output]
    open_qs_match = re.search(
        r"Open questions:\s*(?:\n+\s*-\s*\S.*)+",
        output,
        flags=re.MULTILINE,
    )
    has_open_questions = open_qs_match is not None

    score = len(found) / len(REQUIRED_SECTIONS)
    if not has_open_questions and score == 1.0:
        score = 0.8  # all headers present but no actual open questions
    passed = score == 1.0 and has_open_questions

    missing = [s for s in REQUIRED_SECTIONS if s not in found]
    explanation = (
        f"Found {len(found)}/{len(REQUIRED_SECTIONS)} sections. "
        f"Missing: {missing or 'none'}. "
        f"Open questions present: {has_open_questions}."
    )
    return ScorerResultPy(score=score, passed=passed, explanation=explanation)
```

Importing the module registers the scorer with the SDK. You can confirm registration once at the start of an eval script:

```python
import agnt5_quickstart.scorers  # noqa: F401 — register the scorer
from agnt5.eval import list_custom_scorers

print(list_custom_scorers())  # ["brief_structure", ...]
```

## Step 3: Run the eval

Create `eval_brief.py` at the project root:

```python
import asyncio

from agnt5 import Client

import agnt5_quickstart.scorers  # noqa: F401 — register brief_structure

async def main() -> None:
    client = Client()
    result = await client.eval(
        component="investigate_with_review",
        component_type="workflow",
        input_data={"question": "Should we migrate from Redis to Valkey?"},
        scorers=["brief_structure"],
    )
    for score in result.scores:
        print(f"{score.scorer}: score={score.score:.2f} passed={score.passed}")
        print(f"  {score.explanation}")

if __name__ == "__main__":
    asyncio.run(main())
```

Run it:

```bash
python eval_brief.py
```

`client.eval` runs the workflow end-to-end through `agnt5 dev`, captures the output, and applies the scorer. The workflow still pauses at the human review step — approve in Studio to let the eval finish. Expected output on a healthy run:

```
brief_structure: score=1.00 passed=True
  Found 5/5 sections. Missing: none. Open questions present: True.
```

## Step 4: Change the prompt and re-run

Edit `INVESTIGATOR_PROMPT` in `workflows.py` and remove the line that lists the required sections:

```python
INVESTIGATOR_PROMPT = (
    "You investigate technical and operational questions for an engineering team. "
    "Use the DeepWiki MCP tools to read documentation and ask questions about "
    "GitHub repositories — that's your primary evidence source. "
    # Removed: "Return a concise brief: answer, evidence, risks, recommendation, open questions."
)
```

Hot reload picks up the change. Run the eval again:

```bash
python eval_brief.py
```

The score drops because the model no longer knows the required structure:

```
brief_structure: score=0.40 passed=False
  Found 2/5 sections. Missing: ['Risks:', 'Recommendation:', 'Open questions:']. Open questions present: False.
```

You have a measurement. The structural prompt instruction was load-bearing — removing it cost three sections. Restore the line. The score returns to 1.00.
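Before wiring a scorer into `client.eval`, you can sanity-check the rule itself on a canned brief with plain Python and no SDK. The function below restates the `brief_structure` logic inline; the sample briefs are invented:

```python
import re

# Same rule as brief_structure, restated without the SDK types.
REQUIRED_SECTIONS = ("Answer:", "Evidence:", "Risks:", "Recommendation:", "Open questions:")

def score_brief(output: str) -> tuple[float, bool]:
    found = [s for s in REQUIRED_SECTIONS if s in output]
    has_open_questions = re.search(r"Open questions:\s*(?:\n+\s*-\s*\S.*)+", output) is not None
    score = len(found) / len(REQUIRED_SECTIONS)
    if not has_open_questions and score == 1.0:
        score = 0.8  # all headers, but no actual open questions
    return score, score == 1.0 and has_open_questions

good = "Answer: yes\nEvidence: docs\nRisks: low\nRecommendation: go\nOpen questions:\n- timeline?"
bad = "Answer: yes\nEvidence: docs"

assert score_brief(good) == (1.0, True)
assert score_brief(bad) == (0.4, False)
```

A unit test like this catches regex mistakes in seconds, before you spend a full workflow run (and a model bill) exercising the scorer end to end.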
## Step 5: Beyond structure — LLM-as-judge

Structural scoring catches format regressions. Quality regressions need a model in the loop. Swap the scorer:

```python
from agnt5.eval import LLMJudge

result = await client.eval(
    component="investigate_with_review",
    component_type="workflow",
    input_data={"question": "Should we migrate from Redis to Valkey?"},
    scorers=[
        "brief_structure",
        LLMJudge(
            criteria=(
                "Does the brief separate first-party evidence (docs, source) "
                "from public commentary, and does the recommendation follow "
                "from the evidence?"
            ),
        ),
    ],
)
```

Run both scorers in the same eval. Treat the LLM judge's score as a noisy signal — useful in aggregate over many cases, less reliable on any single case.

## What you built

You wrote a deterministic scorer, ran it against a real workflow, made a change that moved the score, and saw the score move. That loop — write a scorer, eval, change, eval again, compare — is how you guard a workflow against regressions when you change a prompt, model, or tool.

What you did **not** write or configure:

- A workflow runner — `client.eval` reuses your dev session
- A scorer registry — the `@scorer` decorator handles registration
- An LLM-judge prompt template — `LLMJudge` ships one, configurable

## Next steps

- **[Workflows](/docs/concepts/workflows.md)** — the durable-execution model that makes runs reproducible enough to score.
- **[Templates](/templates)** — start from a workflow close to what you want to build.

---

## Install the CLI

_Source: https://agnt5.com/docs/get-started/install.md_

> Install the AGNT5 CLI, verify it, and authenticate with your account.

The `agnt5` CLI scaffolds projects, runs `agnt5 dev` against AGNT5 from your laptop, and ships projects to managed environments.

**Prerequisites:**

- macOS (Apple Silicon or Intel), Linux (x86_64 or ARM64), or Windows via WSL2. Native Windows binaries are not yet shipped.
- An AGNT5 account. Sign up at [app.agnt5.com](https://app.agnt5.com).
Set up with an AI coding assistant — paste this prompt into Claude Code, Cursor, Copilot, etc. The assistant will run the install end-to-end. Use this if you'd rather have an AI driver handle the steps below.
## Step 1: Install

The installer writes `agnt5` to `~/.agnt5/bin` and appends that directory to your shell's `PATH`. Open a new terminal, or reload the current shell:

```bash
source ~/.zshrc                     # zsh
source ~/.bashrc                    # bash
source ~/.config/fish/config.fish   # fish
```

## Step 2: Verify

```bash
agnt5 version
```

Expected output (version numbers will vary):

```
agnt5-cli version v1.x.x
 - Go version: go1.25.x
 - Platform: darwin/arm64
```

## Step 3: Authenticate

Sign in via OAuth in your browser:

```bash
agnt5 auth login
```

The CLI opens your default browser, completes the PropelAuth flow, and writes the issued API key to `~/.agnt5/config.yaml`. Confirm:

```bash
agnt5 auth status
```

The output names the authenticated user, the active environment, and the API base URL.

For CI or non-interactive environments, pass an API key directly or set `AGNT5_API_KEY` in the environment:

```bash
agnt5 auth login --api-key agnt5_sk_...
# or
export AGNT5_API_KEY=agnt5_sk_...
```

API keys are issued during the OAuth flow above, or generated in [app.agnt5.com](https://app.agnt5.com) account settings. See the [auth command reference](/cli/auth.md) for `agnt5 auth logout` and the full flag list.

## Troubleshooting

**`command not found: agnt5`** — your shell hasn't picked up the new `PATH` entry. Confirm `~/.agnt5/bin` is on `PATH`:

```bash
echo $PATH | tr ':' '\n' | grep agnt5
```

If nothing prints, add this line to your shell config and reload:

```bash
export PATH="$HOME/.agnt5/bin:$PATH"   # bash, zsh
# fish: fish_add_path "$HOME/.agnt5/bin"
```

**`agnt5 version` still fails after fixing `PATH`** — the binary did not download. Re-run the install command and check its output for errors before further `PATH` debugging.

**Authentication errors after running `agnt5 dev` or `agnt5 deploy`** — re-authenticate with `agnt5 auth login`. If you switched accounts or contexts, run `agnt5 auth logout` first.
## Next steps

- **[Quickstart](/docs/get-started/quickstart.md)** — build your first agent workflow with tools, MCP, and human review.
- **[CLI Reference](/cli)** — every command and flag.

---

## The AGNT5 Loop

_Source: https://agnt5.com/docs/get-started/loop.md_

> Build → Ship → Run → Observe → Improve. The production loop for reliable AI workflows.

Building reliable AI workflows usually means stitching together an orchestrator, an observability stack, an eval pipeline, and custom glue code. AGNT5 brings those layers into one production loop, so the workflow you ship, the run you debug, and the eval you use to improve it all share the same execution context.

The differentiator is not any single phase. Durable execution exists elsewhere. So do observability tools and eval platforms. What AGNT5 ships is the connection between them: traces are tied to real runs, evals can point back to the executions that produced them, and fixes can be compared against the failures they are meant to resolve.

---

## Where to go next

_Source: https://agnt5.com/docs/get-started/next.md_

> Pick a direction based on what you want to build next.

You ran an agentic workflow, deployed it, and have the mental model for the runtime. Pick a path based on what you want to build next.

## Build agents and workflows

Orchestration patterns: retries, timeouts, signals, fan-out and fan-in. Model-driven loops with tools, memory, and structured output. Pause a workflow for approval and resume with the user's response. Coordinate several agents on a single run via tools or handoffs.

## Read the concepts

What survives a crash, and how journaling makes it possible. The rules workflow code must follow so replay produces the same result. How traces, datasets, and evals work together.

## Run in production

The `preview` → `staging` → `production` flow. Traces, metrics, logs, and what to watch on call. Per-run, per-component, per-model accounting.
## Improve what's running

Turn a trace into a dataset and score new runs against it. Diff two runs and surface regressions before promotion. Version prompts and replay history against the new one.

## Reference

Every command and flag. Current SDK reference.

---

## Observe

_Source: https://agnt5.com/docs/get-started/observe.md_

> Inspect traces, retries, state, and failure recovery in Studio.

This is stage **4 of 5** of [the AGNT5 loop](/docs/get-started/loop.md). Every run produces a durable trace — open Studio and walk it step by step.

**What's in a trace:**

- Each step's input, output, and duration
- Model calls — prompt, response, token count, cost
- Retries — how many attempts, the error each time, the final outcome
- State at each checkpoint — what survives a worker restart
- Failure points — exactly which step raised, where replay would resume

**Where to look:**

- [Studio → Runs](https://app.agnt5.com/runs) — every invocation, searchable by workflow, status, input
- Click a run → drill into the trace tree → click any step for its full record

When you find a regression, jump to [Improve](/docs/get-started/improve.md) to capture it into an eval.

---

## Quickstart: Run your first workflow in AGNT5 Cloud

_Source: https://agnt5.com/docs/get-started/quickstart.md_

> Create a workflow locally, connect it to an AGNT5 Cloud dev environment, invoke it, and inspect the execution trace.

In this quickstart you'll create a workflow locally, connect it to an AGNT5 Cloud dev environment, invoke it, and inspect the execution trace. The workflow itself summarizes the top Hacker News stories — the API is public so there's no Hacker News token to chase; you just bring your own OpenAI key.

**You'll need:** Python 3.12+ or Node.js 20+ (with pnpm, npm, or yarn), an [OpenAI API key](https://platform.openai.com/api-keys), and ~3 minutes. We install the `agnt5` CLI below.
Set up with an AI coding assistant — paste this prompt into Claude Code, Cursor, Copilot, etc.
## Install the CLI

The installer writes `agnt5` to `~/.agnt5/bin` and adds it to your `PATH`. Open a new terminal (or `source` your shell's rc file) so `agnt5` resolves.

```bash
agnt5 auth login
```

Opens a browser window to sign you into AGNT5. Once it returns, future `agnt5` commands run against your account. For verification, troubleshooting, and API-key auth, see the [full Install guide](/docs/get-started/install.md).

## Run it

**Python:**

```bash
agnt5 create --template python/quickstart my-agnt5-quickstart
cd my-agnt5-quickstart
```

**TypeScript:**

```bash
agnt5 create --template typescript/quickstart my-agnt5-quickstart
cd my-agnt5-quickstart
```

`agnt5 create` downloads the template, registers the project with the Control Plane, and writes the scaffolded files into `my-agnt5-quickstart/`.

```bash
agnt5 dev
```

`agnt5 dev` starts a local worker, registers your components with the runtime, and prints a Studio URL:

**Python:**

```
Registered components: digest, fetch_top_ids, fetch_story, summarize, assemble_digest
Worker connected
Studio: https://app.agnt5.com/anon/
Watching project files
```

**TypeScript:**

```
Registered components: digest, fetchTopIds, fetchStory, summarize, assembleDigest
Worker connected
Studio: https://app.agnt5.com/anon/
Watching project files
```

Open the Studio URL from the terminal in your browser. The components your worker just registered show up live — the `digest` workflow is at the top of the list. In Studio:

1. Pick the `digest` workflow.
2. Set the input to `{"limit": 5}`.
3. Click **Run**.

The trace renders live as each step lands. Click any step to inspect its input, output, and (for model calls) the prompt, response, and cost.

## Notes

- Default model is `openai/gpt-5-mini`. Change it on the `model="..."` line in `functions.py` (Python) or `functions.ts` (TypeScript).
- Side effects go through `ctx.task(...)`. A bare `await fetch_story(id)` (or `await fetchStory(id)` in TypeScript) would run on every replay and break resume.
- You can also invoke the workflow from the CLI instead of Studio: `agnt5 run digest --input '{"limit": 5}'`. ## What's next This gets you through the first part of the AGNT5 loop: **Build → Ship → Run → Observe**. For production-ready behavior — promoting to managed environments, invoking from your app, capturing failures into evals — work through [The Loop](/docs/get-started/loop.md). ## Next steps Build → Ship → Run → Observe → Improve. The production-ready version of what you just did. Workflows, steps, and agents — how durable execution actually works under the hood. HITL, deep research, customer support, document processing — production-shaped starting points. --- ## Run _Source: https://agnt5.com/docs/get-started/run.md_ > Invoke your deployed workflow — HTTP, SDK, schedule, or Studio. This is stage **3 of 5** of [the AGNT5 loop](/docs/get-started/loop.md). Once your workflow is deployed, trigger it from wherever the work originates. **From Studio** — pick the workflow, enter input, click **Run**. Same UX as `agnt5 dev`, against the managed environment. **From the CLI:** ```bash agnt5 run digest --input '{"limit": 5}' ``` **From your app — Python SDK:** ```python from agnt5 import Client client = Client(gateway_url="https://api.agnt5.com", api_key="agnt5_sk_...") result = client.run("digest", {"limit": 5}) ``` **Over HTTP:** ```bash curl -X POST https://api.agnt5.com/v1/workflows/digest/run \ -H "Authorization: Bearer agnt5_sk_..." \ -H "Content-Type: application/json" \ -d '{"limit": 5}' ``` Each invocation gets its own trace in Studio — head to [Observe](/docs/get-started/observe.md) next. --- ## Run in cloud _Source: https://agnt5.com/docs/get-started/run-in-cloud.md_ > Promote the workflow from your dev session to a managed AGNT5 environment and trigger it remotely. You will take the `my-investigator` project from the [quickstart](/docs/get-started/quickstart.md) and run it on a managed environment. 
Same code, same workflow — only the worker host changes from your laptop to AGNT5's managed runtime. **Time:** about 5 minutes. **You'll learn:** - Push your model provider key as a project-scoped secret - Build and ship the project to a managed environment - Trigger the deployed workflow and inspect its trace **Prerequisites:** - The `my-investigator` project from the [quickstart](/docs/get-started/quickstart.md), authenticated with `agnt5 auth login`. ## Step 1: Push the OpenAI key as a secret Local `.env` files do not travel to the cloud. From inside the project directory: ```bash agnt5 secrets set --name OPENAI_API_KEY --type api_key ``` The CLI prompts for the value and stores it scoped to this project. Confirm: ```bash agnt5 secrets list ``` If you also configured an alternative search provider (`AGNT5_BRAVE_SEARCH_API_KEY`, `AGNT5_TAVILY_API_KEY`, or `AGNT5_SEARXNG_URL`), push that secret the same way. The provider-hosted built-in path needs only `OPENAI_API_KEY`. ## Step 2: Deploy ```bash agnt5 deploy ``` `agnt5 deploy` defaults to the `preview` environment. The command: 1. Builds your project image with a managed BuildKit instance. 2. Pushes the image to the registry. 3. Creates a deployment record. 4. Provisions workers and waits until they report ready. Output streams progress in real time. When it finishes you get a deployment ID: ``` ✓ Build complete ✓ Image pushed ✓ Deployment created: d8f3a2b1-1234-5678-9abc-def012345678 ✓ Workers ready (1/1) ``` ## Step 3: Verify the deployment ```bash agnt5 deploy status ``` The output names the environment, the replica count, and the worker health. Pass `--watch` to refresh every two seconds. To list every deployment in the project: ```bash agnt5 deployments ``` ## Step 4: Trigger the workflow remotely ```bash agnt5 run investigate_with_review --env preview --input '{ "question": "Should we migrate from Redis to Valkey?" }' ``` Without `--env preview` the CLI routes to your active dev session. 
With it, the request hits the deployed environment. The workflow runs the same path as it did locally: 1. Connects to DeepWiki over Streamable HTTP. 2. Drafts a brief. 3. Pauses for human review. ## Step 5: Approve in Studio Open Studio (default `app.agnt5.com`; `agnt5 context show` for custom contexts). Switch the project's environment selector from `dev` to `preview`. The deployed run shows up at the top of the runs list, paused at the human review step. Approve, edit, or reject as you would in dev. After approval, the workflow saves the report on the managed worker. The file lands inside the worker container — see the deployment's storage configuration for how to retrieve it. ## Step 6: Inspect the deployed run from the CLI ```bash agnt5 inspect runs --env preview agnt5 inspect trace -r ``` The trace shape matches your dev run: the same MCP step, the same agent loop, the same `wait_for_user`, the same `save_report`. Workers changed; the workflow did not. ## What you built The same workflow you ran in your dev session is now running under managed workers. Updates flow with another `agnt5 deploy`. Promote `preview` to `staging` or `production` by passing `--env`. What you did **not** write or configure: - A container build pipeline - Image registry credentials - Worker process supervision and health checks - Secret distribution to workers ## Next steps - **[Improve with evals](/docs/get-started/improve-with-evals.md)** — capture deployed runs as eval data and measure changes before you redeploy. - **[Workflows](/docs/concepts/workflows.md)** — the durable-execution model that lets the same code run unchanged in dev and cloud. --- ## Ship _Source: https://agnt5.com/docs/get-started/ship.md_ > Ship your workflow to AGNT5 Cloud — agnt5 deploy, no Dockerfile, no registry. This is stage **2 of 5** of [the AGNT5 loop](/docs/get-started/loop.md). 
You'll take the workflow you ran in the [Quickstart](/docs/get-started/quickstart.md) and push it to a managed environment with `agnt5 deploy`. ```bash agnt5 deploy ``` `agnt5 deploy` builds and uploads your project — no Docker, no Dockerfile, no registry. View the deployment in [Studio → Deployments](https://app.agnt5.com/deployments). Deeper material on environments, rollback, and secrets is being filled in. --- ## What you just built _Source: https://agnt5.com/docs/get-started/what-you-just-built.md_ > The mental model behind the workflow you ran — durable steps, journaled state, and the trace as the system of record. You ran a workflow with two steps. Each step did something a normal Python function would do — an HTTP call and an LLM call — but neither one ran in the way a normal function does. Five concepts make the difference. They will show up on every other page in these docs, so read them once now and the rest fits together. ## Workflow A workflow is a function decorated with `@workflow` that orchestrates a sequence of steps. The body looks like ordinary async code: variables, branches, loops, exception handlers. AGNT5 treats the body as a recipe to be executed reliably, not a one-shot Python call. When you triggered `research`, AGNT5 created a **run** — a single execution of the workflow with a unique ID. Re-running the same workflow with the same input produces a new run with a new ID; runs are not deduplicated. ## Step The work inside `ctx.step(fetch_article, url=url)` is a **step** — a unit of work the workflow delegates. A step is the place where side effects happen: HTTP calls, file I/O, LLM calls, database writes. Workflow code itself stays deterministic so AGNT5 can replay it; the unpredictable parts live inside steps. The two steps you saw — `fetch_article` and `summarize` — are functions decorated with `@function`. The workflow calls them through `ctx.step`, which captures the input and the return value to the journal. 
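The delegation-and-journal mechanics can be pictured with a toy sketch in plain Python. This stands in for the runtime and is not the AGNT5 API: a step runs once, its output is recorded, and a retry of the same run returns the recorded output instead of re-executing the side effect.

```python
# Toy sketch of a step journal (plain Python standing in for the runtime,
# not the AGNT5 API).
journal = {}   # step index -> recorded output
calls = []     # side effects actually executed

def step(index, fn, *args):
    if index in journal:          # checkpoint exists: skip the work
        return journal[index]
    result = fn(*args)
    journal[index] = result       # record input's result in the journal
    return result

def fetch_article(url):
    calls.append(("fetch", url))  # stands in for the HTTP call
    return f"body of {url}"

def summarize(text):
    calls.append(("summarize",))  # stands in for the LLM call
    return text.upper()

# First attempt: both steps execute and are journaled.
article = step(0, fetch_article, "https://example.com")
summary = step(1, summarize, article)

# Simulated retry after a crash: the fetch is served from the journal,
# so the HTTP "call" happens exactly once across both attempts.
article_again = step(0, fetch_article, "https://example.com")
assert article_again == article
assert calls == [("fetch", "https://example.com"), ("summarize",)]
```

The real journal also records inputs, errors, and timings, and persists across worker restarts; the skip-if-recorded behavior is the part this sketch shows.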
## Checkpoint When a step returns, AGNT5 writes the input and output to a journal. That record is a **checkpoint**. The next call to `ctx.step` does not run if a checkpoint already exists for that step in this run — it returns the recorded output instead. This is what makes workflows resumable. If your worker crashed after `fetch_article` succeeded but before `summarize` ran, the retry would skip the HTTP call (cached) and run `summarize` against the cached article body. No duplicate fetches, no double LLM bills, no manual recovery code. ## Trace A run produces a **trace**: the ordered list of every step, with inputs, outputs, errors, timings, and (for LLM steps) prompts, responses, and token counts. The trace is not a sidecar log. It is the system of record. `agnt5 inspect runs describe` and `agnt5 inspect trace -r` read directly from it. The eval loop reads from it too — any run can be replayed against new prompts or models because its inputs are still on disk. ## Worker A **worker** is the runtime process that hosts your registered components for AGNT5 to dispatch work to. `agnt5 dev` opens a session bound to a worker; `agnt5 deploy` runs workers on managed environments. The shape is identical — only the host of the session changes. Multiple workers can serve the same project. If one disconnects mid-step, another picks up the run from the last checkpoint. Nothing about the runtime — the coordinator, journal, trace store — depends on the worker's host. ## Putting it together The five concepts compose: ``` Run = a worker executing a workflow against an input, whose step outputs are checkpointed, and whose full history is the trace. ``` Every claim AGNT5 makes about durability, observability, and evaluation comes from this picture. Durability is the journal. Observability is the trace. Evaluation is replaying the trace with edits. ## Next steps - **[Where to go next](/docs/get-started/next.md)** — pick a direction based on what you want to build. 
- **[CLI Reference](/cli)** — every command and flag. --- ## Your first agentic workflow _Source: https://agnt5.com/docs/get-started/your-first-workflow.md_ > Build a support-triage agent that picks tools, calls them, drafts a reply — all under a durable workflow with a full trace. By the end of this page you will have run an AGNT5 agent end-to-end: an LLM-driven loop that categorizes a support ticket, fetches customer info, searches a knowledge base, and drafts a reply — wrapped in a durable workflow with a full trace. **Time:** about ten minutes. **You'll learn:** - Scaffold an agent project from a template - Read the three primitives: `tool`, `Agent`, `workflow` - Open a dev session connected to AGNT5 - Trigger an agentic workflow and inspect its trace **Prerequisites:** - The `agnt5` CLI installed and authenticated. See [Install](/docs/get-started/install.md). - Python 3.12 or newer. - An OpenAI API key. Get one at [platform.openai.com](https://platform.openai.com/api-keys). ## Step 1: Scaffold the project ```bash agnt5 create my-support-triage --template python/support-triage cd my-support-triage ``` The template lays down a runnable agentic project: ``` my-support-triage/ ├── agnt5.yaml # project config ├── app.py # entry point — registers components with AGNT5 ├── data/tickets.jsonl # sample tickets for testing ├── pyproject.toml └── src/support_triage/ ├── tools.py # the @tools the agent can call ├── agent.py # the Agent definition ├── workflows.py # template workflow (HITL pipeline — out of scope here) └── functions.py # underlying functions the workflow uses ``` The template's shipping workflow includes a human-in-the-loop approval step. That's a richer pattern than fits in a first tutorial — we'll cover it in the [HITL guide]. For now you'll add a smaller workflow that just runs the agent end-to-end. ## Step 2: Read the three primitives The whole agent fits in three short concepts. Open the files in order. 
**`tools.py`** — three tools the agent can decide to call. The sample data below is abridged (the template's file ships more knowledge-base docs; only the `title`/`content`/`keywords` shape matters here):

```python
import os
import random

from agnt5 import Context, tool

# Abridged sample data — the template ships more docs. Each entry carries a
# title, content, and the keywords the search tool matches against.
KNOWLEDGE_BASE = [
    {
        "title": "Refund Policy",
        "keywords": ["refund", "money", "charge", "cancel"],
        "content": "Full refunds are available within 30 days of purchase. "
        "Refunds on subscription plans are prorated for unused time.",
    },
    {
        "title": "Plan Upgrades",
        "keywords": ["upgrade", "plan", "features"],
        "content": "Upgrade or downgrade from Settings > "
        "Subscription. Price difference is prorated.",
    },
]

CUSTOMER_DB = {
    "TCK-1001": {"name": "Alice Johnson", "email": "alice@example.com", "plan": "Pro"},
    "TCK-1002": {"name": "Bob Smith", "email": "bob@example.com", "plan": "Enterprise"},
    "default": {"name": "Unknown", "email": "unknown@example.com", "plan": "Free"},
}

TOOL_FAILURE_RATE = float(os.getenv("AGNT5_TOOL_FAILURE_RATE", "0.3"))


@tool(auto_schema=True)
async def categorize_ticket_tool(ctx: Context, subject: str, body: str) -> str:
    """Categorize the ticket; returns category + suggested priority."""
    combined = f"{subject} {body}".lower()
    if any(w in combined for w in ["refund", "cancel", "money", "charge"]):
        return "Category: Billing\nPriority: High"
    if any(w in combined for w in ["password", "login", "access", "locked"]):
        return "Category: Account Access\nPriority: High"
    if any(w in combined for w in ["bug", "error", "broken", "not working"]):
        return "Category: Technical Issue\nPriority: High"
    if any(w in combined for w in ["upgrade", "plan", "features"]):
        return "Category: Sales\nPriority: Medium"
    return "Category: General Inquiry\nPriority: Medium"


@tool(auto_schema=True)
async def fetch_customer_info_tool(ctx: Context, ticket_id: str) -> str:
    """Look up the customer in the CRM. Simulates a flaky API."""
    if random.random() < TOOL_FAILURE_RATE:
        raise ConnectionError("CRM API timeout — transient error, please retry.")
    customer = CUSTOMER_DB.get(ticket_id, CUSTOMER_DB["default"])
    return (
        f"Name: {customer['name']}\n"
        f"Email: {customer['email']}\n"
        f"Plan: {customer['plan']}"
    )


@tool(auto_schema=True)
async def search_kb_tool(ctx: Context, query: str) -> str:
    """Search the knowledge base for relevant docs."""
    q = query.lower()
    hits = [doc for doc in KNOWLEDGE_BASE if any(kw in q for kw in doc["keywords"])][:3]
    if not hits:
        return "No relevant documentation found."
    return "\n\n".join(f"**{d['title']}**\n{d['content']}" for d in hits)
```

A `@tool` is a typed function the model invokes by name. `auto_schema=True` derives the JSON schema the model sees from the function signature. `fetch_customer_info_tool` is intentionally flaky — it raises `ConnectionError` on roughly 30% of calls (controlled by `AGNT5_TOOL_FAILURE_RATE`). The agent doesn't handle that; AGNT5 retries the tool inside the agent loop and the agent sees only the eventual result.

The template ships a fuller `tools.py` with more sample data and richer logging — open the file in your project to read it; the version above runs identically against the same agent.

**`agent.py`** — the agent that decides when to call which tool. The `Agent` is an LLM-driven loop: it sends the conversation to the model, inspects the response, calls a tool if the model requested one, feeds the result back, and repeats until the model returns a final answer.

**Your minimal workflow.** Create `src/support_triage/quick.py`:

```python
from agnt5 import WorkflowContext, workflow

from .agent import support_agent  # the Agent defined in agent.py


@workflow
async def quick_triage(ctx: WorkflowContext, ticket: dict) -> str:
    prompt = (
        f"Triage this ticket.\n\n"
        f"ID: {ticket['ticket_id']}\n"
        f"Subject: {ticket['subject']}\n"
        f"Body: {ticket['body']}"
    )
    result = await support_agent.run(prompt, context=ctx)
    return result.output
```

Register it by importing the module from `app.py` (open `app.py` and add `from support_triage import quick # noqa: F401` near the other imports), or rely on auto-discovery if the template's `pyproject.toml` already lists `src/support_triage` as a source path.

`run` runs the agent loop — model call, tool call if requested, model call again with the tool result, repeat — and returns the final answer. Each invocation of `quick_triage` is one self-contained run; conversation history is not retained between runs. Every model call, every tool call, and every loop iteration inside the run is a checkpoint.

## Step 3: Configure the API key

```bash
echo "OPENAI_API_KEY=sk-..." > .env
```

`agnt5 dev` loads `.env` automatically.
## Step 4: Start a dev session ```bash agnt5 dev ``` `agnt5 dev` opens a development session connected to AGNT5. Your project's components are registered with the runtime, runs route through, and traces come back — no coordinator, journal, or broker to stand up on your machine. You should see: ``` INFO Registered components: quick_triage, SupportTriageAgent, categorize_ticket_tool, fetch_customer_info_tool, search_kb_tool, ... INFO Worker connected to coordinator INFO Watching ./src for changes ``` Leave this running. File edits hot-reload your registered components. ## Step 5: Triage a ticket In another terminal, send one of the sample tickets: ```bash agnt5 run quick_triage --input '{ "ticket": { "ticket_id": "TCK-1001", "subject": "Need a refund", "body": "I accidentally upgraded to the premium plan and would like my money back please." } }' ``` The agent decides to call `categorize_ticket_tool` (Billing, High priority), `fetch_customer_info_tool` (Alice Johnson, Pro plan), and `search_kb_tool` ("Refund Policy"), then drafts a reply: ``` Hi Alice, I understand you'd like a refund for the premium upgrade — I can help with that. Our refund policy allows full refunds within 30 days of purchase, and for subscription plans we issue a prorated refund for unused time. Since you're on the Pro plan, the upgrade portion is fully refundable. I'll start the process now. You'll see the refund on your original payment method within 5–7 business days. Best, The Support Team ``` Run it again. You may see the second run finish faster — and you may notice `fetch_customer_info_tool` reporting transient failures in the dev session log. The template wires `AGNT5_TOOL_FAILURE_RATE=0.3` into the CRM tool so it raises a `ConnectionError` on roughly 30% of calls. The agent loop retries the tool until it succeeds; the agent itself never sees the failure. 
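The retry behavior is easy to picture with a toy sketch in plain Python. Deterministic failures stand in for the 30% random ones, and the helper names are illustrative, not AGNT5 internals; the point is that the caller only ever sees the eventual successful result.

```python
# Toy sketch of "the agent never sees the failure": a retry wrapper absorbs
# transient ConnectionErrors. Helper names are illustrative, not AGNT5 APIs.
attempts = {"count": 0}

def flaky_crm_lookup(ticket_id: str) -> dict:
    attempts["count"] += 1
    if attempts["count"] <= 2:        # first two calls simulate CRM timeouts
        raise ConnectionError("CRM API timeout — transient error, please retry.")
    return {"name": "Alice Johnson", "plan": "Pro"}

def run_with_retries(fn, *args, max_attempts: int = 5):
    last_error = None
    for _ in range(max_attempts):
        try:
            return fn(*args)
        except ConnectionError as err:
            last_error = err          # transient: try again
    raise last_error

customer = run_with_retries(flaky_crm_lookup, "TCK-1001")
assert customer == {"name": "Alice Johnson", "plan": "Pro"}
assert attempts["count"] == 3         # two failures, one success
```

In AGNT5 the wrapper is the runtime's step retry policy, and each attempt is visible in the trace rather than hidden in a loop like this one.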
## Step 6: Inspect the trace ```bash agnt5 inspect runs ls ``` Pull the trace for the latest run: ```bash agnt5 inspect trace -r ``` The trace shows the full agent loop: - The prompt sent to the model. - The first tool call the model emitted (`categorize_ticket_tool`) with its arguments and return value. - The second tool call (`fetch_customer_info_tool`) — including any retries on the simulated failure. - The third tool call (`search_kb_tool`). - The final prompt to the model with all three tool results appended. - The model's drafted reply, with token counts and latencies for each turn. ## Critical rules A few things to know before you start writing your own agents: - **ALWAYS** use `agnt5 create --template` to scaffold. The project layout (`app.py` entry, `pyproject.toml` source paths, `agnt5.yaml` config) has implicit conventions the runtime relies on. - **ALWAYS** register components by importing their modules from `app.py` or by relying on auto-discovery from the source paths in `pyproject.toml`. A `@workflow` or `@tool` that isn't imported won't appear in the dev session. - **NEVER** use `@workflow(chat=True)`. It is not supported. Wrap the agent in a plain `@workflow` and let the Agent's own loop handle the model → tool → model cycle. - **NEVER** edit the agent's `result.output` shape from inside the workflow before returning it. The trace records the agent's raw output; downstream evals depend on it. ## What you built An agent that survives. If your dev session had disconnected mid-loop, the retry would have skipped the categorization (cached), skipped any successful customer-info call (cached), and resumed from the model with the tool results already in the conversation. No double LLM bill, no duplicate CRM lookups, no manual recovery code. 
What you did **not** write: - The agent loop (model → tool → model → answer) - Retry logic for the flaky CRM tool - Checkpointing of each model call and tool invocation - Trace collection across the loop - Token accounting per agent turn - A coordinator, journal, broker, or database to host any of the above ## Next steps - **[Deploy to AGNT5 Cloud](/docs/get-started/deploy.md)** — promote this same agent to a managed environment. - **[What you just built](/docs/get-started/what-you-just-built.md)** — the mental model behind agents, workflows, tools, and durability. # improve --- ## Improve _Source: https://agnt5.com/docs/improve.md_ > Capture feedback, build datasets, run evals, compare models, and gate CI. Close the loop with the runs you ship — collect feedback, turn it into datasets, run replay-based evals, compare model versions, and gate deploys in CI. This section is being built out. # run --- ## Run _Source: https://agnt5.com/docs/run.md_ > Crash resume, trace drill-down, search, cost and latency, dashboards, alerts. Operate deployed workflows in production — observe live runs, search history, drill into individual traces, watch cost and latency, and respond to alerts. This section is being built out. # ship --- ## Ship _Source: https://agnt5.com/docs/ship.md_ > Expose, configure, deploy, promote, and roll back production agents. Get your workflows in front of users — HTTP endpoints, schedules, secrets, environments, and rollback. This section is being built out. In the meantime, the [Deploy guide](/docs/get-started/deploy.md) covers the core flow. # Cookbooks --- ## Run any AI agent framework on a durable workflow runtime _Source: https://agnt5.com/cookbooks/bring-your-own-agent-framework.md_ > Wrap LangGraph, OpenAI Agents SDK, Vercel AI SDK, or plain Python in durable AGNT5 steps. AGNT5 does not require you to throw away an existing agent framework. 
This cookbook shows how to keep LangGraph, OpenAI Agents SDK, Vercel AI SDK, or a plain Python tool loop while adding durable execution, replay, and traces underneath. ## Scenario Your team already has an agent that calls tools and returns a structured result. It works locally, but production failures are hard to recover from. You want the agent call to run inside a durable workflow without rewriting the agent. ## What you build - A wrapper function around existing agent logic. - A workflow that checkpoints the framework result. - Trace metadata for model calls, tool calls, and outputs. - A recovery path after worker or API failures. - A migration path that keeps framework choice flexible. ## Wrap the existing agent Treat the existing agent as a step implementation. ```python @function async def run_existing_agent(request: AgentRequest) -> AgentResult: result = await existing_agent.invoke( input=request.prompt, metadata={"customer_id": request.customer_id}, ) return AgentResult.model_validate(result) ``` Then orchestrate it with AGNT5. ```python @workflow async def durable_agent_run(ctx: WorkflowContext, request: AgentRequest) -> AgentResult: prepared = await ctx.step(prepare_request, request) result = await ctx.step(run_existing_agent, prepared) return await ctx.step(record_agent_result_once, request.request_id, result) ``` The agent framework remains inside `run_existing_agent`. AGNT5 owns the durable step boundary around it. ## Trace integration At minimum, include: - framework name and version, - model name, - tool names, - final structured output, - external side-effect receipts. If the framework exposes callback hooks, map those events into AGNT5 trace metadata. If it does not, record the final input and output at the step boundary. ## Recovery model On replay, AGNT5 returns the journaled `AgentResult` instead of calling the framework again. On retry after a failed attempt, the wrapper runs again with the same input. 
Any side effects inside the framework still need idempotency keys. ## Production checks - The framework call only runs inside a step. - The step result is structured and serializable. - Tool side effects use stable idempotency keys. - The trace links framework events back to the AGNT5 run ID. - Replay does not call the model again for completed steps. ## Next steps - [Retry AI workflow steps without duplicate side effects](/cookbooks/retry-without-duplicate-side-effects.md) - [Build a deep research agent](/cookbooks/deep-research-agent.md) - [Debug AI workflows with traces, not scattered logs](/cookbooks/workflow-native-observability.md) --- ## Build a customer support agent _Source: https://agnt5.com/cookbooks/customer-support-agent.md_ > Search docs, draft replies, wait for review, and turn support outcomes into eval cases. This cookbook builds a support agent that handles real production concerns: retrieval, customer context, review before send, trace inspection, and evals from resolved tickets. ## Scenario A customer opens a ticket. The agent searches product docs, checks account state, drafts a response, waits for human approval, and records the final outcome for future evaluation. ## What you build - A support-triage workflow. - Tools for docs search and customer lookup. - A draft response step with structured output. - A human-review pause before sending. - A feedback path into an eval dataset. 
## Workflow shape ```python @workflow async def support_agent_workflow(ctx: WorkflowContext, ticket_id: str) -> SupportOutcome: ticket = await ctx.step(load_ticket, ticket_id) account = await ctx.step(load_account_context, ticket.customer_id) docs = await ctx.step(search_support_docs, ticket.body) draft = await ctx.step(draft_support_reply, ticket, account, docs) decision = await ctx.wait_for_signal( "support_reply_review", timeout="3d", metadata={"ticket_id": ticket.id, "draft_id": draft.id}, ) if decision.status != "approved": return SupportOutcome(status="needs_changes", draft_id=draft.id) sent = await ctx.step(send_reply_once, ticket.id, draft.id) await ctx.step(record_support_eval_case, ticket.id, draft.id, sent.id) return SupportOutcome(status="sent", message_id=sent.id) ``` The agent is useful because every step is visible and recoverable. ## Trace review For each support run, reviewers should see: - ticket input, - retrieved docs, - account context used, - draft response, - approval decision, - final send receipt. This makes support QA concrete. A bad answer can be traced to a retrieval miss, an account-state error, or a prompt failure. ## Eval loop When a reviewer edits the draft, record the corrected response as an eval case. Later prompt and model changes can replay the same ticket and compare against the approved answer. ## Production checks - Tenant and deployment IDs are included on every direct HTTP call. - Retrieved docs are stored as trace evidence. - The send step uses an idempotency key. - Rejected drafts stop before the send step. - Reviewer edits can become eval cases. 
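The "send step uses an idempotency key" check above can be sketched in plain Python (hypothetical helper and receipt format, not the AGNT5 API): a stable `(ticket_id, draft_id)` key means a retried attempt returns the original receipt instead of sending the reply twice.

```python
# Sketch of an idempotent send step. The store would be a database in
# production; a dict is enough to show the at-most-once behavior.
sent_messages = {}

def send_reply_once(ticket_id: str, draft_id: str) -> str:
    key = (ticket_id, draft_id)
    if key in sent_messages:
        return sent_messages[key]             # already sent: return the receipt
    receipt = f"msg_{ticket_id}_{draft_id}"   # stand-in for the real send
    sent_messages[key] = receipt
    return receipt

first = send_reply_once("TCK-1001", "draft-9")
second = send_reply_once("TCK-1001", "draft-9")   # retried attempt
assert first == second
assert len(sent_messages) == 1        # exactly one message went out
```

The key must be derived from the run's inputs (here the ticket and draft IDs), never from a timestamp or random value, so a retry computes the same key as the original attempt.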
## Next steps - [Build a durable human-approval AI workflow](/cookbooks/durable-human-approval-ai-workflow.md) - [Turn a failed production AI run into an eval](/cookbooks/production-run-to-eval.md) - [Build a RAG chatbot with memory](/cookbooks/rag-chatbot-memory.md) --- ## Build a data extraction workflow _Source: https://agnt5.com/cookbooks/data-extraction.md_ > Call tools, force JSON outputs, recover from malformed responses, and inspect every extraction step. This cookbook builds a structured extraction workflow for AI outputs that must be parsed, validated, retried, and explained. ## Scenario An analyst submits free-form notes. The workflow extracts accounts, contacts, dates, and next actions as JSON, validates the result, and stores the structured record. ## What you build - A structured-output prompt. - A schema validator. - A repair step for malformed JSON. - A retry policy for transient model failures. - A trace that shows raw and parsed outputs. ## Workflow shape ```python @workflow async def extract_account_update(ctx: WorkflowContext, note_id: str) -> ExtractionResult: note = await ctx.step(load_note, note_id) raw = await ctx.step(call_extraction_agent, note.text) parsed = await ctx.step(parse_and_validate_update, raw) receipt = await ctx.step(store_update_once, note.id, parsed) return ExtractionResult(update_id=receipt.id) ``` Separating model call and parse step makes malformed output easy to inspect. ## Schema-first extraction Define the expected output before writing the prompt. ```python class AccountUpdate(BaseModel): account_name: str contacts: list[str] next_action: str due_date: date | None confidence: float ``` The validator should reject missing required fields and values that do not match business rules. ## Malformed output recovery If parsing fails, run a bounded repair step and keep both versions in the trace. 
```python @function async def parse_and_validate_update(raw: str) -> AccountUpdate: try: return AccountUpdate.model_validate_json(raw) except ValidationError: repaired = await repair_json(raw) return AccountUpdate.model_validate_json(repaired) ``` ## Production checks - Raw model output and parsed output are both trace-visible. - Repair attempts are bounded. - Invalid data fails before the storage step. - The storage step is idempotent. - Failed extractions can be converted into eval cases. ## Next steps - [Build a document processing pipeline](/cookbooks/document-processing.md) - [Debug and replay a failed AI workflow](/cookbooks/debug-production-run.md) - [Debug AI workflows with traces, not scattered logs](/cookbooks/workflow-native-observability.md) --- ## Debug and replay a failed AI workflow _Source: https://agnt5.com/cookbooks/debug-production-run.md_ > Build a support workflow that fails on malformed LLM output, inspect the trace, patch the step, and recover without repeating completed work. This cookbook builds one production-shaped failure from start to finish: a customer-support workflow calls an LLM, gets malformed structured output, fails before any external side effect happens, and then gets debugged from the trace. By the end, you should be able to answer the questions that matter during an AI workflow incident: - Which step failed? - What input, prompt, model output, and parsed state led to the failure? - Which steps are already checkpointed? - Is it safe to fix the code and let the workflow continue? - How do we turn this failure into a regression case later? ## What you build A support reply workflow with five steps: 1. Load the ticket. 2. Load the customer profile. 3. Classify the ticket. 4. Draft a structured reply with an agent. 5. Create an internal note after the draft validates. The failure is deliberately placed in step 4. 
The model returns JSON without a required `confidence` field, so validation fails before `create_internal_note` can run. That gives you a clean incident to debug: earlier reads are checkpointed, later side effects have not happened. ## Prerequisites - The AGNT5 CLI is installed and authenticated. - Python 3.12 or newer. - An OpenAI API key in your project environment. - A local AGNT5 dev session. Start from a support-style project: ```bash agnt5 create support-debug --template python/support-triage cd support-debug ``` Run the dev session in one terminal: ```bash agnt5 dev ``` ## Add the failing workflow Create a small workflow dedicated to this incident. The important design choice is the step boundary: each external read, model call, and side effect is a separate `ctx.step(...)`. ```python from typing import Literal from agnt5 import WorkflowContext, function, workflow from pydantic import BaseModel, Field, ValidationError class Ticket(BaseModel): ticket_id: str customer_id: str subject: str body: str class CustomerProfile(BaseModel): customer_id: str plan: str refund_eligible: bool class Classification(BaseModel): category: Literal["billing", "technical", "account"] priority: Literal["low", "normal", "high"] class DraftReply(BaseModel): body: str confidence: float = Field(ge=0, le=1) class InternalNote(BaseModel): note_id: str ticket_id: str @function async def load_ticket(ticket_id: str) -> Ticket: return Ticket( ticket_id=ticket_id, customer_id="cus_123", subject="Need a refund", body="I upgraded by mistake and would like my money back.", ) @function async def load_customer_profile(customer_id: str) -> CustomerProfile: return CustomerProfile( customer_id=customer_id, plan="pro", refund_eligible=True, ) @function async def classify_ticket(ticket: Ticket, profile: CustomerProfile) -> Classification: return Classification(category="billing", priority="high") @function async def draft_structured_reply( ticket: Ticket, profile: CustomerProfile, classification: 
Classification, ) -> DraftReply: # In a real project this is an Agent or model call. The malformed payload # simulates the incident: `confidence` is missing. model_output = """ { "body": "You're eligible for a refund. I can start that process now." } """ return DraftReply.model_validate_json(model_output) @function async def create_internal_note(ticket: Ticket, draft: DraftReply) -> InternalNote: # This is the side effect we do not want to run until the draft validates. return InternalNote(note_id=f"note_{ticket.ticket_id}", ticket_id=ticket.ticket_id) @workflow async def support_reply_debug(ctx: WorkflowContext, ticket_id: str) -> dict: ticket = await ctx.step(load_ticket, ticket_id) profile = await ctx.step(load_customer_profile, ticket.customer_id) classification = await ctx.step(classify_ticket, ticket, profile) draft = await ctx.step(draft_structured_reply, ticket, profile, classification) note = await ctx.step(create_internal_note, ticket, draft) return { "ticket_id": ticket.ticket_id, "note_id": note.note_id, "draft": draft.model_dump(), } ``` Import this module from `app.py` or your project package so the workflow is registered when the worker starts. ## Run the failure Trigger the workflow from another terminal: ```bash agnt5 run support_reply_debug --input '{"ticket_id":"TCK-1001"}' ``` The run should fail in `draft_structured_reply`. List recent runs: ```bash agnt5 inspect runs ls --status failed --limit 5 ``` Then inspect the failed run: ```bash agnt5 inspect runs describe agnt5 inspect trace -r --verbose ``` In the trace, confirm the incident shape: - `load_ticket` completed. - `load_customer_profile` completed. - `classify_ticket` completed. - `draft_structured_reply` failed with a validation error. - `create_internal_note` did not run. That last point is the recovery line. A user-visible side effect has not happened yet, so it is safe to patch the draft step and retry from the failed boundary. 
## Patch the failed step Now make the draft step production-ready. Keep the raw model output visible, attempt one bounded repair, then validate again. ```python def repair_draft_payload(raw: str) -> str: # Keep this deliberately conservative. In production, make the repair # explicit and trace-visible rather than silently accepting bad data. if '"confidence"' not in raw: return raw.rstrip().rstrip("}") + ', "confidence": 0.62 }' return raw @function async def draft_structured_reply( ticket: Ticket, profile: CustomerProfile, classification: Classification, ) -> DraftReply: model_output = """ { "body": "You're eligible for a refund. I can start that process now." } """ try: return DraftReply.model_validate_json(model_output) except ValidationError: repaired = repair_draft_payload(model_output) return DraftReply.model_validate_json(repaired) ``` Restart the worker so the new function code is registered. ## Re-run and compare traces Run the same input again: ```bash agnt5 run support_reply_debug --input '{"ticket_id":"TCK-1001"}' ``` Inspect the new trace: ```bash agnt5 inspect runs ls --limit 5 agnt5 inspect trace -r --verbose ``` Compare it with the failed trace. The first three steps should have the same inputs. The draft step should now return a valid `DraftReply`, and the `create_internal_note` side effect should run once after validation succeeds. ## What replay proves AGNT5 replay is what makes the trace trustworthy: - Completed step results are journaled. - Workflow body code can be re-entered after a crash or restart. - Replay walks the same `ctx.step(...)` sequence. - Completed steps return their recorded outputs instead of calling external systems again. - The first step without a successful journal entry is where work resumes. In this incident, replay tells you the failed run had not crossed the side effect boundary. That is why the fix is safe. 
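The replay guarantees listed above can be illustrated with a minimal sketch. This is not the AGNT5 implementation — just a toy journal that returns recorded step results instead of executing the step again:

```python
import asyncio


class Journal:
    """Toy stand-in for a durable journal: records each step's result the
    first time it runs, and replays the recorded value afterwards."""

    def __init__(self):
        self.entries = {}  # step index -> recorded result

    async def step(self, index, fn, *args):
        if index in self.entries:        # replay: return the journaled output
            return self.entries[index]
        result = await fn(*args)         # first run: execute and record
        self.entries[index] = result
        return result


calls = []


async def load_ticket(ticket_id):
    calls.append("load_ticket")          # track real executions
    return {"ticket_id": ticket_id}


async def main():
    journal = Journal()
    await journal.step(0, load_ticket, "TCK-1001")  # executes
    await journal.step(0, load_ticket, "TCK-1001")  # replays, no second call
    return calls


print(asyncio.run(main()))  # ['load_ticket'] — the step ran only once
```

The second call never reaches `load_ticket`; it reads the journal. In the incident above, the failed `draft_structured_reply` step is simply the first step with no journal entry, so execution resumes there.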
## Turn the failure into a regression case After patching the incident, keep the bad model output as an eval case. The eval should fail if a future prompt, model, or parser change allows a draft without `confidence` to pass validation. At minimum, save: - workflow input, - raw model output, - validation error, - expected repaired output, - expected side-effect behavior. That eval case is the difference between "we fixed the incident" and "this incident stays fixed." ## Production checklist - Every external read, model call, and side effect is inside `ctx.step(...)`. - The trace shows step input, output, error, and retry attempts. - The failed step is before the first user-visible side effect. - The patch changes the failing step only. - The fixed trace proves the side effect runs once after validation. - The malformed output is added to an eval dataset. ## Next steps - [Retry AI workflow steps without duplicate side effects](/cookbooks/retry-without-duplicate-side-effects.md) - [Turn a failed production AI run into an eval](/cookbooks/production-run-to-eval.md) - [Debug AI workflows with traces, not scattered logs](/cookbooks/workflow-native-observability.md) --- ## Build a deep research agent _Source: https://agnt5.com/cookbooks/deep-research-agent.md_ > Run long research jobs with search tools, streamed progress, trace inspection, and completion notices. This cookbook builds a deep research agent for long-running jobs where progress, durability, and traceability matter more than a single chat response. ## Scenario A user asks for a competitive brief. The agent plans the work, searches the web, reads selected sources, extracts notes, synthesizes a report, and sends a completion notification. ## What you build - A planner step. - Search and fetch tools. - Parallel source reading. - Progress events for the UI. - A final report artifact. - A completion notification that is sent once. 
## Workflow shape ```python @workflow async def deep_research(ctx: WorkflowContext, topic: str) -> ResearchResult: plan = await ctx.step(plan_research, topic) sources = await ctx.step(search_sources, plan.queries) notes = await ctx.step(read_sources, sources) report = await ctx.step(write_research_report, topic, notes) notification = await ctx.step(send_completion_once, report.artifact_id) return ResearchResult(report_id=report.artifact_id, notification_id=notification.id) ``` Each long call is isolated. If source reading fails halfway through, completed work can be replayed from the journal. ## Progress model Emit progress from step boundaries rather than from unstructured logs. ```json { "phase": "reading_sources", "completed": 8, "total": 12, "run_id": "run_01JRESEARCH" } ``` The UI can stream these updates while the trace remains the durable record. ## Production checks - Search and fetch outputs are traceable. - The report stores source citations or artifact IDs. - Worker restarts do not lose progress. - Completion notifications are idempotent. - Failed source reads can be retried without restarting the whole report. ## Next steps - [Build a durable research agent with approval and recovery](/cookbooks/durable-research-agent-approval-recovery.md) - [Run any AI agent framework on a durable workflow runtime](/cookbooks/bring-your-own-agent-framework.md) - [Debug AI workflows with traces, not scattered logs](/cookbooks/workflow-native-observability.md) --- ## Build a document processing pipeline _Source: https://agnt5.com/cookbooks/document-processing.md_ > Extract structured fields, validate them, pause for review, and retry failed document steps safely. Document workflows fail in predictable ways: bad scans, missing fields, malformed model output, and partial external writes. This cookbook builds a pipeline that makes each failure inspectable and recoverable. ## Scenario An operations team uploads invoices. 
The workflow extracts fields, validates the result, pauses for review when confidence is low, and stores approved data in a system of record. ## What you build - A document ingestion workflow. - OCR or text extraction. - Structured field extraction. - Validation and confidence checks. - Human review for exceptions. - An idempotent write to the destination system. ## Workflow shape ```python @workflow async def process_invoice(ctx: WorkflowContext, document_id: str) -> InvoiceOutcome: document = await ctx.step(load_document, document_id) text = await ctx.step(extract_text, document) invoice = await ctx.step(extract_invoice_fields, text) validation = await ctx.step(validate_invoice, invoice) if validation.needs_review: decision = await ctx.wait_for_signal( "invoice_review", timeout="10d", metadata={"document_id": document_id, "issues": validation.issues}, ) invoice = decision.corrected_invoice receipt = await ctx.step(store_invoice_once, document_id, invoice) return InvoiceOutcome(status="stored", receipt_id=receipt.id) ``` The review path is part of the workflow, not an out-of-band spreadsheet. ## Validation rules Use deterministic validation before asking another model to judge the output. - Required fields are present. - Totals add up. - Currency is supported. - Vendor is recognized. - Confidence passes the threshold. ## Production checks - Raw document, extracted text, structured output, and validation errors are in the trace. - Low-confidence extractions pause for review. - The store step uses a stable idempotency key. - Reprocessing a document does not duplicate destination records. - Corrected review output can become an eval case. 
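The deterministic validation rules in this cookbook can be sketched as one plain function. The `Invoice` fields, vendor list, and confidence threshold below are illustrative assumptions, not the cookbook's actual models:

```python
from pydantic import BaseModel

# Assumptions: project-specific reference data and threshold.
SUPPORTED_CURRENCIES = {"USD", "EUR", "GBP"}
KNOWN_VENDORS = {"Acme Corp", "Globex"}
CONFIDENCE_THRESHOLD = 0.8


class LineItem(BaseModel):
    description: str
    amount_cents: int


class Invoice(BaseModel):
    vendor: str
    currency: str
    line_items: list[LineItem]
    total_cents: int
    confidence: float


def validation_issues(invoice: Invoice) -> list[str]:
    """Run the deterministic checks before any model-as-judge step."""
    issues = []
    if sum(item.amount_cents for item in invoice.line_items) != invoice.total_cents:
        issues.append("totals_mismatch")
    if invoice.currency not in SUPPORTED_CURRENCIES:
        issues.append("unsupported_currency")
    if invoice.vendor not in KNOWN_VENDORS:
        issues.append("unknown_vendor")
    if invoice.confidence < CONFIDENCE_THRESHOLD:
        issues.append("low_confidence")
    return issues
```

An empty list means the invoice can proceed to the store step; anything else sets `needs_review` and routes the document to the signal-based review path.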
## Next steps - [Build a data extraction workflow](/cookbooks/data-extraction.md) - [Retry AI workflow steps without duplicate side effects](/cookbooks/retry-without-duplicate-side-effects.md) - [Turn a failed production AI run into an eval](/cookbooks/production-run-to-eval.md) --- ## Build a durable human-approval AI workflow _Source: https://agnt5.com/cookbooks/durable-human-approval-ai-workflow.md_ > Pause for approval, survive worker restarts, and execute the final side effect exactly once. Human approval is the clearest demo of durable execution. The workflow starts, does useful AI work, waits for a person, survives for hours or days, and resumes when an approval signal arrives. ## Scenario A support agent drafts a refund response and prepares a refund request. The business rule is simple: the AI can draft and recommend, but a human must approve before money moves. ## What you build - A workflow that drafts an action with an agent. - A durable approval pause. - A signal that records approve, reject, or request-changes. - A final side effect that executes once. - A trace that shows the full decision path. ## Workflow shape The workflow separates recommendation from execution. ```python @workflow async def refund_review(ctx: WorkflowContext, ticket_id: str) -> RefundOutcome: ticket = await ctx.step(load_ticket, ticket_id) customer = await ctx.step(load_customer, ticket.customer_id) recommendation = await ctx.step(draft_refund_recommendation, ticket, customer) decision = await ctx.wait_for_signal( "refund_decision", timeout="7d", metadata={"ticket_id": ticket.id, "amount": recommendation.amount}, ) if decision.status != "approved": return RefundOutcome(status="not_approved", reason=decision.reason) receipt = await ctx.step(issue_refund_once, ticket.id, recommendation.amount) return RefundOutcome(status="refunded", receipt_id=receipt.id) ``` The pause is workflow state, not process memory. The worker can restart while the workflow is waiting. 
## Approval payload Keep the approval signal explicit. Do not pass free-form text as the only decision record. ```json { "status": "approved", "reviewer_id": "user_123", "reason": "Customer is inside the refund window.", "approved_amount": 4900 } ``` The trace should preserve the recommendation, the reviewer, the decision, and the final side effect receipt. ## Side-effect guard The final step should be idempotent. Use a key derived from the workflow run and the business object. ```python @function async def issue_refund_once(ticket_id: str, amount: int) -> RefundReceipt: idempotency_key = f"refund:{ticket_id}:{amount}" return await stripe.refunds.create( payment_intent=lookup_payment(ticket_id), amount=amount, idempotency_key=idempotency_key, ) ``` ## Production checks - Restart the worker while the workflow is waiting. - Send the approval after the restart. - Confirm the workflow resumes from the waiting point. - Confirm duplicate approval signals do not create duplicate refunds. - Confirm rejected decisions stop before the side-effect step. ## Next steps - [Retry AI workflow steps without duplicate side effects](/cookbooks/retry-without-duplicate-side-effects.md) - [Build a customer support agent](/cookbooks/customer-support-agent.md) - [Build a durable research agent with approval and recovery](/cookbooks/durable-research-agent-approval-recovery.md) --- ## Build a durable research agent with approval and recovery _Source: https://agnt5.com/cookbooks/durable-research-agent-approval-recovery.md_ > Checkpoint search, extraction, artifacts, and final human approval across a long-running report workflow. Research agents are useful when they survive real work: slow searches, document downloads, extraction failures, intermediate artifacts, and human approval before the final report is sent. ## Scenario A research agent investigates a vendor, gathers sources, extracts notes, drafts a report, waits for approval, and then publishes the report to a workspace. 
## What you build - A multi-step research workflow. - Checkpoints after search, fetch, extraction, synthesis, and approval. - Artifact records for downloaded files and notes. - A recovery path after a failed source fetch. - Human approval before final publication. ## Workflow shape The workflow is long-running, but each unit of work is small. ```python @workflow async def vendor_research(ctx: WorkflowContext, vendor: str) -> ResearchReport: plan = await ctx.step(plan_research, vendor) sources = await ctx.step(search_sources, plan) documents = await ctx.step(fetch_documents, sources) notes = await ctx.step(extract_notes, documents) draft = await ctx.step(write_report, vendor, notes) decision = await ctx.wait_for_signal( "report_approval", timeout="5d", metadata={"vendor": vendor, "draft_artifact_id": draft.artifact_id}, ) if decision.status != "approved": return ResearchReport(status="needs_changes", draft_id=draft.artifact_id) published = await ctx.step(publish_report_once, draft.artifact_id) return ResearchReport(status="published", url=published.url) ``` If the worker stops after fetching documents, replay resumes from the journaled documents and continues at extraction. ## Artifact checkpoints Store artifact references in the journal instead of large blobs. ```python class ResearchArtifact(BaseModel): artifact_id: str kind: Literal["source", "notes", "draft", "report"] uri: str checksum: str ``` The trace should let a reviewer open the source list, extracted notes, and draft without rerunning the agent. ## Recovery drill Before shipping, force one source download to fail. ```bash agnt5 runs replay --run-id run_01JRESEARCH --local agnt5 runs resume run_01JRESEARCH ``` The recovered run should not repeat successful downloads, and the final report should include a trace back to the notes and sources used. ## Production checks - Every long external call is inside a step. - Artifacts have stable IDs and checksums. 
- A worker restart during approval does not lose the draft. - The publish step is idempotent. - Reviewers can inspect sources before approving. ## Next steps - [Build a deep research agent](/cookbooks/deep-research-agent.md) - [Build a durable human-approval AI workflow](/cookbooks/durable-human-approval-ai-workflow.md) - [Debug and replay a failed AI workflow](/cookbooks/debug-production-run.md) --- ## Build a model comparison workflow _Source: https://agnt5.com/cookbooks/model-comparison.md_ > Run the same case through multiple models, score outputs, and promote the release candidate. Model changes are production changes. This cookbook builds a workflow for comparing model candidates against the same inputs, scoring outputs, and promoting a winner only when it clears the eval gate. ## Scenario You want to move a classification workflow to a cheaper or stronger model. The team needs evidence that quality does not regress on real production cases. ## What you build - A candidate list of models. - A replayable eval dataset. - A comparison workflow that runs each case through each model. - Deterministic and judge-based scorers. - A release gate for promotion. ## Workflow shape ```python @workflow async def compare_models(ctx: WorkflowContext, case_id: str, models: list[str]) -> ModelComparison: case = await ctx.step(load_eval_case, case_id) outputs = [] for model in models: output = await ctx.step(run_case_with_model, case, model) score = await ctx.step(score_model_output, case.expected, output) outputs.append(ModelOutput(model=model, output=output, score=score)) return ModelComparison(case_id=case_id, outputs=outputs) ``` For larger datasets, fan out by case and aggregate scores in a separate step. ## Scoring strategy Use deterministic scorers when the expected output is structured: - exact class match, - required fields present, - forbidden terms absent, - citation coverage. 
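For a structured classification case, those checks can be one small deterministic function. A sketch — the `expected` and `output` field names are assumptions, not the cookbook's actual schema:

```python
# Assumption: a project-specific list of terms the output must never contain.
FORBIDDEN_TERMS = {"guaranteed", "legal advice"}


def score_classification(expected: dict, output: dict) -> dict:
    """Deterministic checks from the list above; no model involved."""
    text = str(output.get("rationale", "")).lower()
    checks = {
        "exact_class_match": output.get("category") == expected.get("category"),
        "required_fields_present": all(k in output for k in ("category", "priority")),
        "forbidden_terms_absent": not any(term in text for term in FORBIDDEN_TERMS),
    }
    return {"passed": all(checks.values()), "checks": checks}
```

Because every check is deterministic, a score change between two model candidates can only come from the outputs, never from the scorer.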
Use an LLM judge for subjective dimensions, but keep the judge prompt versioned and trace-visible. ## Promotion checks - Candidate model beats or matches baseline on critical cases. - Cost and latency stay inside thresholds. - Failures link to traces for inspection. - Known production failures are included in the dataset. - CI blocks the release when score drops below the threshold. ## Next steps - [Turn a failed production AI run into an eval](/cookbooks/production-run-to-eval.md) - [Build a data extraction workflow](/cookbooks/data-extraction.md) - [Debug and replay a failed AI workflow](/cookbooks/debug-production-run.md) --- ## Turn a failed production AI run into an eval _Source: https://agnt5.com/cookbooks/production-run-to-eval.md_ > Capture a bad production run, convert it into an eval case, and compare fixed prompts before release. The most useful eval cases often start as production failures. This cookbook shows how to capture a bad run, preserve its prompt, tools, state, and output, then replay it against a fixed prompt or model before promoting the change. ## Scenario A workflow classifies enterprise support tickets. A customer reports that a security-sensitive ticket was routed to the wrong queue. The run exists in production with the original input, tool results, and model output. ## What you build - A production failure review flow. - An eval case derived from the failed run. - A scorer that captures the expected behavior. - A replay comparison between current and candidate workflow versions. - A promotion gate based on the fixed case. ## Capture the run Start from the production run, not from a handwritten reproduction. ```bash agnt5 runs describe run_01JSECURITY agnt5 eval cases create --from-run run_01JSECURITY --dataset support-routing-regressions ``` The generated case should include: - workflow input, - relevant tool results, - the model output, - the expected routing outcome, - metadata linking back to the production run. 
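A captured case might be stored in a shape like this. Illustrative only — the ticket text and field names are invented for the example; the real format is whatever `agnt5 eval cases create` emits:

```json
{
  "case_id": "case_0001",
  "dataset": "support-routing-regressions",
  "source_run_id": "run_01JSECURITY",
  "input": {
    "subject": "Unusual logins on our admin account",
    "body": "Please investigate."
  },
  "tool_results": [
    {"tool": "lookup_account", "output": {"plan": "enterprise"}}
  ],
  "model_output": {"queue": "general", "severity": "normal"},
  "expected": {"queue": "security", "severity": "high"}
}
```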
## Write the scorer Use a deterministic scorer for routing when possible. ```python @scorer(name="routes_security_ticket") def routes_security_ticket(ctx: EvalContext) -> ScorerResultPy: output = SupportRoute.model_validate(ctx.output) passed = output.queue == "security" and output.severity in {"high", "critical"} return ScorerResultPy(score=1.0 if passed else 0.0, passed=passed) ``` The scorer turns the production failure into a guardrail that runs on every future prompt, model, or tool change. ## Replay the candidate Change the routing prompt, model, or tool policy in a candidate workflow version. Replay the captured case before promoting. ```bash agnt5 eval run support-routing-regressions --workflow-version candidate agnt5 eval compare --baseline production --candidate candidate ``` The comparison should show the failed case passing without regressing the rest of the dataset. ## Production checks - The eval case links back to the original run. - The case contains enough state to reproduce the failure offline. - The scorer fails on the production version. - The scorer passes on the candidate version. - CI or a release checklist blocks promotion if this case regresses. ## Next steps - [Build a model comparison workflow](/cookbooks/model-comparison.md) - [Debug and replay a failed AI workflow](/cookbooks/debug-production-run.md) - [Build a customer support agent](/cookbooks/customer-support-agent.md) --- ## Build a RAG chatbot with memory _Source: https://agnt5.com/cookbooks/rag-chatbot-memory.md_ > Retrieve knowledge, preserve user context, isolate tenants, and trace each answer back to evidence. This cookbook builds a RAG chatbot that behaves like a production workflow: tenant-aware retrieval, durable memory updates, traceable evidence, and recoverable answer generation. ## Scenario A SaaS user asks a product question. The chatbot retrieves relevant docs, combines them with user memory, generates an answer, and records useful context for the next turn. 
## What you build - Tenant-scoped retrieval. - Session memory lookup. - Evidence-grounded answer generation. - A memory update step. - Trace evidence for every answer. ## Workflow shape ```python @workflow async def answer_chat_turn(ctx: WorkflowContext, request: ChatRequest) -> ChatAnswer: memory = await ctx.step(load_session_memory, request.session_id) passages = await ctx.step(retrieve_docs, request.tenant_id, request.message) answer = await ctx.step(generate_grounded_answer, request.message, memory, passages) await ctx.step(update_memory_once, request.session_id, request.message, answer) return answer ``` The retrieval step must receive the tenant ID. Do not rely on global vector indexes without tenant filters. ## Evidence model Return citations as structured data. ```python class ChatAnswer(BaseModel): answer: str citations: list[DocumentCitation] memory_updates: list[str] ``` This lets the UI show citations and lets the trace explain the answer. ## Production checks - Direct HTTP calls include `X-TENANT-ID` and `X-DEPLOYMENT-ID`. - Retrieval filters by tenant. - The answer stores citations. - Memory updates are idempotent per turn. - A bad answer can be replayed with the same retrieved passages. ## Next steps - [Build a customer support agent](/cookbooks/customer-support-agent.md) - [Build a data extraction workflow](/cookbooks/data-extraction.md) - [Turn a failed production AI run into an eval](/cookbooks/production-run-to-eval.md) --- ## Retry AI workflow steps without duplicate side effects _Source: https://agnt5.com/cookbooks/retry-without-duplicate-side-effects.md_ > Use idempotency keys and journaled receipts so retries do not duplicate emails, tickets, or payments. Retries are necessary in production AI workflows. They are also dangerous when a step talks to Stripe, sends email, creates a ticket, or fires a webhook. This cookbook shows the pattern for retrying safely after a side effect may already have happened. 
## Scenario An AI workflow classifies a support request and creates a ticket in a CRM. The CRM request succeeds, but the network connection drops before your worker sees the response. The runtime retries the step. Without an idempotency pattern, the customer gets two tickets. With the pattern, the retry returns the original receipt. ## What you build - A workflow with retryable external side effects. - Stable idempotency keys for each side-effect step. - A receipt that is stored in the AGNT5 journal. - Retry behavior that returns the original external object. - Trace checks that prove only one side effect happened. ## Workflow shape Keep side effects small and named. ```python @workflow async def triage_and_create_ticket(ctx: WorkflowContext, inbound: InboundRequest) -> TicketResult: classification = await ctx.step(classify_request, inbound) ticket = await ctx.step(create_crm_ticket_once, inbound.request_id, classification) email = await ctx.step(send_ack_email_once, inbound.request_id, ticket.id) return TicketResult(ticket_id=ticket.id, email_id=email.id) ``` `create_crm_ticket_once` and `send_ack_email_once` are the only steps that touch external systems. ## Idempotency key Base the key on the business object, not on the retry attempt. ```python def crm_idempotency_key(request_id: str) -> str: return f"crm-ticket:{request_id}" @function async def create_crm_ticket_once( request_id: str, classification: Classification, ) -> CrmTicket: return await crm.create_ticket( subject=classification.subject, priority=classification.priority, idempotency_key=crm_idempotency_key(request_id), ) ``` If the CRM supports idempotency keys, use its native support. If it does not, store a receipt in your own database keyed by the same value before returning. ## Journaled receipt The step should return the external receipt, not just `true`. ```python class CrmTicket(BaseModel): id: str idempotency_key: str created_at: datetime ``` On replay, AGNT5 reads this receipt from the journal. 
The workflow can continue without creating the ticket again. ## Production checks - Inject a timeout after the CRM creates the ticket. - Confirm the retry uses the same idempotency key. - Confirm only one CRM ticket exists. - Confirm the AGNT5 trace shows the failed attempt and the successful retry. - Confirm replay returns the journaled ticket receipt. ## Next steps - [Build a webhook triage agent](/cookbooks/webhook-triage-agent.md) - [Build a durable human-approval AI workflow](/cookbooks/durable-human-approval-ai-workflow.md) - [Debug and replay a failed AI workflow](/cookbooks/debug-production-run.md) --- ## Build a webhook triage agent _Source: https://agnt5.com/cookbooks/webhook-triage-agent.md_ > Receive events, deduplicate delivery attempts, run async triage, and expose a trace per webhook. Webhook delivery is noisy: providers retry, events arrive late, and downstream systems fail. This cookbook builds a triage agent that deduplicates events and runs async work under a trace. ## Scenario A product receives incident webhooks. The workflow deduplicates events, asks an agent to classify urgency, opens a ticket when needed, and records the result. ## What you build - A webhook entry point. - Event deduplication. - Async triage with an agent. - Idempotent ticket creation. - A trace link returned to the webhook caller or dashboard. 
## Workflow shape ```python @workflow async def triage_webhook(ctx: WorkflowContext, event: WebhookEvent) -> WebhookOutcome: deduped = await ctx.step(record_event_once, event.provider_event_id, event) if deduped.already_seen: return WebhookOutcome(status="duplicate", original_run_id=deduped.run_id) classification = await ctx.step(classify_incident_event, event) if classification.priority == "ignore": return WebhookOutcome(status="ignored") ticket = await ctx.step(create_incident_ticket_once, event.provider_event_id, classification) return WebhookOutcome(status="ticket_created", ticket_id=ticket.id) ``` The provider event ID is the key. It protects both dedupe and ticket creation. ## Webhook response Return quickly with a run ID when the provider requires a fast response. ```json { "accepted": true, "run_id": "run_01JWEBHOOK", "trace_url": "https://app.agnt5.com/runs/run_01JWEBHOOK" } ``` The trace becomes the operational record for the asynchronous work. ## Production checks - Duplicate provider deliveries return the original run or receipt. - Ticket creation uses the provider event ID as an idempotency key. - The trace includes raw event payload, classification, and side-effect receipt. - Late events are processed according to explicit business rules. - Failed events can be replayed locally. ## Next steps - [Retry AI workflow steps without duplicate side effects](/cookbooks/retry-without-duplicate-side-effects.md) - [Debug and replay a failed AI workflow](/cookbooks/debug-production-run.md) - [Debug AI workflows with traces, not scattered logs](/cookbooks/workflow-native-observability.md) --- ## Debug AI workflows with traces, not scattered logs _Source: https://agnt5.com/cookbooks/workflow-native-observability.md_ > Compare log-only debugging with workflow-native traces that preserve inputs, outputs, retries, and state. Logs are still useful, but they rarely preserve the full execution context of an AI workflow. 
This cookbook shows the same failure debugged with scattered logs and then with AGNT5 workflow-native traces. ## Scenario A lead-enrichment workflow returns the wrong company summary. The log line says the LLM call succeeded. The support team needs to know which source documents, tool outputs, prompts, retries, and state produced the answer. ## What you build - A workflow with step-level trace capture. - Minimal logs for infrastructure symptoms. - Trace inspection for inputs, outputs, state, retries, and parent-child calls. - A root-cause review flow that ends in a reproducible case. ## Workflow shape Use steps to make the execution graph explicit. ```python @workflow async def enrich_lead(ctx: WorkflowContext, lead_id: str) -> LeadBrief: lead = await ctx.step(load_lead, lead_id) search_results = await ctx.step(search_company_sources, lead.company) facts = await ctx.step(extract_company_facts, search_results) brief = await ctx.step(write_lead_brief, lead, facts) return await ctx.step(save_brief_once, lead.id, brief) ``` Each step boundary becomes a trace boundary. The trace is the system of record for what the workflow did. ## Log-only debugging With logs alone, you usually see symptoms: ```text INFO write_lead_brief completed model=gpt-4.1 latency_ms=1821 WARN user_reported_bad_summary lead_id=lead_123 ``` This does not answer which source was wrong, whether a retry changed the output, or whether the save step used the intended brief. ## Trace debugging With the AGNT5 trace, inspect: - `search_company_sources` input and source list, - `extract_company_facts` output and confidence, - `write_lead_brief` prompt, model output, and token usage, - retry attempts and final selected result, - the saved brief receipt. The trace points to the root cause: an outdated source was ranked first and passed through extraction. ## Production checks - Every user-visible result links to a run ID. - The trace has enough data to explain the output. 
- Logs link to run IDs instead of duplicating trace payloads.
- Failed traces can be replayed or turned into eval cases.
- Sensitive fields are redacted before trace storage where required.

## Next steps

- [Turn a failed production AI run into an eval](/cookbooks/production-run-to-eval.md)
- [Debug and replay a failed AI workflow](/cookbooks/debug-production-run.md)
- [Build a data extraction workflow](/cookbooks/data-extraction.md)

# API Reference

---

## Create a contact — `POST /v1/contacts`

_Source: https://agnt5.com/api-reference/create-contact.md_

> Add a new contact to your contact list in Protocol. You must provide their Protocol username and phone number.

**Endpoint**: `POST /v1/contacts`
**Auth**: `Authorization: Bearer <token>` (required)
**Content-Type**: `application/json`
**Error envelope**: 4xx/5xx return `{"error": {"code": string, "message": string}}`
## Required attributes ## Optional attributes
## Request example ## Response
---

## Delete a contact — `DELETE /v1/contacts/{id}`

_Source: https://agnt5.com/api-reference/delete-contact.md_

> Delete a contact from your Protocol contact list. Once deleted, it cannot be recovered.

**Endpoint**: `DELETE /v1/contacts/{id}`
**Auth**: `Authorization: Bearer <token>` (required)
**Path parameters**: `id` — the contact id to delete
**Idempotency**: deleting a missing contact returns 404 (not idempotent); deletion is irreversible
**Error envelope**: 4xx/5xx return `{"error": {"code": string, "message": string}}`
## Path parameters
## Response

```json
{
  "success": true,
  "message": "Contact successfully deleted"
}
```
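A matching request can be built with the standard library. A sketch — the base URL and token are placeholders, not documented values:

```python
import urllib.request

BASE_URL = "https://api.protocol.chat"  # assumption: not documented on this page
TOKEN = "YOUR_API_TOKEN"                # placeholder


def delete_contact_request(contact_id: str) -> urllib.request.Request:
    """Build (but do not send) the DELETE request for one contact."""
    return urllib.request.Request(
        url=f"{BASE_URL}/v1/contacts/{contact_id}",
        method="DELETE",
        headers={"Authorization": f"Bearer {TOKEN}"},
    )


req = delete_contact_request("WAz8eIbvDR60rouK")
# Send with urllib.request.urlopen(req); a missing id returns the 404 error envelope.
```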
---

## Get contacts — `GET /v1/contacts`

_Source: https://agnt5.com/api-reference/get-contacts.md_

> Retrieve your contacts list from Protocol. You can optionally filter and paginate the results.

**Endpoint**: `GET /v1/contacts`
**Auth**: `Authorization: Bearer <token>` (required)
**Query parameters**: see "Optional query parameters" below for filter and pagination keys
**Error envelope**: 4xx/5xx return `{"error": {"code": string, "message": string}}`
## Optional query parameters
## Request example

## Response

```json
{
  "data": [
    {
      "id": "WAz8eIbvDR60rouK",
      "username": "johndoe",
      "phone_number": "+1 (555) 123-4567",
      "avatar_url": "https://assets.protocol.chat/avatars/johndoe.jpg",
      "display_name": "John Doe",
      "created_at": 692233200
    },
    {
      "id": "hSIhXBhNe8X1d8Et"
      // ... more contacts
    }
  ],
  "pagination": {
    "total": 35,
    "per_page": 20,
    "current_page": 1,
    "total_pages": 2
  }
}
```
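Since each response carries a `pagination` block (`total`, `per_page`, `current_page`, `total_pages`), a client can walk every page by comparing `current_page` against `total_pages`. This is a sketch under stated assumptions: the reference does not name the pagination query keys, so the `page` parameter, the base URL, and the `iter_contacts` name are all hypothetical:

```python
import json
import urllib.parse
import urllib.request

API_BASE = "https://api.protocol.chat"  # assumed base URL; not stated in this reference


def iter_contacts(token: str):
    """Yield every contact, following the pagination metadata in each response.

    The `page` query parameter is an assumption; this reference documents
    only the response-side pagination fields.
    """
    page = 1
    while True:
        query = urllib.parse.urlencode({"page": page})
        req = urllib.request.Request(
            f"{API_BASE}/v1/contacts?{query}",
            headers={"Authorization": f"Bearer {token}"},
        )
        with urllib.request.urlopen(req) as resp:
            body = json.loads(resp.read())
        yield from body["data"]
        meta = body["pagination"]
        if meta["current_page"] >= meta["total_pages"]:
            return
        page += 1
```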
---

## Update a contact — `PATCH /v1/contacts/{id}`

_Source: https://agnt5.com/api-reference/update-contact.md_

> Update an existing contact in your Protocol contact list. You can update any of the contact's attributes.

**Endpoint**: `PATCH /v1/contacts/{id}`
**Auth**: `Authorization: Bearer <token>` (required)
**Content-Type**: `application/json`
**Path parameters**: `id` — the contact id to update
**Error envelope**: 4xx/5xx return `{"error": {"code": string, "message": string}}`
## Optional attributes
## Request example

## Response

```json
{
  "id": "WAz8eIbvDR60rouK",
  "username": "johndoe",
  "phone_number": "+1 (555) 987-6543",
  "avatar_url": "https://assets.protocol.chat/avatars/johndoe.jpg",
  "display_name": "John Smith",
  "updated_at": 692233200
}
```
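A PATCH sends only the attributes that should change. This sketch assumes the base URL and the `update_contact` helper name (neither appears in this reference), and uses stdlib `urllib` as a stand-in client:

```python
import json
import urllib.request

API_BASE = "https://api.protocol.chat"  # assumed base URL; not stated in this reference


def update_contact(token: str, contact_id: str, **fields: str) -> dict:
    """Sketch of PATCH /v1/contacts/{id}: partial update with only changed fields."""
    req = urllib.request.Request(
        f"{API_BASE}/v1/contacts/{contact_id}",
        data=json.dumps(fields).encode(),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="PATCH",
    )
    # 4xx/5xx raise HTTPError with the error envelope described above.
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

For example, `update_contact(token, "WAz8eIbvDR60rouK", phone_number="+1 (555) 987-6543")` would correspond to the response shown above.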
# SDK Reference --- ## SDK Overview _Source: https://agnt5.com/sdk/index_ > Client SDKs for Python, TypeScript, and Go to integrate AGNT5 into your applications # AGNT5 SDKs Build AI workflows with your favorite language using high-level APIs that scale from simple functions to complex multi-agent systems. ## Available SDKs AGNT5 provides native SDKs for the most popular programming languages: ### Python SDK The most full-featured SDK with async/await support, type hints, and Pydantic models. - **Type hints & Pydantic models** - Full type safety and data validation - **Async/await support** - Modern Python async patterns - **Rich debugging tools** - Comprehensive logging and error handling [Get started with Python →](/sdk/python.md) ### TypeScript SDK Full TypeScript support with comprehensive type definitions and excellent IDE integration. - **Full TypeScript support** - Complete type definitions - **Promise-based API** - Modern async JavaScript patterns - **Node.js & browser compatible** - Works everywhere JavaScript runs [Get started with TypeScript →](/sdk/typescript.md) ### Go SDK Lightweight and performant with full concurrency support, perfect for high-throughput applications. - **Goroutine-based concurrency** - Native Go concurrency patterns - **Comprehensive error handling** - Robust error management - **Zero external dependencies** - Minimal, self-contained library [Get started with Go →](/sdk/go.md) ## Quick Start 1. **Install the SDK** for your preferred language 2. **Configure authentication** with your API keys 3. **Create your first workflow** with the SDK [View installation guide →](/sdk/installation.md) --- ## AGNT5 Python SDK _Source: https://agnt5.com/sdk/python_ > Build AI agents and durable workflows with the AGNT5 Python SDK Build AI agents and reliable workflows with automatic recovery. AGNT5 combines agent orchestration and fault-tolerant execution in one lightweight framework. 
## Primitives comparison | **Attribute** | **Function** | **Entity** | **Workflow** | **Agent** | **Tool** | |---|---|---|---|---|---| | **What** | Stateless operation with retries | Stateful component with unique key | Multi-step orchestrated process | LLM with instructions and tools | Python function LLMs can call | | **State** | None | Isolated per entity key | Isolated per workflow instance | Conversation history via Entity | None | | **Durability** | Automatic retries, checkpointing | Persistent state across runs | Checkpointed steps, resume on failure | Context preserved in Entity | Runs within agent context | | **Best For** | Document analysis, embeddings generation, LLM API calls | AI chat sessions, agent memory, conversation history | RAG pipelines, content generation with review, AI evals | Customer support, research assistants, code review | Vector search, knowledge base queries, API integrations | ## Key Features - **Automatic recovery** from failures with configurable retry policies - **Checkpointing** resumes from exact failure point - **Multi-agent coordination** via handoffs and composition - **Python-native** - decorators, async/await, type hints - **Multi-provider** - OpenAI, Anthropic, Groq, Azure, Bedrock, OpenRouter - **Built-in tracing** for debugging and monitoring ## Installation ```bash pip install agnt5 ``` ## Quick example ```python from agnt5 import Agent, workflow, tool, Context, WorkflowContext # Define a tool for the agent @tool(auto_schema=True) async def search_docs(ctx: Context, query: str) -> str: """Search documentation for answers.""" # Your search logic here return f"Found documentation about: {query}" # Create an AI agent with tools agent = Agent( name="assistant", model="openai/gpt-4o-mini", instructions="You are a helpful assistant. 
Search docs when needed.", tools=[search_docs] ) # Create a durable workflow that orchestrates the agent @workflow async def process_question(ctx: WorkflowContext, question: str) -> dict: """Durable workflow for processing questions.""" # Step 1: Get answer from agent (checkpointed) answer = await ctx.step("get_answer", agent.run(question)) # Step 2: Store result (checkpointed) await ctx.step("store", save_answer(question, answer)) return {"question": question, "answer": answer} # If this crashes after step 1, it resumes from step 2 on restart ``` **Note**: Set your `OPENAI_API_KEY` environment variable before running. ## Next Steps ### Getting Started - **[Quickstart](getting-started)** - Installation, first worker, and local development setup - **[Worker Runtime](worker)** - Configure and deploy workers ### Core Primitives - **[Functions](functions)** - Stateless operations with retries - **[Entities](entity)** - Stateful components with unique keys - **[Workflows](workflows)** - Multi-step orchestration patterns - **[Context API](context)** - Orchestration, state, AI, and observability APIs ### Agent Development Kit (ADK) - **[Agents](agent)** - Autonomous LLM-driven systems - **[Sessions](session)** - Conversation containers and multi-agent coordination - **[Tools](tool)** - Callable capabilities that extend agent abilities - **[Memory](memory)** - Long-term knowledge storage with semantic search ### Examples - **[Examples](examples/basic-worker)** - Practical usage examples --- ## Agents _Source: https://agnt5.com/sdk/python/agent_ > Autonomous LLM-driven systems with tool orchestration and reasoning Agents are autonomous LLM-driven systems that reason, plan, and execute tasks using tools. They orchestrate complex multi-step workflows by breaking down problems, selecting appropriate tools, and iterating until complete. 
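The reason-act-iterate cycle described above can be sketched as a plain loop. This is an illustrative sketch, not the SDK's internals — `Action`, `run_loop`, and `propose` are hypothetical names standing in for the model's proposal step and the runtime's execution step; each iteration the model proposes either a tool call (whose result is fed back) or a final answer, and `max_iterations` bounds the loop:

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Action:
    """One model proposal (hypothetical shape): a tool call or a final answer."""
    kind: str                                   # "tool_call" | "final"
    tool: Optional[Callable[..., Any]] = None   # set when kind == "tool_call"
    args: Optional[dict] = None
    answer: Optional[str] = None                # set when kind == "final"


def run_loop(propose: Callable[[list], Action], task: str, max_iterations: int = 10) -> str:
    """Drive the agent loop: propose an action, execute it, feed the result back."""
    history: list = [("user", task)]
    for _ in range(max_iterations):
        action = propose(history)               # the model picks the next action
        if action.kind == "final":
            return action.answer                # final answer ends the loop
        result = action.tool(**(action.args or {}))  # execute the chosen tool
        history.append(("tool", result))        # tool output goes back to the model
    raise RuntimeError("max_iterations reached without a final answer")
```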
## Key Characteristics - **LLM-Powered** - Driven by language models for reasoning and decision-making - **Tool Orchestration** - Automatically selects and executes appropriate tools - **Memory Integration** - Maintains long-term knowledge across conversations - **Session Aware** - Uses sessions for conversation context - **Streaming Support** - Real-time event streaming for responsive UX - **Durable** - Built on AGNT5 primitives for automatic fault tolerance ## Basic Usage ### Simple Agent ```python from agnt5 import Agent, LanguageModel lm = LanguageModel() agent = Agent( name="assistant", model=lm, instructions="You are a helpful coding assistant." ) # Run agent result = await agent.run("Explain recursion") print(result.output) ``` ### Agent with Tools ```python from agnt5 import Agent, tool, LanguageModel @tool.function(auto_schema=True) def search_docs(query: str, language: str = "python") -> List[Dict]: """Search programming language documentation.""" # Implementation return search_results @tool.function(auto_schema=True) def run_code(code: str, language: str = "python") -> Dict[str, str]: """Execute code and return output.""" # Implementation return {"output": result} lm = LanguageModel() agent = Agent( name="coding_assistant", model=lm, instructions="""You are a coding assistant. Use search_docs to find API references. Use run_code to test code examples.""", tools=[search_docs, run_code] ) result = await agent.run("How do I read a file in Python? Show me an example.") ``` ### Agent with Session and Memory ```python from agnt5 import Agent, Session, Memory, LanguageModel # Create session session = Session( id="tutoring-session-789", user_id="student-123", metadata={"subject": "mathematics"} ) # Create memory memory = Memory(service=VectorMemoryService()) await memory.store("student_level", "Advanced calculus") # Create agent lm = LanguageModel() agent = Agent( name="math_tutor", model=lm, instructions="You are a patient math tutor. 
Adapt to student's level.", tools=[solve_equation_tool, plot_function_tool], session=session, memory=memory ) result = await agent.run("Help me understand limits") ``` ## Agent Configuration ### Parameters | Parameter | Type | Description | | --- | --- | --- | | `name` | `str` | Unique agent name | | `model` | `LanguageModel` | LLM to use for reasoning | | `instructions` | `str` | System prompt and guidelines | | `tools` | `List[Tool]` | Tools available to agent | | `session` | `Session \| None` | Session for conversation context | | `memory` | `Memory \| None` | Long-term knowledge storage | | `max_iterations` | `int` | Max reasoning loops (default: 10) | ### Instructions Write clear, actionable instructions: ✓ Good ✗ Avoid ```python agent = Agent( name="code_reviewer", model=lm, instructions="""You are an expert code reviewer specializing in Python. Review process: 1. Analyze code for complexity, duplication, style 2. Check for security vulnerabilities 3. Suggest improvements with code examples 4. Prioritize: security > correctness > performance > style Be constructive and explain your reasoning.""" ) ``` ```python agent = Agent( name="helper", model=lm, instructions="Help the user with stuff." 
# Too vague ) ``` ## Streaming Agents Stream events in real-time: ```python async for event in agent.stream("Analyze this dataset", session=session): match event.type: case "thinking": print(f"🤔 {event.content}") case "tool_call": print(f"🔧 Calling {event.tool_name}({event.arguments})") case "tool_result": print(f"✓ Result: {event.result}") case "response": print(f"💬 {event.content}") case "error": print(f"❌ Error: {event.error}") ``` ## Agent Planning Preview execution plan before running: ```python # Get plan without executing plan = agent.plan("Analyze competitor pricing strategies") print(f"Estimated steps: {len(plan.steps)}") for step in plan.steps: print(f"- {step.type}: {step.description}") if step.tool: print(f" Tool: {step.tool.name}") # Review and execute if approved if user_approves(plan): result = await agent.run("Analyze competitor pricing strategies") ``` ## Common Patterns ### Research Agent ```python from agnt5 import Agent, Session, Memory, tool @tool.function(auto_schema=True) def search_academic(query: str, year_from: int = 2020) -> List[Dict]: """Search academic papers.""" pass @tool.function(auto_schema=True) def extract_insights(paper_text: str) -> Dict[str, List[str]]: """Extract key insights from paper.""" pass # Create research agent session = Session(id="research-ai-safety-001", user_id="researcher-123") memory = Memory(service=VectorMemoryService()) lm = LanguageModel() research_agent = Agent( name="research_agent", model=lm, instructions="""You are a research assistant specializing in AI safety. Research process: 1. Search for relevant recent papers 2. Extract key insights from each paper 3. Identify common themes and gaps 4. 
Synthesize findings into comprehensive summary""", tools=[search_academic, extract_insights], session=session, memory=memory ) result = await research_agent.run( "Survey the current state of AI alignment research" ) # Store findings in memory await memory.ingest_from_session(session, strategy="smart") ``` ### Multi-Agent Workflow ```python # Shared session for coordination session = Session(id="product-launch-001", user_id="pm-456") # Specialized agents market_researcher = Agent( name="market_analyst", model=lm, tools=[market_data_tool, competitor_analysis_tool], session=session, instructions="Analyze market opportunities and competitive landscape." ) product_designer = Agent( name="designer", model=lm, tools=[design_tool, user_research_tool], session=session, instructions="Design products based on market research and user needs." ) technical_lead = Agent( name="tech_lead", model=lm, tools=[architecture_tool, feasibility_tool], session=session, instructions="Assess technical feasibility and propose architecture." ) # Sequential execution with shared context market_analysis = await market_researcher.run( "Analyze market for AI-powered code review tools" ) product_specs = await product_designer.run( "Design product based on market analysis" ) tech_assessment = await technical_lead.run( "Evaluate technical feasibility of proposed product" ) ``` ### Agent Handoff ```python from agnt5.tools import AgentTool # Create specialized agents billing_agent = Agent( name="billing_specialist", model=lm, tools=[payment_tool, invoice_tool, refund_tool], instructions="Handle billing, payments, and refunds." ) technical_agent = Agent( name="tech_support", model=lm, tools=[diagnostic_tool, fix_tool], instructions="Diagnose and fix technical issues." 
) # Coordinator with handoff capability coordinator = Agent( name="coordinator", model=lm, tools=[ classify_request_tool, AgentTool(target_agent=billing_agent), AgentTool(target_agent=technical_agent) ], instructions="""You are a support coordinator. Classify requests and hand off to appropriate specialist. Hand off to: - billing_specialist: payment, invoice, refund questions - tech_support: technical issues, bugs, troubleshooting""" ) session = Session(id="support-ticket-789", user_id="customer-123") result = await coordinator.run( "I was charged twice for my subscription", session=session ) ``` ### Human-in-the-Loop Agent ```python @tool.function(auto_schema=True, confirmation=True) def deploy_to_production(version: str) -> Dict[str, str]: """Deploy application to production. Warning: Requires human approval. """ pass deployment_agent = Agent( name="deployer", model=lm, tools=[run_tests_tool, deploy_to_production], instructions="""Run all tests before deploying. Always request human approval for production deployments.""" ) result = await deployment_agent.run("Deploy version 2.0 to production") # Agent runs tests, then waits for human approval before deploying ``` ### Iterative Problem Solving ```python debugging_agent = Agent( name="debugger", model=lm, tools=[ analyze_logs_tool, run_diagnostic_tool, apply_fix_tool, verify_fix_tool ], instructions="""You are a debugging assistant. Process: 1. Analyze error logs to identify root cause 2. Run diagnostics to confirm hypothesis 3. Apply potential fix 4. Verify fix works 5. If not fixed, iterate (max 3 attempts) Always verify fixes before considering issue resolved.""" ) result = await debugging_agent.run( "Users are experiencing 500 errors on the checkout page" ) ``` ## Best Practices ### 1. Write Clear Instructions Provide specific, actionable guidance: ```python # ✓ Good - Specific process agent = Agent( name="analyst", instructions="""Analyze data systematically: 1. Identify data patterns and anomalies 2. 
Calculate key statistics 3. Generate visualizations 4. Provide actionable insights""" ) # ✗ Bad - Too vague agent = Agent( name="helper", instructions="Help with analysis" ) ``` ### 2. Use Sessions for Coordination Share context across agents: ```python # Create shared session session = Session(id="project-workflow-123", user_id="user-456") # Set shared context session.set_state("project_name", "ai-safety-research") session.set_state("deadline", "2024-12-31") # All agents access shared context agent1 = Agent(name="agent1", session=session, ...) agent2 = Agent(name="agent2", session=session, ...) ``` ### 3. Leverage Memory Use memory for persistent knowledge: ```python # Store user preferences await memory.store("user_expertise", "Expert in React and TypeScript") await memory.store("coding_style", "Prefers functional programming") # Agent recalls automatically agent = Agent( name="assistant", model=lm, tools=[code_gen_tool], memory=memory ) result = await agent.run("Help me build a component") # Agent uses stored preferences ``` ### 4. Limit Iterations Prevent infinite loops: ```python agent = Agent( name="bounded_agent", model=lm, max_iterations=5, # Stop after 5 reasoning loops instructions="Solve problems efficiently." ) ``` ## Agent Architecture Agents orchestrate AGNT5 primitives: 1. **LLM Core** - Language model for reasoning 2. **Tool Execution** - Tools built on Function primitive 3. **State Management** - Sessions use Entity for state 4. **Long-Term Storage** - Memory uses Entity for persistence 5. **Orchestration** - Workflow patterns for multi-step tasks 6. 
**Streaming** - Real-time event emission ``` Agent ├── LanguageModel (reasoning) ├── Tools (actions via Function) ├── Session (context via Entity) ├── Memory (knowledge via Entity) └── Planner (orchestration) ``` ## Comparison with Primitives | Aspect | Function | Workflow | Agent | | --- | --- | --- | --- | | Autonomy | None | Scripted | Autonomous | | Decision Making | Pre-programmed | Control flow | LLM-driven | | Tool Use | N/A | Explicit calls | Dynamic selection | | Adaptability | Fixed | Fixed steps | Adaptive reasoning | | Use Case | Single operation | Multi-step process | Complex tasks | **When to use Function:** - Single, deterministic operation - No decision-making needed **When to use Workflow:** - Pre-defined multi-step process - Explicit control flow **When to use Agent:** - Complex, open-ended tasks - Requires reasoning and adaptation - Dynamic tool selection needed ## Next Steps - [Session](session) - Agent conversation context - [Tool](tool) - Agent capabilities - [Memory](memory) - Agent long-term knowledge - [Workflows](workflows) - Orchestration patterns - [Context API](context) - Agent execution context --- ## Decorators API _Source: https://agnt5.com/sdk/python/api/decorators_ > Complete API reference for AGNT5 Python SDK decorators Complete API reference for decorator-based component registration in the AGNT5 Python SDK. ## `@function` Register a Python callable as an invokable component. ### Signature ```python def function(name: str | None = None) -> Callable[[Callable[..., Any]], Callable[..., Any]] ``` ### Parameters | Parameter | Type | Description | |-----------|------|-------------| | `name` | `str \| None` | Override the registered function name. Defaults to the original function name. | ### Returns Decorated function with AGNT5 metadata annotations. ### Examples ```python from agnt5 import function # Basic function registration @function() def greet(name: str) -> str: return f"Hello, {name}!" 
# Custom function name @function("math.add") def add_numbers(a: int, b: int) -> int: return a + b # Function with context @function() def context_handler(ctx: ExecutionContext, data: dict) -> dict: return { "invocation_id": ctx.invocation_id, "data": data } ``` ### Function Annotations The decorator adds these attributes to the decorated function: | Attribute | Type | Description | |-----------|------|-------------| | `_agnt5_handler_name` | `str` | Registered handler name | | `_agnt5_is_function` | `bool` | Always `True` for functions | ## `@handler` Alias for `@function` decorator. ```python from agnt5 import handler @handler() def my_handler(data: str) -> str: return data.upper() ``` ## `@workflow` Register a workflow definition factory. ### Signature ```python def workflow(name: str | None = None) -> Callable[[Callable[[], FlowDefinition]], Callable[[], FlowDefinition]] ``` ### Parameters | Parameter | Type | Description | |-----------|------|-------------| | `name` | `str \| None` | Override the registered workflow name. Defaults to the factory function name. | ### Returns Decorated workflow factory function. ### Examples ```python from agnt5 import workflow, task_step from agnt5.workflows import FlowDefinition @workflow() def data_pipeline() -> FlowDefinition: return FlowDefinition([ task_step("extract", service_name="etl", handler_name="extract_data"), task_step("transform", service_name="etl", handler_name="transform_data", dependencies=["extract"]) ]) @workflow("custom_name") def workflow_factory() -> FlowDefinition: return FlowDefinition([...]) ``` ## Registry Functions ### `get_registered_functions` Get all registered function handlers. ```python def get_registered_functions() -> Dict[str, Callable] ``` #### Returns Dictionary mapping handler names to callable functions. 
#### Example ```python from agnt5.decorators import get_registered_functions @function() def test_handler(data: str) -> str: return data functions = get_registered_functions() print(functions) # {'test_handler': <function test_handler at 0x...>} ``` ### `get_function_metadata` Inspect metadata for a decorated function. ```python def get_function_metadata(func: Callable) -> dict | None ``` #### Parameters | Parameter | Type | Description | |-----------|------|-------------| | `func` | `Callable` | Decorated function to inspect | #### Returns Metadata dictionary or `None` for non-decorated functions. #### Metadata Structure ```python { "name": "function_name", "type": "function", "parameters": [ { "name": "param_name", "type": "str", "required": True, "default": None # or default value } ], "return_type": "str" } ``` #### Example ```python from agnt5.decorators import get_function_metadata @function() def sample_function(name: str, age: int = 25) -> dict: return {"name": name, "age": age} metadata = get_function_metadata(sample_function) print(metadata) # { # "name": "sample_function", # "type": "function", # "parameters": [ # {"name": "name", "type": "str", "required": True}, # {"name": "age", "type": "int", "required": False, "default": 25} # ], # "return_type": "dict" # } ``` ### `clear_registry` Clear the function registry (primarily for testing). ```python def clear_registry() -> None ``` #### Example ```python from agnt5.decorators import clear_registry, get_registered_functions # Clear all registered functions clear_registry() # Verify registry is empty functions = get_registered_functions() assert len(functions) == 0 ``` ## Execution Functions ### `execute_component` Execute a registered component directly (low-level interface).
```python def execute_component( handler_name: str, input_data: bytes, context: Any | None = None ) -> bytes ``` #### Parameters | Parameter | Type | Description | |-----------|------|-------------| | `handler_name` | `str` | Name of registered handler | | `input_data` | `bytes` | JSON-encoded input data | | `context` | `Any \| None` | Execution context (optional) | #### Returns JSON-encoded result as bytes. #### Behavior 1. Resolves handler by name (raises `ValueError` if not found) 2. Decodes `input_data` from JSON 3. Invokes handler with or without context based on signature 4. Serializes result to JSON bytes 5. Wraps exceptions in `RuntimeError` with detailed logging #### Example ```python from agnt5.decorators import execute_component import json @function() def test_handler(data: str) -> str: return data.upper() # Execute directly input_data = json.dumps("hello").encode() result = execute_component("test_handler", input_data) # Parse result output = json.loads(result.decode()) print(output) # "HELLO" ``` ## Error Handling ### Function Registration Errors ```python # Duplicate names raise ValueError during registration @function("duplicate") def handler1(data: str) -> str: return data @function("duplicate") # Raises ValueError def handler2(data: str) -> str: return data ``` ### Execution Errors ```python # Handler not found try: execute_component("nonexistent", b'{}') except ValueError as e: print(f"Handler error: {e}") # Runtime errors are wrapped @function() def failing_handler(data: dict) -> dict: raise ValueError("Processing failed") try: execute_component("failing_handler", b'{}') except RuntimeError as e: print(f"Execution error: {e}") ``` ## Type Support ### Supported Handler Signatures ```python # No context parameter @function() def simple_handler(data: str) -> str: return data # With context parameter @function() def context_handler(ctx: ExecutionContext, data: str) -> str: return f"{ctx.invocation_id}: {data}" # Async handlers @function() async def 
async_handler(data: str) -> str: return data.upper() # Streaming handlers @function(streaming=True) async def streaming_handler(data: str): for char in data: yield char ``` ### Parameter Detection The decorator automatically detects context parameters: - If first parameter is named `ctx`, `context`, or has type annotation `ExecutionContext`, it's treated as a context parameter - Context parameters are omitted from metadata parameter lists - Handler invocation includes context only when expected ## Testing Support ### Mock Registry ```python from unittest.mock import patch from agnt5.decorators import get_registered_functions def test_with_clean_registry(): with patch('agnt5.decorators._function_registry', {}): # Test with isolated registry @function() def test_func(data: str) -> str: return data functions = get_registered_functions() assert "test_func" in functions ``` ### Direct Testing ```python import pytest from agnt5.decorators import execute_component from agnt5.components import ExecutionContext, ComponentType def test_function_execution(): @function() def test_handler(data: str) -> str: return data.upper() # Test with execute_component import json input_data = json.dumps("hello").encode() result = execute_component("test_handler", input_data) assert json.loads(result.decode()) == "HELLO" def test_context_function(): @function() def context_handler(ctx: ExecutionContext, data: str) -> dict: return { "invocation_id": ctx.invocation_id, "data": data } # Create mock context from unittest.mock import Mock ctx = Mock(spec=ExecutionContext) ctx.invocation_id = "test-123" result = context_handler(ctx, "test") assert result["invocation_id"] == "test-123" ``` ## Best Practices ### Naming Conventions ```python # Good: Descriptive names @function("user.create") def create_user(user_data: dict) -> dict: return create_user_record(user_data) # Good: Service-scoped names @function("email.send_notification") def send_email(recipient: str, message: str) -> bool: return 
send_email_message(recipient, message) # Avoid: Generic names @function("process") # Too generic def process_data(data: dict) -> dict: return data ``` ### Error Handling ```python @function() def robust_handler(data: dict) -> dict: try: # Validate input if not isinstance(data, dict): return {"error": "Input must be a dictionary"} # Process data result = process_business_logic(data) return {"success": True, "result": result} except ValueError as e: return {"error": f"Validation error: {e}"} except Exception as e: # Log error for debugging logger.error(f"Unexpected error: {e}") return {"error": "Internal error"} ``` ### Type Annotations ```python from typing import Dict, List, Optional, Union @function() def typed_handler( items: List[str], metadata: Optional[Dict[str, Union[str, int]]] = None ) -> Dict[str, Union[List[str], bool]]: return { "processed_items": [item.upper() for item in items], "has_metadata": metadata is not None } ``` ## Next Steps - [Components API](components) - Component classes and execution context - [Worker API](worker) - Worker runtime and configuration - [Workflows API](workflows) - Workflow definition and step utilities --- ## Context API _Source: https://agnt5.com/sdk/python/context_ > Execution context with APIs for orchestration, state, AI, and observability The **Context** (`ctx`) is the execution environment provided to all AGNT5 components. It provides APIs for orchestration, state management, LLM interactions, coordination, and observability. 
## Core Capabilities - **Orchestration** - Execute tasks, spawn functions, parallel execution - **State Management** - Get/set/delete state for entities - **Coordination** - Signals, timers, human approvals - **AI Integration** - LLM calls, tool registration - **Observability** - Logging, metrics, tracing ## Orchestration APIs ### Task Execution Execute functions and wait for results (workflows only): ```python @workflow async def process_workflow(ctx): # Execute a task result = await ctx.task( service_name="analytics", handler_name="process_data", input={"dataset": "users"} ) return result ``` ### Parallel Execution Run multiple tasks concurrently: Parallel Gather (Named) ```python # Returns list of results in order results = await ctx.parallel( ctx.task("service1", "handler1"), ctx.task("service2", "handler2"), ctx.task("service3", "handler3") ) # Access results by index result1 = results[0] result2 = results[1] ``` ```python # Returns dict with named results results = await ctx.gather( db=ctx.task("analytics", "analyze_db"), api=ctx.task("analytics", "analyze_api"), cache=ctx.task("analytics", "analyze_cache") ) # Access results by name db_result = results["db"] api_result = results["api"] ``` ### Async Invocation Spawn child functions without waiting: ```python @function async def batch_processor(ctx, items: list): # Spawn child invocations handles = [] for item in items: handle = ctx.spawn(process_item, item, key=f"item-{item['id']}") handles.append(handle) # Continue other work... 
# Wait for results later if needed results = [await h.result() for h in handles] return {"processed": len(results)} ``` ### Checkpointing Checkpoint expensive operations (functions only): ```python @function async def process_pipeline(ctx, data_id: str): # Each step is checkpointed raw = await ctx.step("extract", lambda: extract_data(data_id)) cleaned = await ctx.step("clean", lambda: clean_data(raw)) result = await ctx.step("analyze", lambda: analyze(cleaned)) # If crash occurs, resumes from last completed step return result ``` ## State Management (Entities) ### Get, Set, Delete Manage entity state: ```python @entity.write async def update_profile(ctx, name: str, age: int): # Get with default profile = await ctx.get("profile", {}) # Update profile profile.update({"name": name, "age": age}) # Set state ctx.set("profile", profile) ctx.set("last_updated", datetime.now().isoformat()) # Delete temporary data ctx.delete("temp_cache") return {"status": "updated"} ``` ### Entity Method Calls Call entity methods from functions: ```python @function async def chat(ctx, conversation_id: str, message: str): # Call entity method response = await ctx.entity( "ChatAgent", conversation_id ).send_message(message) return response ``` ## Coordination APIs ### Signals Wait for external events: Wait for Signal Emit Signal ```python @workflow async def approval_workflow(ctx, document_id: str): # Submit for review await ctx.task("docs", "submit_review", input={"id": document_id}) # Wait for approval signal (24 hours timeout) approval = await ctx.signal( "manager_approved", timeout_ms=86400000, default={"approved": False} ) if approval["approved"]: await ctx.task("docs", "publish", input={"id": document_id}) return {"status": "published"} else: return {"status": "rejected"} ``` ```python @function async def approve_document(ctx, workflow_id: str): # Send approval signal to waiting workflow await ctx.signal.emit( "manager_approved", target_workflow_id=workflow_id, payload={"approved": 
True, "approver": "manager@example.com"} ) return {"status": "signal_sent"} ``` ### Timers & Sleep Add delays and scheduled execution: ```python @workflow async def scheduled_job(ctx): # Wait 5 seconds await ctx.timer(delay_ms=5000) # Or use sleep (alternative syntax) await ctx.sleep(30) # 30 seconds # Wait until specific time (cron) await ctx.timer(cron="0 0 * * *") # Daily at midnight return {"status": "completed"} ``` ### Human-in-the-Loop Request human approval: ```python @workflow async def deployment_workflow(ctx, version: str): # Run tests test_results = await ctx.task("ci", "run_tests", input={"version": version}) if test_results["passed"]: # Request human approval for production approval = await ctx.human.approval( "deploy_production", payload={"version": version, "tests": test_results}, timeout=timedelta(minutes=30), required_roles=["admin", "devops"] ) if approval.decision == "approved": await ctx.task("deploy", "to_production", input={"version": version}) return {"status": "deployed"} return {"status": "cancelled"} ``` ## AI Integration ### LLM Generation Generate text or structured responses: Simple Chat Structured ```python @function async def summarize(ctx, text: str): response = await ctx.llm.generate( prompt=f"Summarize this text: {text}", model="gpt-4o-mini" ) return {"summary": response.text} ``` ```python @function async def chat_response(ctx, messages: list): response = await ctx.llm.generate( prompt=[ {"role": "system", "content": "You are a helpful assistant"}, *messages ], model="gpt-4" ) return {"response": response.text} ``` ```python @function async def extract_info(ctx, text: str): # JSON Schema-constrained output response = await ctx.llm.generate( prompt=f"Extract information: {text}", schema={ "type": "object", "properties": { "name": {"type": "string"}, "age": {"type": "integer"}, "email": {"type": "string"} }, "required": ["name"] }, model="gpt-4o-mini" ) return response.object # Parsed JSON ``` ### Streaming Stream responses for 
real-time output: ```python @function async def stream_story(ctx, topic: str): # Stream text generation async for chunk in await ctx.llm.stream( prompt=f"Write a story about {topic}", model="gpt-4o" ): if chunk.text: yield chunk.text # Stream to client ``` ### Tool Registration Register tools for LLM use: ```python @function async def agent_with_tools(ctx, query: str): # Register search tool search_tool = ctx.tools.register( "web_search", handler=perform_search, description="Search the web for information", schema={ "type": "object", "properties": { "query": {"type": "string"}, "max_results": {"type": "integer"} } } ) # Generate with tool response = await ctx.llm.generate( prompt=query, tools=[search_tool], model="gpt-4o" ) return response ``` ## Observability ### Logging Structured logging with context: ```python @function async def tracked_operation(ctx, data: dict): logger = ctx.log() logger.info("Processing started", extra={"data_size": len(data)}) try: result = process(data) logger.info("Processing completed", extra={"result_size": len(result)}) return result except Exception as e: logger.error("Processing failed", exc_info=True) raise ``` ### Metrics Record custom metrics: ```python @function async def monitored_function(ctx, request: dict): metrics = ctx.metrics() # Increment counter metrics.increment("requests.count", service="api") # Record timing start = time.time() result = await process_request(request) duration = (time.time() - start) * 1000 metrics.observe("latency.ms", duration, endpoint="/api/process") return result ``` ### Distributed Tracing Create spans for tracing: ```python @function async def traced_operation(ctx, data: dict): # Create span for external API call with ctx.trace_span().start("external_api_call", service="payments"): result = await call_payment_api(data) # Create span for database operation with ctx.trace_span().start("database_query", service="postgres"): await save_to_db(result) return result ``` ## Configuration & Secrets ### 
Secrets

Access secrets securely:

```python
@function
async def api_call(ctx, endpoint: str):
    # Get API keys from secrets
    api_key = ctx.secrets().get("openai_api_key")
    db_password = ctx.secrets().get("database_password")

    # Use secrets in API calls
    response = await make_request(endpoint, api_key=api_key)
    return response
```

### Configuration

Feature flags and config:

```python
@function
async def feature_gated_handler(ctx, data: dict):
    config = ctx.config()

    # Check feature flag
    if config.get("new_feature_enabled", default=False):
        return await new_implementation(data)
    return await legacy_implementation(data)

@function
async def ab_test_handler(ctx, data: dict):
    # A/B testing variant
    variant = ctx.config().variant("experiment_group", default="control")
    if variant == "treatment":
        return await experimental_flow(data)
    return await legacy_implementation(data)
```

### Request Headers

Access incoming headers:

```python
@function
async def header_aware(ctx, data: dict):
    headers = ctx.headers()
    user_agent = headers.get("user-agent", "unknown")
    correlation_id = headers.get("x-correlation-id")

    logger = ctx.log()
    logger.info(f"Request from {user_agent}", extra={"correlation_id": correlation_id})
    return {"processed": True}
```

## Context Properties

Access execution metadata:

```python
@function
async def introspective(ctx, data: dict):
    return {
        "run_id": ctx.run_id,                  # Workflow/run identifier
        "step_id": ctx.step_id,                # Current step identifier
        "attempt": ctx.attempt,                # Retry attempt number
        "component_type": ctx.component_type,  # "function", "entity", "workflow"
        "object_id": ctx.object_id,            # Entity key (for entities)
        "method_name": ctx.method_name,        # Entity method name (for entities)
        "processed": data
    }
```

## API Reference

### Orchestration

| API | Description |
| --- | --- |
| `ctx.task(service, handler, input)` | Execute function (workflows only) |
| `ctx.parallel(*tasks)` | Run tasks in parallel |
| `ctx.gather(**tasks)` | Parallel with named results |
| `ctx.spawn(fn, *args, key)` | Async child invocation |
| `ctx.step(name, fn)` | Checkpoint operation (functions) |

### 
State (Entities) | API | Description | | --- | --- | | `await ctx.get(key, default)` | Get state value | | `ctx.set(key, value)` | Set state value | | `ctx.delete(key)` | Delete state key | | `await ctx.entity(type, key).method()` | Call entity method | ### Coordination | API | Description | | --- | --- | | `await ctx.signal(name, timeout_ms, default)` | Wait for signal | | `await ctx.signal.emit(name, payload)` | Send signal | | `await ctx.timer(delay_ms)` | Wait with delay | | `await ctx.timer(cron)` | Wait until cron time | | `await ctx.sleep(seconds)` | Durable sleep | | `await ctx.human.approval(...)` | Request approval | ### AI Integration | API | Description | | --- | --- | | `await ctx.llm.generate(prompt, model)` | Generate text/JSON | | `await ctx.llm.stream(prompt, model)` | Stream generation | | `ctx.tools.register(name, handler, schema)` | Register tool | ### Observability | API | Description | | --- | --- | | `ctx.log()` | Get logger | | `ctx.metrics()` | Get metrics recorder | | `ctx.trace_span().start(name, service)` | Create trace span | ### Configuration | API | Description | | --- | --- | | `ctx.secrets().get(key)` | Get secret | | `ctx.config().get(key, default)` | Get config value | | `ctx.config().variant(key, default)` | Get A/B variant | | `ctx.headers()` | Get request headers | ## Common Patterns ### Parallel with Error Handling ```python @workflow async def robust_workflow(ctx): results = await ctx.gather( task1=ctx.task("svc", "task1"), task2=ctx.task("svc", "task2") ) if results["task1"] and results["task2"]: return {"status": "success", "results": results} else: return {"status": "partial_failure"} ``` ### Conditional Signal Waiting ```python @workflow async def conditional_approval(ctx, needs_approval: bool): if needs_approval: approval = await ctx.signal("approval_signal", timeout_ms=60000) if not approval.get("approved"): return {"status": "rejected"} # Proceed with operation result = await ctx.task("service", "operation") return 
{"status": "completed", "result": result}
```

### LLM with Tool Execution

```python
@function
async def agent_handler(ctx, query: str):
    # Register tools
    search = ctx.tools.register("search", handler=search_web, ...)
    calc = ctx.tools.register("calculator", handler=calculate, ...)

    # Generate with tools
    response = await ctx.llm.generate(
        prompt=query,
        tools=[search, calc],
        model="gpt-4o"
    )

    # Execute tool calls if needed
    if response.tool_calls:
        for tool_call in response.tool_calls:
            handler = ctx.tools.handler(tool_call.name)
            await handler(**tool_call.arguments)

    return response
```

## Next Steps

- [Functions](functions) - Using context in functions
- [Entity](entity) - Using context in entities
- [Workflows](workflows) - Using context in workflows
- [Agent](agent) - AI integration with context

---

## Entities

_Source: https://agnt5.com/sdk/python/entity_

> Stateful components with unique keys and single-writer consistency

Entities are stateful components identified by unique keys. Use entities to model AI agents with conversation history, workflow orchestrators, or any business object that maintains state across interactions.

## Key Characteristics

- **Unique Key** - Each instance identified by a unique key (e.g., `agent-conv-123`)
- **Private State** - Built-in key-value storage per instance
- **Single-Writer** - Automatic consistency: only one write operation per key at a time
- **Durable** - State survives crashes and restarts
- **Scalable** - Different keys execute in parallel

**Implementation Status** Entities are being implemented in Phase 2 of AGNT5 (Target: Q1 2025). The API shown represents the planned design. Check current SDK status for availability.
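The single-writer guarantee behaves like one lock per entity key: writes to the same key queue up, while writes to different keys overlap freely. A rough plain-asyncio sketch of that scheduling model — illustrative only, not SDK code, and all names here are made up:

```python
import asyncio
from collections import defaultdict

# Illustrative model of single-writer-per-key scheduling:
# one asyncio.Lock per entity key serializes writes to that key.
locks = defaultdict(asyncio.Lock)
events: list[str] = []

async def write(key: str, label: str) -> None:
    async with locks[key]:            # exclusive access for this key
        events.append(f"{key}/{label}/start")
        await asyncio.sleep(0.01)     # simulate a state update
        events.append(f"{key}/{label}/end")

async def main() -> None:
    # Two writes to conv-1 serialize; the conv-2 write overlaps with them.
    await asyncio.gather(
        write("conv-1", "a"),
        write("conv-1", "b"),
        write("conv-2", "c"),
    )

asyncio.run(main())
# "conv-1/b/start" only appears after "conv-1/a/end", while
# "conv-2/c/start" lands before "conv-1/a/end".
```

Swapping the per-key lock for the runtime's actual scheduler gives the same observable ordering: serial within a key, parallel across keys.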
## Basic Usage ### Creating an Entity ```python from agnt5 import entity # Create entity type agent = entity("ConversationAgent") # Write method (exclusive access per key) @agent.write async def send_message(ctx, message: str) -> dict: history = await ctx.get("history", []) history.append({"role": "user", "content": message}) response = await call_llm(history) history.append({"role": "assistant", "content": response}) ctx.set("history", history) return {"response": response} # Shared method (read-only, concurrent) @agent.shared async def get_history(ctx) -> list: return await ctx.get("history", []) ``` ### Calling Entities Call entity methods from functions: ```python from agnt5 import function @function async def chat(ctx, conv_id: str, msg: str): # Call entity method with unique key return await ctx.entity("ConversationAgent", conv_id).send_message(msg) ``` ## Entity API ### Core Methods | API | Description | | --- | --- | | `entity("name")` | Create entity type | | `@entity.write` | Write method (exclusive per key) | | `@entity.shared` | Shared method (read-only, concurrent) | | `ctx.get(key, default)` | Get state value | | `ctx.set(key, value)` | Set state value | | `ctx.delete(key)` | Delete state key | | `ctx.entity(type, key).method()` | Call entity from function | ### State Operations Get Set Delete ```python @agent.write async def process(ctx, data: dict) -> dict: # Get with default history = await ctx.get("history", []) count = await ctx.get("count", 0) return {"history": history, "count": count} ``` ```python @agent.write async def update_state(ctx, new_data: dict) -> dict: # Set values ctx.set("last_update", datetime.now().isoformat()) ctx.set("data", new_data) ctx.set("version", 2) return {"status": "updated"} ``` ```python @agent.write async def clear_cache(ctx) -> dict: # Delete keys ctx.delete("cached_results") ctx.delete("temporary_data") return {"status": "cleared"} ``` ## Common Patterns ### Conversational AI Agent ```python agent = 
entity("ChatAgent") @agent.write async def send_message(ctx, message: str) -> dict: """Handle conversational turns with LLM.""" history = await ctx.get("history", []) history.append({"role": "user", "content": message}) # Generate response response = await ctx.llm.generate( prompt=history, model="gpt-4" ) history.append({"role": "assistant", "content": response.text}) # Keep last 20 messages if len(history) > 20: history = history[-20:] ctx.set("history", history) return {"response": response.text} @agent.shared async def get_history(ctx) -> list: """Get conversation history (read-only).""" return await ctx.get("history", []) @agent.shared async def get_message_count(ctx) -> int: """Get total message count.""" history = await ctx.get("history", []) return len(history) ``` Usage: ```python @function async def chat_endpoint(ctx, conversation_id: str, message: str): # Call entity with unique conversation ID return await ctx.entity("ChatAgent", conversation_id).send_message(message) ``` ### Research Agent ```python research_agent = entity("ResearchAgent") @research_agent.write async def start_research(ctx, topic: str) -> dict: """Initialize research task.""" ctx.set("topic", topic) ctx.set("findings", []) ctx.set("status", "in_progress") return {"status": "started", "topic": topic} @research_agent.write async def add_finding(ctx, finding: str, source: str) -> dict: """Add research finding.""" findings = await ctx.get("findings", []) findings.append({ "content": finding, "source": source, "timestamp": datetime.now().isoformat() }) ctx.set("findings", findings) return {"count": len(findings)} @research_agent.write async def synthesize(ctx) -> dict: """Generate summary from findings.""" findings = await ctx.get("findings", []) topic = await ctx.get("topic") # Use LLM to synthesize summary = await ctx.llm.generate( prompt=f"Synthesize these findings about {topic}: {findings}", model="gpt-4" ) ctx.set("summary", summary.text) ctx.set("status", "completed") return 
{"summary": summary.text} @research_agent.shared async def get_progress(ctx) -> dict: """Check research progress.""" return { "status": await ctx.get("status"), "topic": await ctx.get("topic"), "findings_count": len(await ctx.get("findings", [])) } ``` ### Workflow Orchestrator ```python workflow = entity("WorkflowOrchestrator") @workflow.write async def start(ctx, steps: list) -> dict: """Start workflow execution.""" ctx.set("steps", steps) ctx.set("current_step", 0) ctx.set("results", []) ctx.set("status", "running") return {"status": "started", "total_steps": len(steps)} @workflow.write async def complete_step(ctx, result: dict) -> dict: """Mark step as complete and store result.""" results = await ctx.get("results", []) results.append(result) ctx.set("results", results) current = len(results) ctx.set("current_step", current) # Check if workflow is done steps = await ctx.get("steps", []) if current >= len(steps): ctx.set("status", "completed") return {"completed": current, "total": len(steps)} @workflow.shared async def get_progress(ctx) -> dict: """Get workflow progress.""" return { "current_step": await ctx.get("current_step", 0), "total_steps": len(await ctx.get("steps", [])), "status": await ctx.get("status", "unknown") } ``` ## Consistency & Concurrency ### Single-Writer Per Key Only one write operation per entity key executes at a time: ```python # Same key = serial execution (consistency guaranteed) await ctx.entity("agent", "conv-1").send_message("msg1") # Runs first await ctx.entity("agent", "conv-1").send_message("msg2") # Runs second # No race conditions, no lost updates ``` ### Parallel Execution Across Keys Different entity keys execute in parallel: ```python # Different keys = parallel execution (scales horizontally) await ctx.entity("agent", "conv-1").send_message(msg) # Parallel await ctx.entity("agent", "conv-2").send_message(msg) # Parallel await ctx.entity("agent", "conv-3").send_message(msg) # Parallel ``` ### Shared Methods for Reads Use 
`@entity.shared` for read-only operations that can run concurrently:

```python
# Multiple shared calls can run in parallel for the same key
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

# These execute concurrently
await ctx.entity("agent", "conv-1").get_history()  # Concurrent
await ctx.entity("agent", "conv-1").get_history()  # Concurrent
```

## Best Practices

### 1. Choose Stable, Meaningful Keys

Use unique, stable identifiers for entity keys:

**✓ Good keys**

```python
# Descriptive and stable
"agent-conv-{conversation_id}"
"workflow-{run_id}"
"user-{user_id}"
"research-{task_id}"
```

**✗ Avoid**

```python
"abc123"            # Not descriptive
"user-{timestamp}"  # Changes every time
"agent-1"           # Too generic
```

### 2. Design for Concurrency

Choose key granularity for optimal parallelism:

```python
# ✓ Good - One entity per conversation
await ctx.entity("ChatAgent", f"conv-{conv_id}").send_message(msg)

# ✗ Bad - Single global entity (serializes everything)
await ctx.entity("ChatAgent", "global").send_message(msg)
```

### 3. Use Shared for Read Operations

Enable concurrent reads with `@entity.shared`:

```python
# Write methods - exclusive access
@agent.write
async def update_state(ctx, data: dict):
    ctx.set("state", data)

# Read methods - concurrent access
@agent.shared
async def get_state(ctx) -> dict:
    return await ctx.get("state", {})
```

### 4. Keep State Minimal

Store only what you need:

```python
# ✓ Good - Essential state only
ctx.set("history", recent_messages[-20:])
ctx.set("summary", summary_text)

# ✗ Avoid - Excessive state
ctx.set("full_transcript", all_messages)     # Could be huge
ctx.set("raw_responses", all_llm_responses)  # Redundant
```

## Entity Use Cases

| Use Case | Entity Key | State Stored |
| --- | --- | --- |
| AI Chat Agent | `agent-conv-{id}` | Conversation history, context |
| Research Task | `research-{task_id}` | Findings, sources, summary |
| Workflow Orchestrator | `workflow-{run_id}` | Step progress, results |
| User Context | `user-{user_id}` | Preferences, personalization |
| Shopping Cart | `cart-{session_id}` | Items, totals, discounts |
| Game Session | `game-{session_id}` | Player state, score, progress |

## Functions vs Entities

| Aspect | Functions | Entities |
| --- | --- | --- |
| State | Stateless | Stateful (KV store) |
| Identity | No identity | Unique key per instance |
| Concurrency | Parallel by default | Serial per key, parallel across keys |
| Consistency | No consistency needed | Single-writer guarantee |
| Use Case | Transformations, API calls | Stateful AI agents, workflows |

**When to use Functions:**

- Stateless operations
- Independent requests
- Data transformations
- API integrations

**When to use Entities:**

- Stateful AI agents with memory
- Workflow orchestration
- User sessions and context
- Any state that needs consistency

## Next Steps

- [Context API](context) - Entity state operations and APIs
- [Functions](functions) - Stateless operations
- [Workflows](workflows) - Multi-step orchestration
- [Agent Component](agent) - AI agents built on entities

---

## Basic Worker

_Source: https://agnt5.com/sdk/python/examples/basic-worker_

> Simple standalone worker with function handlers

A minimal AGNT5 worker demonstrating function registration and execution.
## Complete Example ```python title="worker.py" import asyncio import logging from agnt5 import Worker, function # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @function() def greet(name: str) -> str: """Greet a user by name.""" logger.info(f"Greeting user: {name}") return f"Hello, {name}!" @function("math.add") def add_numbers(a: int, b: int) -> int: """Add two numbers together.""" logger.info(f"Adding {a} + {b}") return a + b @function() def process_data(data: dict) -> dict: """Process a data dictionary.""" logger.info(f"Processing data with {len(data)} keys") # Simulate processing processed_data = { key: str(value).upper() if isinstance(value, str) else value for key, value in data.items() } return { "original": data, "processed": processed_data, "status": "completed" } async def main(): """Main worker entry point.""" logger.info("Starting AGNT5 worker...") worker = Worker( service_name="basic-worker", service_version="1.0.0" ) try: await worker.run() except KeyboardInterrupt: logger.info("Worker stopped by user") except Exception as e: logger.error(f"Worker error: {e}") raise if __name__ == "__main__": asyncio.run(main()) ``` ## Running the Worker ### Deploy to AGNT5 ```bash # Authenticate and deploy agnt5 auth login agnt5 deploy ``` ### Run the Worker Locally ```bash # Run the worker python worker.py ``` Expected output: ``` INFO:__main__:Starting AGNT5 worker... INFO:agnt5.worker:Starting worker for service: basic-worker INFO:agnt5.worker:Registered function: greet INFO:agnt5.worker:Registered function: math.add INFO:agnt5.worker:Registered function: process_data INFO:agnt5.worker:Worker running, waiting for tasks... 
```

## Testing Functions

### Using HTTP API

Test functions via the AGNT5 Gateway:

```bash
# Test greet function
curl -X POST http://localhost:8080/call \
  -H "Content-Type: application/json" \
  -d '{
    "serviceName": "basic-worker",
    "handlerName": "greet",
    "inputData": "QWxpY2U="
  }'

# Test math function
curl -X POST http://localhost:8080/call \
  -H "Content-Type: application/json" \
  -d '{
    "serviceName": "basic-worker",
    "handlerName": "math.add",
    "inputData": "eyJhIjogNSwgImIiOiAzfQ=="
  }'
```

**Input Data Encoding**: The `inputData` field expects base64-encoded JSON. Use `echo -n '{"a": 5, "b": 3}' | base64` to encode data (the `-n` flag suppresses the trailing newline, which would otherwise change the encoding).

### Using Python Client

```python title="test_client.py"
import asyncio
import json
import base64
from agnt5 import Client

async def test_functions():
    """Test all worker functions."""
    client = Client("http://localhost:8080")

    # Test greet function
    name_data = base64.b64encode(json.dumps("Alice").encode()).decode()
    result = await client.call(
        service_name="basic-worker",
        handler_name="greet",
        input_data=name_data
    )
    print(f"Greet result: {result}")

    # Test math function
    math_data = base64.b64encode(json.dumps({"a": 10, "b": 5}).encode()).decode()
    result = await client.call(
        service_name="basic-worker",
        handler_name="math.add",
        input_data=math_data
    )
    print(f"Math result: {result}")

    # Test data processing
    process_data = base64.b64encode(json.dumps({
        "name": "john doe",
        "status": "active",
        "count": 42
    }).encode()).decode()
    result = await client.call(
        service_name="basic-worker",
        handler_name="process_data",
        input_data=process_data
    )
    print(f"Process result: {result}")

if __name__ == "__main__":
    asyncio.run(test_functions())
```

## Error Handling

Add error handling to make functions more robust:

```python title="robust_worker.py"
import asyncio
import logging
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def safe_divide(a: float, b: float) -> dict:
    """Safely divide two numbers with error handling."""
    try:
        if b ==
0: return { "error": "Division by zero", "result": None } result = a / b logger.info(f"Division successful: {a} / {b} = {result}") return { "result": result, "error": None } except Exception as e: logger.error(f"Unexpected error: {e}") return { "error": str(e), "result": None } @function() def validate_email(email: str) -> dict: """Validate email address format.""" import re try: # Basic email validation pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' is_valid = bool(re.match(pattern, email)) return { "email": email, "valid": is_valid, "message": "Valid email" if is_valid else "Invalid email format" } except Exception as e: logger.error(f"Email validation error: {e}") return { "email": email, "valid": False, "message": f"Validation error: {e}" } async def main(): worker = Worker("robust-worker") await worker.run() if __name__ == "__main__": asyncio.run(main()) ``` ## Configuration ### Environment Variables ```bash # Service configuration export AGNT5_SERVICE_NAME=basic-worker export AGNT5_SERVICE_VERSION=1.0.0 # Coordinator endpoint export AGNT5_COORDINATOR_ENDPOINT=http://localhost:9091 # Logging export AGNT5_LOG_LEVEL=DEBUG python worker.py ``` ### Configuration File ```python title="config_worker.py" import os import asyncio import logging from agnt5 import Worker, function from agnt5.logging import install_opentelemetry_logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Install telemetry install_opentelemetry_logging(logger=logger, level=logging.INFO) @function() def configured_handler(data: dict) -> dict: """Handler with access to configuration.""" return { "service_name": os.getenv("AGNT5_SERVICE_NAME", "unknown"), "service_version": os.getenv("AGNT5_SERVICE_VERSION", "1.0.0"), "data": data } async def main(): # Worker with configuration worker = Worker( service_name=os.getenv("AGNT5_SERVICE_NAME", "configured-worker"), service_version=os.getenv("AGNT5_SERVICE_VERSION", "1.0.0"), 
coordinator_endpoint=os.getenv("AGNT5_COORDINATOR_ENDPOINT", "http://localhost:9091") ) await worker.run() if __name__ == "__main__": asyncio.run(main()) ``` ## Production Deployment ### Dockerfile ```dockerfile title="Dockerfile" FROM python:3.11-slim WORKDIR /app # Install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Copy application COPY worker.py . # Health check HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/health || exit 1 # Run worker CMD ["python", "worker.py"] ``` ### Docker Compose ```yaml title="docker-compose.yml" version: '3.8' services: worker: build: . environment: - AGNT5_SERVICE_NAME=basic-worker - AGNT5_SERVICE_VERSION=1.0.0 - AGNT5_COORDINATOR_ENDPOINT=http://coordinator:9091 - AGNT5_LOG_LEVEL=INFO restart: unless-stopped depends_on: - coordinator networks: - agnt5 coordinator: image: agnt5/coordinator:latest ports: - "9091:9091" networks: - agnt5 networks: agnt5: driver: bridge ``` ## Next Steps - [ASGI Server Example](asgi-server) - Web application integration - [Workflow Example](workflow-example) - Multi-step orchestration - [Error Handling Patterns](error-handling) - Comprehensive error handling --- ## Functions _Source: https://agnt5.com/sdk/python/functions_ > Handler decorators and function execution in the AGNT5 Python SDK Functions are the core building blocks of AGNT5 applications. Use the `@function` decorator to register Python callables as invokable components that can be discovered and executed by the platform. ## Basic Usage ### Simple Function ```python from agnt5 import function @function() def greet_user(name: str) -> str: """Greet a user by name.""" return f"Hello, {name}!" 
``` ### Named Function Override the registered name: ```python @function("math.add") def add_numbers(a: int, b: int) -> int: """Add two numbers together.""" return a + b ``` ### Function with Context Access execution metadata through the context parameter: ```python from agnt5 import function from agnt5.components import ExecutionContext @function() def context_aware(ctx: ExecutionContext, data: dict) -> dict: """Process data with execution context.""" return { "invocation_id": ctx.invocation_id, "service_name": ctx.metadata.get("service_name"), "processed_data": data, "component_type": ctx.component_type.value } ``` ## Decorator Parameters ### `function(name=None)` | Parameter | Type | Description | |-----------|------|-------------| | `name` | `str \| None` | Override the registered function name. Defaults to the original function name. | ```python # Uses function name "process_data" @function() def process_data(data: dict) -> dict: return data # Uses custom name "data_processor" @function("data_processor") def process_data(data: dict) -> dict: return data ``` ## Handler Signatures AGNT5 supports flexible function signatures to accommodate different use cases. 
### Without Context For simple stateless functions: ```python @function() def calculate_tax(amount: float, rate: float) -> float: return amount * rate @function() def format_message(template: str, **kwargs) -> str: return template.format(**kwargs) ``` ### With Context When you need access to invocation metadata: ```python @function() def audit_handler(ctx: ExecutionContext, action: str, data: dict) -> dict: """Handler that logs audit information.""" import logging logger = logging.getLogger(__name__) logger.info(f"Audit: {action} from {ctx.invocation_id}") return { "action": action, "invocation_id": ctx.invocation_id, "data": data, "timestamp": time.time() } ``` ## Async Functions AGNT5 supports both synchronous and asynchronous functions: ### Async Handler ```python import asyncio from agnt5 import function @function() async def async_processor(data: dict) -> dict: """Async processing with I/O operations.""" # Simulate async I/O await asyncio.sleep(0.1) # Async HTTP request example async with httpx.AsyncClient() as client: response = await client.post( "https://api.example.com/process", json=data ) external_result = response.json() return { "original": data, "external": external_result, "processed_at": datetime.utcnow().isoformat() } ``` ### Async with Context ```python @function() async def async_context_handler(ctx: ExecutionContext, query: str) -> dict: """Async handler with context access.""" # Use context for correlation correlation_id = ctx.metadata.get("correlation_id", ctx.invocation_id) # Async database query result = await database.execute( "SELECT * FROM items WHERE name LIKE %s", f"%{query}%" ) return { "correlation_id": correlation_id, "query": query, "results": [dict(row) for row in result] } ``` ## Streaming Functions For functions that need to return multiple responses over time: ### Basic Streaming ```python from agnt5 import function @function(streaming=True) async def stream_data(count: int): """Stream multiple data chunks.""" for i in 
range(count): yield { "chunk": i, "data": f"Data chunk {i}", "timestamp": time.time() } await asyncio.sleep(0.1) # Simulate processing time ``` ### Streaming with Context ```python @function(streaming=True) async def stream_with_context(ctx: ExecutionContext, query: str): """Stream search results progressively.""" search_id = ctx.invocation_id # Stream results as they're found async for result in search_engine.stream_search(query): yield { "search_id": search_id, "result": result, "timestamp": time.time() } ``` ## Function Metadata The SDK automatically captures and provides metadata about registered functions: ### Inspecting Functions ```python from agnt5.decorators import get_registered_functions, get_function_metadata # Get all registered functions functions = get_registered_functions() print(f"Registered functions: {list(functions.keys())}") # Get metadata for a specific function @function() def sample_function(name: str, age: int = 25) -> dict: return {"name": name, "age": age} metadata = get_function_metadata(sample_function) print(metadata) ``` **Output:** ```python { "name": "sample_function", "type": "function", "parameters": [ {"name": "name", "type": "str", "required": True}, {"name": "age", "type": "int", "required": False, "default": 25} ], "return_type": "dict" } ``` ### Runtime Annotations The decorator adds runtime annotations to functions: ```python @function() def annotated_function(data: str) -> str: return data.upper() # Check annotations print(annotated_function._agnt5_handler_name) # "annotated_function" print(annotated_function._agnt5_is_function) # True ``` ## Error Handling ### Basic Error Handling ```python @function() def safe_divider(a: float, b: float) -> dict: """Safely divide two numbers.""" try: if b == 0: return {"error": "Division by zero", "result": None} result = a / b return {"result": result, "error": None} except Exception as e: return {"error": str(e), "result": None} ``` ### Context-Aware Error Handling ```python import 
logging
from agnt5 import function
from agnt5.components import ExecutionContext

@function()
def robust_handler(ctx: ExecutionContext, data: dict) -> dict:
    """Handler with comprehensive error handling."""
    logger = logging.getLogger(__name__)

    try:
        # Log the invocation
        logger.info(f"Processing invocation {ctx.invocation_id}")

        # Validate input
        if not isinstance(data, dict):
            raise ValueError("Input must be a dictionary")

        required_fields = ["id", "name"]
        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")

        # Process data
        result = {
            "processed": True,
            "id": data["id"],
            "name": data["name"].upper(),
            "invocation_id": ctx.invocation_id
        }

        logger.info(f"Successfully processed {data['id']}")
        return result

    except ValueError as e:
        logger.warning(f"Validation error in {ctx.invocation_id}: {e}")
        return {"error": f"Validation error: {e}", "result": None}
    except Exception as e:
        logger.error(f"Unexpected error in {ctx.invocation_id}: {e}")
        return {"error": "Internal error", "result": None}
```

## Type Annotations

Use Python type hints for better documentation and validation:

### Basic Types

```python
from typing import Any, Dict, List, Optional, Union

@function()
def typed_handler(
    name: str,
    age: int,
    tags: List[str],
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Union[str, int, List[str]]]:
    """Handler with comprehensive type annotations."""
    return {
        "name": name,
        "age": age,
        "tags": tags,
        "has_metadata": metadata is not None
    }
```

### Pydantic Models

For complex data validation:

```python
from pydantic import BaseModel, Field
from typing import Optional
from agnt5 import function

class UserRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    age: Optional[int] = Field(None, ge=0, le=150)

class UserResponse(BaseModel):
    id: str
    name: str
    email: str
    age: Optional[int]
    created_at: str

@function()
def create_user(request: UserRequest) -> UserResponse: """Create a user with validation.""" user_id = generate_user_id() return UserResponse( id=user_id, name=request.name, email=request.email, age=request.age, created_at=datetime.utcnow().isoformat() ) ``` ## Testing Functions ### Direct Testing Test functions directly without the full platform: ```python import pytest from agnt5.decorators import execute_component from agnt5.components import ExecutionContext, ComponentType def test_greet_function(): # Test with execute_component result = execute_component( "greet_user", b'{"name": "Alice"}', context=None ) # Result is JSON bytes import json parsed = json.loads(result.decode()) assert parsed == "Hello, Alice!" def test_context_function(): # Create mock context ctx = ExecutionContext( invocation_id="test-123", component_type=ComponentType.FUNCTION ) # Test directly result = context_aware(ctx, {"test": "data"}) assert result["invocation_id"] == "test-123" ``` ### Async Testing ```python import pytest import asyncio @pytest.mark.asyncio async def test_async_function(): result = await async_processor({"test": "data"}) assert "original" in result assert "processed_at" in result ``` ### Mock Context Testing ```python from unittest.mock import Mock def test_with_mock_context(): # Create mock context mock_ctx = Mock(spec=ExecutionContext) mock_ctx.invocation_id = "mock-123" mock_ctx.component_type = ComponentType.FUNCTION mock_ctx.metadata = {"service_name": "test-service"} # Test function result = context_aware(mock_ctx, {"test": "data"}) assert result["invocation_id"] == "mock-123" assert result["service_name"] == "test-service" ``` ## Function Registry ### Registry Management ```python from agnt5.decorators import ( get_registered_functions, clear_registry, get_function_metadata ) # Get all registered functions functions = get_registered_functions() # Clear registry (useful for testing) clear_registry() # Re-register functions @function() def new_function(data: str) 
-> str: return data.upper() # Inspect metadata metadata = get_function_metadata(new_function) ``` ### Custom Registration For advanced use cases, register functions manually: ```python from agnt5.decorators import register_function def my_handler(data: str) -> str: return data.lower() # Manual registration register_function("custom_handler", my_handler) ``` ## Best Practices ### Function Design 1. **Keep functions focused** - Each function should have a single responsibility 2. **Use type hints** - Improve documentation and enable validation 3. **Handle errors gracefully** - Return error information rather than raising exceptions 4. **Log appropriately** - Use structured logging for debugging and monitoring ### Performance 1. **Minimize imports** - Import only what you need 2. **Use async for I/O** - Async functions for database queries and API calls 3. **Cache expensive operations** - Use local caching for repeated computations 4. **Batch operations** - Process multiple items together when possible ### Testing 1. **Test functions directly** - Unit test without the platform 2. **Mock external dependencies** - Use mocks for databases, APIs, etc. 3. **Test error conditions** - Ensure error handling works correctly 4. **Use fixtures** - Share common test data and setup ## Next Steps - [Workflows](workflows) - Multi-step orchestration patterns - [Worker Runtime](worker) - Configure and deploy workers - [API Reference](api/decorators) - Complete decorator API reference - [Examples](examples/basic-worker) - Real-world function examples --- ## Getting Started _Source: https://agnt5.com/sdk/python/getting-started_ > Installation and first steps with the AGNT5 Python SDK Get up and running with the AGNT5 Python SDK in minutes. This guide covers installation, your first worker, and local development setup. 
## Installation ### System Requirements - Python 3.8 or higher - pip or uv package manager ### Install from PyPI ```bash pip install agnt5 ``` ### Development Installation For development or contributing to the SDK: ```bash git clone https://github.com/agnt5/agnt5 cd agnt5/sdk/sdk-python pip install -e . ``` ### Verify Installation ```python import agnt5 print(agnt5.__version__) ``` ## First Worker Create a simple worker with a greeting function: ```python title="worker.py" import asyncio from agnt5 import Worker, function @function() def greet(name: str) -> str: """Greet a user by name.""" return f"Hello, {name}!" @function("math_add") def add_numbers(a: int, b: int) -> int: """Add two numbers together.""" return a + b async def main(): worker = Worker(service_name="hello-service") await worker.run() if __name__ == "__main__": asyncio.run(main()) ``` ## Deploy to AGNT5 ### Install the CLI and Authenticate ```bash # Install the AGNT5 CLI brew install agnt5/tap/agnt5 # macOS # or curl -LsSf https://agnt5.com/cli.sh | bash # Linux # Authenticate agnt5 auth login # Deploy your worker agnt5 deploy ``` ### Run Your Worker Locally In a new terminal, run your worker: ```bash python worker.py ``` You should see output like: ``` INFO:agnt5.worker:Starting worker for service: hello-service INFO:agnt5.worker:Registered function: greet INFO:agnt5.worker:Registered function: math_add INFO:agnt5.worker:Worker running, waiting for tasks... ``` ## Test Your Functions ### Using HTTP API Test your functions using the Gateway HTTP API: ```bash # Test the greet function curl -X POST http://localhost:8080/call \ -H "Content-Type: application/json" \ -d '{ "serviceName": "hello-service", "handlerName": "greet", "inputData": "QWxpY2U=" }' ``` The `inputData` is base64-encoded JSON. For `"Alice"`, the base64 is `"QWxpY2U="`. 
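Rather than computing the payload by hand, you can generate the `inputData` value from the shell; a quick sketch using the standard `base64` tool (note the single quotes, so the JSON double quotes are part of the encoded bytes):

```shell
# Base64-encode a JSON string payload for the inputData field
printf '%s' '"Alice"' | base64

# An object payload works the same way
printf '%s' '{"a": 5, "b": 3}' | base64
```

The same pipeline works for any JSON body you want to send through the gateway.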
### Using Python Client

```python
import asyncio
import base64
import json

from agnt5 import Client

async def test_functions():
    client = Client("http://localhost:8080")

    # Test greet function
    name = "Alice"
    input_data = base64.b64encode(json.dumps(name).encode()).decode()

    result = await client.call(
        service_name="hello-service",
        handler_name="greet",
        input_data=input_data
    )
    print(f"Greeting result: {result}")

    # Test add function
    numbers = {"a": 5, "b": 3}
    input_data = base64.b64encode(json.dumps(numbers).encode()).decode()

    result = await client.call(
        service_name="hello-service",
        handler_name="math_add",
        input_data=input_data
    )
    print(f"Addition result: {result}")

asyncio.run(test_functions())
```

## ASGI Integration

For web applications, use the ASGI runtime:

```python title="asgi_app.py"
from agnt5 import Worker, function

@function()
def api_handler(request: dict) -> dict:
    return {
        "message": "Hello from AGNT5!",
        "received": request
    }

# Create ASGI application
app = Worker("web-service", runtime="asgi")
app.enable_cors()  # Enable CORS for browser access
```

Run with uvicorn:

```bash
pip install uvicorn
uvicorn asgi_app:app --reload --port 8000
```

Test the ASGI endpoints:

```bash
# Health check
curl http://localhost:8000/health

# List available functions
curl http://localhost:8000/functions

# Call a function
curl -X POST http://localhost:8000/invoke/api_handler \
  -H "Content-Type: application/json" \
  -d '{"test": "data"}'
```

## Configuration

### Environment Variables

Configure your worker using environment variables:

```bash
export AGNT5_COORDINATOR_ENDPOINT=http://localhost:9091
export AGNT5_SERVICE_NAME=my-service
export AGNT5_LOG_LEVEL=DEBUG

python worker.py
```

### Configuration in Code

```python
import logging

from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging

# Configure logging
logging.basicConfig(level=logging.INFO)
install_opentelemetry_logging()

# Create worker with custom configuration
worker = Worker(
    service_name="configured-service",
    service_version="1.2.0",
    coordinator_endpoint="http://localhost:9091",
    runtime="standalone"
)
```

## Error Handling

Handle errors gracefully in your functions:

```python
import logging

from agnt5 import function

logger = logging.getLogger(__name__)

@function()
def safe_divide(a: float, b: float) -> dict:
    try:
        if b == 0:
            return {"error": "Division by zero", "result": None}

        result = a / b
        logger.info(f"Division successful: {a} / {b} = {result}")
        return {"result": result, "error": None}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return {"error": str(e), "result": None}
```

## Development Tips

### Hot Reload

The worker does not reload code automatically; restart it when code changes (or run it under a file watcher such as `watchfiles`):

```python title="dev_worker.py"
import asyncio
import sys

from agnt5 import Worker, function

@function()
def greet(name: str) -> str:
    return f"Hello, {name}!"

async def main():
    worker = Worker(service_name="dev-service")
    await worker.run()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nWorker stopped")
        sys.exit(0)
```

### Debugging

Enable debug logging to see detailed execution information:

```python
import logging

logging.basicConfig(level=logging.DEBUG)

from agnt5.logging import install_opentelemetry_logging
install_opentelemetry_logging(level=logging.DEBUG)
```

### Testing Functions

Test your functions locally without the full platform:

```python
from agnt5.decorators import execute_component

# Test function directly
result = execute_component("greet", b'{"name": "Alice"}')
print(result)
```

## Next Steps

### Core Primitives

- [Functions](functions) - Stateless operations with retries
- [Entities](entity) - Stateful components with unique keys
- [Workflows](workflows) - Multi-step orchestration
- [Context API](context) - Full API reference

### Agent Development Kit

- [Agents](agent) - Autonomous LLM-driven systems
- [Tools](tool) - Extend agent capabilities
- [Sessions](session) - Conversation management
- [Memory](memory) - Long-term knowledge storage

### Configuration

- [Worker Runtime](worker) - Configure and deploy workers
- [Examples](examples/) - Real-world usage patterns

---

## Memory

_Source: https://agnt5.com/sdk/python/memory_

> Long-term knowledge storage with semantic search for agents

Memory is a long-term knowledge storage system that enables agents to remember facts, preferences, and context across conversations. Unlike Session state (short-term), Memory provides persistent, searchable knowledge that agents build upon over time.

## Key Characteristics

- **Long-Term Persistence** - Knowledge survives across sessions and conversations
- **Semantic Search** - Find relevant memories using natural language queries
- **Smart Ingestion** - Automatically extract important facts using LLMs
- **Multiple Backends** - InMemory (dev), Vector (semantic), Database (persistent)
- **Built on Entity** - Inherits durability and consistency
- **Cross-Session** - Shared knowledge accessible to all agents

## Basic Usage

### Creating Memory

**Development:**

```python
from agnt5 import Memory
from agnt5.memory import InMemoryService

# In-memory storage (no persistence)
memory = Memory(service=InMemoryService())
```

**Production (Vector):**

```python
from agnt5 import Memory
from agnt5.memory import VectorMemoryService

# Vector storage with semantic search
memory = Memory(service=VectorMemoryService(
    embedding_model="text-embedding-3-small",
    vector_store="qdrant"
))
```

**Production (Database):**

```python
from agnt5 import Memory
from agnt5.memory import DatabaseMemoryService

# Database storage for reliability
memory = Memory(service=DatabaseMemoryService(
    connection_string="postgresql://..."
)) ``` ### Storing and Retrieving Memories ```python # Store individual memories await memory.store( key="user_role", content="Senior Software Engineer specializing in distributed systems", type="user_info", confidence=0.95 ) await memory.store( key="project_context", content="Building a real-time analytics platform for financial data", type="project_info" ) # Retrieve specific memories memories = await memory.recall(["user_role", "project_context"]) for mem in memories: print(f"{mem.key}: {mem.content}") ``` ### Semantic Search ```python # Search using natural language results = await memory.search( query="What does the user know about databases?", limit=5 ) for result in results: print(f"Score: {result.score:.2f}") print(f"Content: {result.content}") print(f"Source: {result.metadata.get('source_session')}") ``` ### Integration with Agents ```python from agnt5 import Agent, Session, Memory, LanguageModel # Create memory memory = Memory(service=VectorMemoryService()) # Store long-term knowledge await memory.store("user_expertise", "PhD in Machine Learning, specializes in NLP") await memory.store("preferred_tools", "Prefers PyTorch over TensorFlow") # First conversation session1 = Session(id="conv-001", user_id="researcher-123") lm = LanguageModel() agent = Agent(name="assistant", model=lm, memory=memory, session=session1) await agent.run("Help me implement attention mechanisms") # Agent recalls user's ML expertise and PyTorch preference # Later conversation (different session) session2 = Session(id="conv-042", user_id="researcher-123") agent2 = Agent(name="assistant", model=lm, memory=memory, session=session2) await agent2.run("Review my transformer code") # Agent still remembers user's background and preferences ``` ## Smart Ingestion Automatically extract and store important information from conversations: ```python # Agent conversation session = Session(id="consultation-123", user_id="user-456") agent = Agent(name="advisor", memory=memory, session=session) await 
agent.run("I'm building a recommendation system for e-commerce") await agent.run("We have 10 million users and need sub-100ms latency") await agent.run("Our team is experienced with Python and Go") # Extract and store important facts memory_keys = await memory.ingest_from_session( session, strategy="smart" # Uses LLM to identify important facts ) # Memory now contains: # - "User building recommendation system for e-commerce" # - "System requirements: 10M users, <100ms latency" # - "Team expertise: Python, Go" # Future conversations automatically recall these facts ``` ## Ingestion Strategies Choose the right strategy for your use case: Smart Entities Summary ```python # LLM identifies important facts await memory.ingest_from_session(session, strategy="smart") ``` Best for: General conversations where important facts need to be identified ```python # Extract names, technologies, organizations await memory.ingest_from_session(session, strategy="entities") ``` Best for: Extracting structured information like people, companies, technologies ```python # Store conversation summary await memory.ingest_from_session(session, strategy="summary") ``` Best for: Creating concise summaries of long conversations ## Common Patterns ### User Profile Memory Build comprehensive user profiles over time: ```python class UserProfileMemory: def __init__(self, user_id: str, memory: Memory): self.user_id = user_id self.memory = memory self.prefix = f"user_{user_id}_" async def store_preference(self, category: str, value: str): """Store user preference.""" await self.memory.store( key=f"{self.prefix}pref_{category}", content=value, type="preference", user_id=self.user_id ) async def store_expertise(self, domain: str, level: str, details: str): """Store user expertise.""" await self.memory.store( key=f"{self.prefix}expertise_{domain}", content=f"{level} expertise in {domain}: {details}", type="expertise", user_id=self.user_id ) async def get_profile(self) -> Dict[str, Any]: """Retrieve 
complete user profile.""" results = await self.memory.search( query=f"user {self.user_id} profile preferences expertise", limit=50 ) profile = { "preferences": {}, "expertise": {}, "context": [] } for result in results: if result.metadata.get("type") == "preference": category = result.key.replace(f"{self.prefix}pref_", "") profile["preferences"][category] = result.content elif result.metadata.get("type") == "expertise": domain = result.key.replace(f"{self.prefix}expertise_", "") profile["expertise"][domain] = result.content return profile # Usage user_memory = UserProfileMemory("user-123", memory) await user_memory.store_preference("language", "Python") await user_memory.store_expertise("ml", "advanced", "10+ years in NLP") profile = await user_memory.get_profile() ``` ### Conversation Summarization Archive long sessions as summaries: ```python async def archive_session_to_memory( session: Session, memory: Memory, summary_threshold: int = 50 ): """Archive long sessions as summaries in memory.""" history = await session.history() if len(history) > summary_threshold: # Generate comprehensive summary from agnt5 import LanguageModel lm = LanguageModel() conversation = "\n".join([ f"{msg.role}: {msg.content}" for msg in history ]) summary = await lm.generate( prompt=f"""Summarize this conversation in 2-3 paragraphs. Focus on key decisions, important facts, and outcomes. 
Conversation:
{conversation}

Summary:""",
            max_tokens=300
        )

        # Store summary in memory
        await memory.store(
            key=f"session_summary_{session.id}",
            content=summary.text,
            type="session_summary",
            session_id=session.id,
            user_id=session.user_id,
            message_count=len(history)
        )

        # Optionally prune session to save space
        await session.prune(strategy="keep_summary")

# Usage
await archive_session_to_memory(session, memory, summary_threshold=100)
```

### Learning Agent

Agent that learns from every interaction:

```python
class LearningAgent:
    def __init__(self, name: str, model, tools, memory: Memory):
        self.agent = Agent(name=name, model=model, tools=tools, memory=memory)
        self.memory = memory

    async def run_and_learn(self, prompt: str, session: Session):
        """Execute task and learn from interaction."""
        # Run agent
        result = await self.agent.run(prompt, session=session)

        # Extract and store learnings
        memory_keys = await self.memory.ingest_from_session(
            session,
            strategy="smart"
        )

        # Store outcome for future reference
        await self.memory.store(
            key=f"interaction_{session.id}",
            content=f"Task: {prompt}\nOutcome: {result.output}",
            type="interaction_history",
            success=result.status == "completed"
        )

        return result

    async def recall_similar_tasks(self, prompt: str) -> List[Dict]:
        """Find similar past interactions."""
        return await self.memory.search(
            query=f"Similar to: {prompt}",
            limit=5
        )

# Usage
learning_agent = LearningAgent("assistant", lm, tools, memory)

# Agent learns from each interaction
result = await learning_agent.run_and_learn(
    "Debug this performance issue",
    session
)

# Later: Recall similar past work
similar = await learning_agent.recall_similar_tasks(
    "Another performance problem"
)
```

## Best Practices

### 1. Distinguish Memory from Session State

Use Memory for long-term, Session for short-term:

| Aspect | Session State | Memory |
| --- | --- | --- |
| Lifetime | Single conversation | Indefinite |
| Scope | Session-specific | Cross-session |
| Search | Direct key access | Semantic search |
| Purpose | Current context | Long-term knowledge |
| Example | Shopping cart | User preferences |

```python
# Session State - SHORT-TERM, conversation-specific
session.set_state("current_task", "analyzing code")
session.set_state("files_open", ["main.py", "test.py"])

# Memory - LONG-TERM, cross-conversation
await memory.store("user_expertise", "Expert in Python")
await memory.store("coding_style", "Prefers functional programming")
```

### 2. Add Metadata for Better Retrieval

Enrich memories with metadata:

```python
await memory.store(
    key="technical_decision_001",
    content="Chose PostgreSQL for transactional data, Redis for caching",
    type="decision",
    category="architecture",
    confidence=0.9,
    source_session="planning-session-789",
    timestamp=datetime.now(),
    decision_maker="tech-lead-123",
    rationale="Need ACID guarantees and high read performance"
)

# Later: Search by metadata
postgres_decisions = await memory.search(
    query="database decisions",
    filter={"type": "decision", "category": "architecture"}
)
```

### 3. Implement Memory Maintenance

Manage memory lifecycle:

```python
async def maintain_memory(memory: Memory, user_id: str):
    """Prune old or low-confidence memories."""
    # Remove outdated information
    await memory.forget([
        key for key in await memory.list_keys(user_id=user_id)
        if is_outdated(key)
    ])

    # Update confidence scores based on usage
    for key in await memory.list_keys(user_id=user_id):
        mem = await memory.recall([key])
        if mem[0].metadata.get("last_accessed"):
            days_since_access = calculate_days_since(
                mem[0].metadata["last_accessed"]
            )
            new_confidence = calculate_confidence_decay(
                mem[0].metadata.get("confidence", 1.0),
                days_since_access
            )
            await memory.update(key, confidence=new_confidence)
```

## Session State vs Memory

| Aspect | Session State | Memory |
| --- | --- | --- |
| Lifetime | Single conversation | Indefinite |
| Scope | Session-specific | Cross-session |
| Search | Direct key access | Semantic search |
| Purpose | Current context | Long-term knowledge |
| Storage | Entity state | Entity + Vector DB |
| Example | Shopping cart | User preferences |

**When to use Session State:**

- Current conversation context
- Temporary workflow state
- UI state and navigation

**When to use Memory:**

- User profile and preferences
- Historical interactions
- Domain knowledge
- Learned facts and insights

## Next Steps

- [Session](session) - Short-term conversation state
- [Agent](agent) - Agents use Memory for context
- [Entity](entity) - Underlying primitive for Memory
- [Context API](context) - Memory context operations

---

## Sessions

_Source: https://agnt5.com/sdk/python/session_

> Conversation containers with scoped state and multi-agent coordination

Sessions are conversation containers built on the Entity primitive. They manage multi-turn interactions between users and AI agents, providing structured state management, message history, and audit trails.
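Conceptually, a session is entity-style state: a message log plus scoped key-value data. A toy sketch of that shape (plain Python for illustration only, not the SDK's implementation; `ToySession` is a hypothetical class):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class ToySession:
    """Illustration: a session is a message log plus scoped key-value state."""
    id: str
    messages: List[Dict[str, Any]] = field(default_factory=list)
    state: Dict[str, Dict[str, Any]] = field(
        default_factory=lambda: {"session": {}, "user": {}, "app": {}, "temp": {}}
    )

    def send_message(self, message: Dict[str, Any]) -> None:
        # Append to the conversation log (the real SDK also records metadata)
        self.messages.append(message)

    def history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        # Return the last `limit` messages, or the full log
        return self.messages[-limit:] if limit else list(self.messages)

    def set_state(self, key: str, value: Any, scope: str = "session") -> None:
        # Each scope is an independent namespace with its own lifetime
        self.state[scope][key] = value

s = ToySession(id="conv-001")
s.send_message({"role": "user", "content": "hi"})
s.set_state("language", "English", scope="user")
```

Because the real Session is built on Entity, the message log and every scope get Entity's durability and consistency for free; the sections below show the actual API.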
## Key Characteristics - **Built on Entity** - Inherits durability and consistency from Entity - **Scoped State** - Organize state with session/user/app/temp scopes - **Message History** - Automatic conversation tracking with metadata - **Multi-Agent Ready** - Share context across multiple agents - **Audit Trail** - Complete history of interactions for compliance - **Flexible Retention** - Configurable data retention policies ## Basic Usage ### Creating a Session ```python from agnt5 import Session # Create session with user context session = Session( id="conv-2024-001", app_name="research_assistant", user_id="user-123", metadata={"project": "ai-safety-research"} ) ``` ### Managing Messages Send Messages Get History ```python # Append messages to conversation history await session.send_message({ "role": "user", "content": "What are the key challenges in AI alignment?" }) await session.send_message({ "role": "assistant", "content": "The main challenges include value learning and robustness." 
}) ``` ```python # Get last 10 messages messages = await session.history(limit=10) # Get all messages all_messages = await session.history() # Filter by role user_messages = [m for m in all_messages if m["role"] == "user"] ``` ### Scoped State Management Session state uses four scopes for different persistence levels: ```python # Session scope - conversation-specific (default) session.set_state("shopping_cart", ["item1", "item2"]) session.set_state("current_step", "checkout") # User scope - persists across all user sessions session.set_state("language", "English", scope="user") session.set_state("timezone", "America/Los_Angeles", scope="user") # App scope - application-wide global state session.set_state("api_version", "v2", scope="app") session.set_state("feature_flags", {"new_ui": True}, scope="app") # Temp scope - temporary invocation-specific session.set_state("processing_step", "validation", scope="temp") ``` ### Integration with Agents ```python from agnt5 import Agent, Session, LanguageModel # Create session session = Session( id="support-ticket-456", user_id="customer-789", metadata={"ticket_type": "billing"} ) # Create agent with session lm = LanguageModel() agent = Agent( name="support_agent", model=lm, instructions="You are a helpful customer support agent.", tools=[search_kb_tool, create_ticket_tool], session=session ) # Agent automatically uses session for context result = await agent.run("I need help with my recent charge") # Session maintains full conversation history history = await session.history() ``` ## Common Patterns ### Multi-Agent Coordination Share context across multiple specialized agents: ```python # Create shared session session = Session( id="research-workflow-001", user_id="researcher-123", metadata={"project": "quantum-computing-review"} ) # Set shared context session.set_state("research_topic", "quantum error correction") session.set_state("target_depth", "comprehensive") # Multiple specialized agents work together literature_agent 
= Agent( name="literature_reviewer", session=session, tools=[paper_search] ) code_agent = Agent( name="code_analyzer", session=session, tools=[github_search] ) synthesis_agent = Agent( name="synthesizer", session=session, tools=[document_tool] ) # Execute research pipeline papers = await literature_agent.run("Find recent papers") implementations = await code_agent.run("Find implementations") report = await synthesis_agent.run("Synthesize findings") # All agents see shared context and each other's work full_history = await session.history() ``` ### Agent Handoff Pattern Seamlessly transfer conversations between specialized agents: ```python session = Session(id="customer-inquiry-789", user_id="customer-456") # Coordinator routes to appropriate specialist coordinator = Agent( name="router", session=session, tools=[classification_tool] ) routing = await coordinator.run("How do I upgrade my subscription?") if routing.category == "billing": # Billing agent gets full conversation context billing_agent = Agent( name="billing_specialist", session=session, tools=[billing_tools] ) result = await billing_agent.run("Continue from coordinator's analysis") # billing_agent sees all previous messages elif routing.category == "technical": tech_agent = Agent( name="tech_support", session=session, tools=[tech_tools] ) result = await tech_agent.run("Handle technical inquiry") ``` ### Session State vs Memory Comparison Example | Aspect | Session State | Memory | | --- | --- | --- | | Scope | Conversation-specific | Cross-conversation | | Lifetime | Cleared after session | Persists indefinitely | | Use Case | Current context | Long-term knowledge | | Example | Current task, cart items | User preferences, history | ```python from agnt5 import Session, Memory session = Session(id="consultation-123", user_id="user-456") memory = Memory(service=VectorMemoryService()) # Session State - SHORT-TERM, conversation-specific session.set_state("current_diagnosis", "initial assessment") 
session.set_state("symptoms_discussed", ["headache", "fatigue"]) # Memory - LONG-TERM, cross-conversation knowledge await memory.store("patient_history", "Chronic migraines, diagnosed 2020") await memory.store("medication_allergies", "Penicillin, Sulfa drugs") # Session state clears after conversation # Memory persists indefinitely across all sessions ``` ### Session Export and Audit ```python # Create session with audit metadata session = Session( id="compliance-audit-001", user_id="analyst-789", metadata={ "regulation": "SOC2", "audit_period": "Q4-2024", "auditor": "external-firm" }, retention={"ttl_days": 730} # 2 years ) # Conduct conversation with full tracking agent = Agent(name="data_analyst", session=session) await agent.run("Analyze user access patterns") # Export session for compliance review jsonl_export = await session.export(format="jsonl") # Each line contains: timestamp, role, message, metadata, tool_calls # Query specific events recent_events = await session.events(since="2024-01-01", limit=100) # Prune old messages while keeping metadata await session.prune(strategy="keep_last_50") ``` ### Long-Running Sessions with Pruning ```python # Create session with automatic pruning session = Session( id="long-conversation-456", user_id="user-123", metadata={"type": "ongoing_project"} ) # After many interactions, prune intelligently await session.prune(strategy="keep_important") # Uses LLM await session.prune(strategy="sliding_window", window_size=100) await session.prune(strategy="summarize_old", threshold=50) # Session remains performant even with thousands of messages ``` ## Configuration ### Session Parameters | Parameter | Type | Description | | --- | --- | --- | | `id` | `str` | Unique session identifier | | `app_name` | `str \| None` | Application name | | `user_id` | `str \| None` | User identifier | | `metadata` | `dict \| None` | Session metadata | | `retention` | `dict \| None` | Retention policy configuration | ### Retention Policies Configure 
data retention:

**Compliance:**

```python
# For compliance-sensitive applications
session = Session(
    id="healthcare-session",
    retention={
        "ttl_days": 2555,     # 7 years (HIPAA)
        "auto_prune": False,  # Manual control
        "immutable": True     # Prevent deletion
    }
)
```

**Performance:**

```python
# For performance-sensitive applications
session = Session(
    id="chat-session",
    retention={
        "ttl_days": 30,       # 30-day retention
        "auto_prune": True,   # Automatic cleanup
        "prune_strategy": "sliding_window",
        "max_messages": 1000
    }
)
```

## Best Practices

### 1. Use Appropriate State Scopes

Match state scope to persistence requirements:

```python
# ✓ Session scope - conversation-specific
session.set_state("current_page", 3)
session.set_state("draft_document", content)

# ✓ User scope - user preferences
session.set_state("theme", "dark", scope="user")
session.set_state("notification_preference", "email", scope="user")

# ✓ App scope - global configuration
session.set_state("rate_limit", 1000, scope="app")
session.set_state("feature_flags", flags, scope="app")

# ✓ Temp scope - transient data
session.set_state("validation_step", "in_progress", scope="temp")
```

### 2. Design for Multi-Agent Coordination

Structure session state for agent collaboration:

```python
# Good - Clear coordination structure
session.set_state("workflow_stage", "research")
session.set_state("agent_outputs", {
    "researcher": {"status": "completed", "findings": [...]},
    "analyzer": {"status": "in_progress"},
    "writer": {"status": "pending"}
})

# Agents can check dependencies
current_stage = session.get_state("workflow_stage")
researcher_output = session.get_state("agent_outputs")["researcher"]
```

### 3. Implement Retention Strategies

Manage session lifecycle appropriately:

```python
# For regulated industries
session = Session(
    id="medical-consultation",
    retention={
        "ttl_days": 2555,  # Legal requirement
        "immutable": True
    }
)

# For ephemeral conversations
session = Session(
    id="temp-chat",
    retention={
        "ttl_days": 1,     # Delete after 1 day
        "auto_prune": True
    }
)
```

## Entity vs Session

| Aspect | Entity | Session |
| --- | --- | --- |
| Purpose | General stateful primitive | Conversation-specific |
| State Structure | Flexible key-value | Opinionated message + state |
| API | Low-level (get/set/delete) | High-level (send_message/history) |
| Scoping | Manual | Built-in (session/user/app/temp) |
| Audit | Manual event tracking | Automatic conversation log |
| Use Case | Custom stateful components | AI agent conversations |

**When to use Entity:**

- Building custom stateful patterns
- Need complete control over state structure
- Non-conversation workloads

**When to use Session:**

- AI agent conversations
- Multi-agent coordination needed
- Audit trails required
- Standard conversation patterns

## Next Steps

- [Entity](entity) - Underlying primitive for Session
- [Agent](agent) - Agents use Sessions for context
- [Memory](memory) - Long-term storage vs Session state
- [Context API](context) - Session context operations

---

## Tools

_Source: https://agnt5.com/sdk/python/tool_

> Callable capabilities that extend agent abilities with automatic schema extraction

Tools are callable capabilities that extend what agents can do. Tools provide structured interfaces to functions, APIs, services, and other agents, with automatic schema extraction from Python code.
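The idea behind schema extraction is that a function's signature already contains most of a tool schema. A simplified sketch of the mechanism using the standard `inspect` and `typing` modules (illustrative only; `extract_schema` is a hypothetical helper, not the SDK's actual extractor):

```python
import inspect
from typing import get_type_hints

# Map Python annotations to JSON-schema type names
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}

def extract_schema(fn) -> dict:
    """Build a minimal JSON-schema-like description from a function signature."""
    sig = inspect.signature(fn)
    hints = get_type_hints(fn)
    properties, required = {}, []
    for name, param in sig.parameters.items():
        prop = {"type": _JSON_TYPES.get(hints.get(name), "object")}
        if param.default is inspect.Parameter.empty:
            required.append(name)       # no default -> required parameter
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {
        "name": fn.__name__,
        "description": (inspect.getdoc(fn) or "").split("\n")[0],
        "input_schema": {"type": "object", "properties": properties, "required": required},
    }

def search_web(query: str, max_results: int = 10) -> list:
    """Search the web for information."""
    ...

schema = extract_schema(search_web)
```

The real `@tool(auto_schema=True)` decorator goes further, parsing docstring `Args:` sections into per-parameter descriptions, but the required/default split above is the core of how the schemas shown below come to be.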
## Key Characteristics - **Automatic Schema** - Extract input/output schemas from docstrings and type hints - **Multiple Types** - Function, Hosted, MCP, OpenAPI, and Agent tools - **Built on Function** - Inherits durability and retry logic - **Confirmation Policies** - Optional user approval for dangerous operations - **Rich Metadata** - Descriptions, examples, and parameter constraints ## Basic Usage ### Function Tools with Auto-Schema The simplest way to create tools is with the `@tool()` decorator: ```python from agnt5 import tool @tool(auto_schema=True) def search_web(query: str, max_results: int = 10) -> List[Dict[str, str]]: """Search the web for information. Args: query: The search query string max_results: Maximum number of results to return Returns: List of search results with title, url, and snippet """ # Implementation return search_results ``` **Schema automatically extracted:** ```json { "name": "search_web", "description": "Search the web for information.", "input_schema": { "type": "object", "properties": { "query": {"type": "string", "description": "The search query string"}, "max_results": {"type": "integer", "default": 10} }, "required": ["query"] } } ``` ### Using Tools with Agents ```python from agnt5 import Agent, tool, LanguageModel @tool(auto_schema=True) def calculate_area(length: float, width: float) -> float: """Calculate the area of a rectangle. Args: length: Length in meters width: Width in meters Returns: Area in square meters """ return length * width lm = LanguageModel() agent = Agent( name="math_assistant", model=lm, tools=[calculate_area], instructions="Help users with geometry calculations." 
) result = await agent.run("What's the area of a 5m by 3m room?") # Agent automatically calls calculate_area(5.0, 3.0) ``` ## Tool Types ### Function Tools Direct Python function execution: Simple Complex ```python @tool(auto_schema=True) def get_weather(city: str) -> Dict[str, Any]: """Get current weather for a city.""" # API call return weather_data ``` ```python from typing import List, Optional @tool(auto_schema=True) def search_documentation( query: str, language: str = "python", max_results: int = 5 ) -> List[Dict[str, str]]: """Search official language documentation. Use this tool when you need specific functions, classes, or usage examples from official documentation. Args: query: Function name, class, or concept to search for language: Programming language (python, javascript, go, rust) max_results: Maximum number of results to return Returns: List of documentation sections with title, url, and examples Examples: >>> search_documentation("asyncio.gather", "python") [{"title": "asyncio.gather", "url": "...", "example": "..."}] """ # Implementation return search_results ``` ### Hosted Tools Tools deployed as durable AGNT5 workers: ```python from agnt5 import worker from agnt5.tools import HostedTool # Define worker function @worker.handler def analyze_data(data: Dict) -> Dict: """Worker function for complex data analysis.""" # Heavy computation here return analysis_results # Create hosted tool analysis_tool = HostedTool( name="analyze_data", description="Perform complex data analysis", endpoint="agnt5://data-analysis-service/analyze_data" ) # Use with agent agent = Agent(name="analyst", tools=[analysis_tool]) ``` ### MCP Tools Integrate with Model Context Protocol servers: ```python from agnt5.tools import MCPTool # Connect to MCP server filesystem_tool = MCPTool( name="filesystem", mcp_server_url="http://localhost:3000/mcp", capabilities=["read_file", "write_file", "list_directory"] ) agent = Agent(name="file_assistant", tools=[filesystem_tool]) ``` ### 
OpenAPI Tools Generate tools from OpenAPI specifications: ```python from agnt5.tools import OpenAPITool # Create tools from OpenAPI spec github_tools = OpenAPITool.from_spec( spec_url="https://api.github.com/openapi.json", operations=["get_repo", "list_issues", "create_issue"] ) agent = Agent(name="github_bot", tools=github_tools) ``` ## Tool Configuration ### Manual Schema Definition For more control, define schemas explicitly: ```python from agnt5 import Tool search_tool = Tool( name="search", description="Search for information", input_schema={ "type": "object", "properties": { "query": {"type": "string", "minLength": 1}, "filters": {"type": "object"} }, "required": ["query"] }, handler=search_function ) ``` ### Confirmation for Dangerous Operations Require user approval for destructive actions: ```python @tool(auto_schema=True, confirmation=True) def delete_database(database_name: str) -> Dict[str, str]: """Delete a database permanently. Args: database_name: Name of the database to delete Returns: Status of deletion operation Warning: This operation is irreversible and will delete all data. 
""" # Requires human approval before execution pass # Agent proposes deletion but waits for approval agent = Agent(name="admin", tools=[delete_database]) result = await agent.run("Clean up the test database") # User receives confirmation prompt before tool executes ``` ## Common Patterns ### Tool Composition Combine multiple tools for complex capabilities: ```python @tool(auto_schema=True) def search_papers(query: str, year_from: int = 2020) -> List[Dict]: """Search academic papers.""" pass @tool(auto_schema=True) def download_pdf(url: str) -> bytes: """Download PDF document.""" pass @tool(auto_schema=True) def extract_text(pdf_data: bytes) -> str: """Extract text from PDF.""" pass # Agent orchestrates multiple tools research_agent = Agent( name="researcher", tools=[search_papers, download_pdf, extract_text], instructions="Search papers, download them, and extract key findings." ) result = await research_agent.run("Survey recent work on transformers") # Agent chains: search_papers → download_pdf → extract_text ``` ### Tool Error Handling Tools with robust error handling: ```python @tool(auto_schema=True) def fetch_stock_price(symbol: str) -> Dict[str, Any]: """Fetch current stock price. 
Args: symbol: Stock ticker symbol (e.g., 'AAPL', 'GOOGL') Returns: Stock price data Raises: ValueError: If symbol is invalid ConnectionError: If market data service is unavailable """ try: price_data = market_api.get_price(symbol) return { "symbol": symbol, "price": price_data.current, "change": price_data.change } except InvalidSymbolError: raise ValueError(f"Invalid stock symbol: {symbol}") except MarketAPIError as e: raise ConnectionError(f"Market data unavailable: {e}") # Agent handles tool errors gracefully agent = Agent(name="stock_advisor", tools=[fetch_stock_price]) ``` ### Dynamic Tool Registration Register tools at runtime based on context: ```python # Base toolset base_tools = [search_tool, calculate_tool] # Add specialized tools based on user role if user.role == "admin": admin_tools = [delete_user_tool, modify_permissions_tool] all_tools = base_tools + admin_tools else: all_tools = base_tools agent = Agent( name="assistant", tools=all_tools, instructions=f"You are assisting a {user.role}." ) ``` ### Tool with Context Access Tools can access execution context for advanced operations: ```python from agnt5 import tool, Context @tool(auto_schema=True) async def store_memory(ctx: Context, key: str, value: str) -> Dict[str, str]: """Store information in long-term memory. Args: ctx: Execution context (automatically provided) key: Memory key value: Content to store Returns: Confirmation of storage """ # Access context for durable storage await ctx.memory.set(key, value) return { "status": "stored", "key": key, "timestamp": ctx.now() } # Context is automatically injected when tool is called agent = Agent(name="memory_agent", tools=[store_memory]) ``` ## Best Practices ### 1. Write Clear Tool Descriptions Good descriptions help agents use tools correctly: ✓ Good ✗ Avoid ```python @tool(auto_schema=True) def search_documentation(query: str, language: str = "python") -> List[Dict]: """Search official language documentation for code examples and API references. 
Use this tool when you need to find specific functions, classes, or usage examples from official documentation. Returns relevant documentation sections with code examples. Args: query: Specific function name, class, or concept to search for language: Programming language (python, javascript, go, rust) Returns: List of documentation sections with title, url, and code examples """ pass ``` ```python @tool(auto_schema=True) def search(q: str) -> List: """Search for stuff.""" # Too vague pass ``` ### 2. Use Type Hints and Docstrings Enable automatic schema extraction: ```python from typing import List, Dict, Optional @tool(auto_schema=True) def analyze_sentiment( text: str, language: str = "en", return_scores: bool = False ) -> Dict[str, Any]: """Analyze sentiment of text. Args: text: Text to analyze (minimum 10 characters) language: ISO language code (en, es, fr, de) return_scores: Include detailed confidence scores Returns: Sentiment analysis with label (positive/negative/neutral) and optional confidence scores """ # Type hints + docstring = complete schema pass ``` ### 3. Implement Confirmation for Dangerous Operations Protect users from destructive actions: ```python # Dangerous operations should require confirmation @tool(auto_schema=True, confirmation=True) def execute_code(code: str, language: str = "python") -> Dict[str, str]: """Execute arbitrary code in a sandboxed environment. Warning: Code execution can be dangerous. Requires explicit user approval. """ pass @tool(auto_schema=True, confirmation=True) def send_email_blast(recipients: List[str], subject: str, body: str) -> Dict: """Send email to multiple recipients. Warning: Bulk email requires confirmation to prevent spam. 
""" pass ``` ## Function vs Tool | Aspect | Function | Tool | | --- | --- | --- | | Purpose | General computation | Agent capability | | Schema | Optional | Required (auto-generated) | | Discovery | Manual invocation | Agent-driven selection | | Metadata | Basic | Rich (description, examples) | | Use Case | Backend logic | Agent actions | **When to use Function:** - Backend processing - Internal system operations - Not exposed to agents **When to use Tool:** - Agent capabilities - External system integration - User-facing operations ## Next Steps - [Functions](functions) - Underlying primitive for tools - [Agent](agent) - Agents use tools for actions - [Context API](context) - Tool context operations - [Worker](worker) - Hosted tool deployment --- ## Worker Runtime _Source: https://agnt5.com/sdk/python/worker_ > Configure and deploy Python workers for AGNT5 The `Worker` class is the high-level runtime that integrates with the AGNT5 platform, automatically registers decorated components, and handles execution coordination. ## Worker Configuration ### Basic Worker ```python import asyncio from agnt5 import Worker, function @function() def hello(name: str) -> str: return f"Hello, {name}!" 
async def main(): worker = Worker(service_name="hello-service") await worker.run() if __name__ == "__main__": asyncio.run(main()) ``` ### Configuration Parameters ```python worker = Worker( service_name="my-service", # Required: Service identifier service_version="1.2.0", # Version string (default: "1.0.0") coordinator_endpoint="http://localhost:9091", # Worker coordinator URL runtime="standalone" # Runtime mode: "standalone" or "asgi" ) ``` | Parameter | Type | Description | Default | |-----------|------|-------------|---------| | `service_name` | `str` | Service identifier for registration | **Required** | | `service_version` | `str` | Version string for this service | `"1.0.0"` | | `coordinator_endpoint` | `str` | Worker coordinator URL | `"http://localhost:9091"` | | `runtime` | `str` | Runtime adapter: `"standalone"` or `"asgi"` | `"standalone"` | ## Runtime Modes ### Standalone Runtime For background workers, batch processing, and daemon processes: ```python import asyncio from agnt5 import Worker, function @function() def background_task(data: dict) -> dict: # Process data in background return {"processed": True, "result": data} async def main(): # Standalone worker blocks until stopped worker = Worker( service_name="background-processor", runtime="standalone" ) await worker.run() if __name__ == "__main__": asyncio.run(main()) ``` **Characteristics:** - Blocks until manually stopped (Ctrl+C) - Connects to worker coordinator - Ideal for background processing - Built-in signal handling - OpenTelemetry integration ### ASGI Runtime For web applications and HTTP endpoints: ```python from agnt5 import Worker, function @function() def web_handler(request: dict) -> dict: return {"message": "Hello from AGNT5!", "data": request} # Create ASGI application app = Worker( service_name="web-service", runtime="asgi" ) # Enable CORS for browser access app.enable_cors() ``` Run with any ASGI server: ```bash # Install ASGI server pip install uvicorn # Run the application 
uvicorn main:app --reload --port 8000 ``` **ASGI Endpoints:** | Endpoint | Method | Description | |----------|--------|-------------| | `/health` | GET | Health check endpoint | | `/functions` | GET | List registered functions | | `/invoke/{handler}` | POST | Invoke specific function | ## Environment Configuration ### Environment Variables Configure workers using environment variables: ```bash # Service configuration export AGNT5_SERVICE_NAME=my-service export AGNT5_SERVICE_VERSION=2.0.0 export AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com # Logging configuration export AGNT5_LOG_LEVEL=INFO export AGNT5_LOG_FORMAT=json # Runtime configuration export AGNT5_RUNTIME=standalone export AGNT5_DISABLE_TELEMETRY=false ``` ### Configuration Priority Configuration sources in order of precedence: 1. **Constructor parameters** - Highest priority 2. **Environment variables** - Medium priority 3. **Default values** - Lowest priority ```python # This worker uses constructor values over environment worker = Worker( service_name="explicit-service", # Overrides AGNT5_SERVICE_NAME coordinator_endpoint="http://localhost:9091" ) ``` ## Worker Lifecycle ### Initialization ```python async def main(): worker = Worker("my-service") # Worker validates Rust extension availability # Creates runtime adapter (standalone or ASGI) # Installs OpenTelemetry logging # Registers all decorated functions and workflows await worker.run() # Starts the worker loop ``` ### Registration Process 1. **Function Discovery**: Scans for `@function` decorated callables 2. **Workflow Discovery**: Scans for `@workflow` decorated factories 3. **Component Registration**: Sends metadata to coordinator 4. 
**Service Announcement**: Service becomes available for invocations ### Graceful Shutdown ```python import signal import asyncio from agnt5 import Worker class GracefulWorker: def __init__(self): self.worker = Worker("graceful-service") self.shutdown_requested = False async def run(self): # Register signal handlers signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) try: await self.worker.run() except KeyboardInterrupt: print("Shutdown requested via keyboard interrupt") finally: await self._cleanup() def _signal_handler(self, signum, frame): print(f"Received signal {signum}, initiating graceful shutdown...") self.shutdown_requested = True async def _cleanup(self): print("Cleaning up resources...") # Perform cleanup tasks # Close database connections # Finish in-flight requests # Remove OpenTelemetry handlers print("Cleanup complete") async def main(): graceful_worker = GracefulWorker() await graceful_worker.run() if __name__ == "__main__": asyncio.run(main()) ``` ## ASGI Integration ### Basic ASGI App ```python from agnt5 import Worker, function @function() def api_endpoint(data: dict) -> dict: return {"status": "success", "received": data} # Create ASGI app app = Worker("api-service", runtime="asgi") ``` ### CORS Configuration Enable CORS for browser access: ```python # Enable CORS with defaults (allows all origins) app.enable_cors() # Enable CORS with specific origins app.enable_cors(origins=["https://myapp.com", "https://localhost:3000"]) # Disable CORS app.disable_cors() ``` ### Custom Middleware Add ASGI middleware: ```python from starlette.middleware.cors import CORSMiddleware from starlette.middleware.gzip import GZipMiddleware app = Worker("api-service", runtime="asgi") # Add middleware (if using Starlette/FastAPI patterns) # Note: This is conceptual - actual middleware integration depends on ASGI runtime implementation ``` ### Error Handling ASGI runtime provides consistent error responses: ```python # 
Function that raises an exception @function() def failing_function(data: dict) -> dict: raise ValueError("Something went wrong") # ASGI runtime catches and formats the error: # { # "error": "Function failing_function failed: Something went wrong", # "status": 500 # } ``` ## Worker Methods ### Runtime Control ```python worker = Worker("my-service") # Check if worker is running if worker.is_running(): print("Worker is active") # For ASGI workers only - get ASGI callable if worker.runtime == "asgi": asgi_app = worker.__call__ # ASGI callable interface ``` ### Component Registration ```python # Manual component registration (usually automatic) worker._register_components() # Internal message handling (not part of public API) # worker._handle_message(request) ``` ## Observability ### OpenTelemetry Integration Workers automatically install OpenTelemetry logging: ```python import logging from agnt5 import Worker from agnt5.logging import install_opentelemetry_logging, remove_opentelemetry_logging # Custom logging setup logger = logging.getLogger("my-service") # Install telemetry with custom formatter install_opentelemetry_logging( logger=logger, level=logging.DEBUG, format_string="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) worker = Worker("my-service") # Telemetry is automatically cleaned up on worker shutdown ``` ### Structured Logging ```python import logging from agnt5 import Worker, function logger = logging.getLogger(__name__) @function() def logged_function(ctx, data: dict) -> dict: # Structured logging with context logger.info( "Processing function", extra={ "invocation_id": ctx.invocation_id, "service_name": ctx.metadata.get("service_name"), "data_size": len(str(data)) } ) result = {"processed": True} logger.info( "Function completed", extra={ "invocation_id": ctx.invocation_id, "success": True } ) return result ``` ## Development Patterns ### Hot Reload Development ```python import os import sys from agnt5 import Worker, function # Development 
configuration if os.getenv("ENVIRONMENT") == "development": import logging logging.basicConfig(level=logging.DEBUG) @function() def development_handler(data: dict) -> dict: return {"env": "development", "data": data} async def main(): worker = Worker( service_name="dev-service", coordinator_endpoint=os.getenv("COORDINATOR_URL", "http://localhost:9091") ) try: await worker.run() except KeyboardInterrupt: print("\nDevelopment worker stopped") sys.exit(0) if __name__ == "__main__": import asyncio asyncio.run(main()) ``` ### Testing Workers ```python import pytest from unittest.mock import AsyncMock, patch from agnt5 import Worker, function @function() def test_function(data: str) -> str: return data.upper() @pytest.fixture async def mock_worker(): with patch('agnt5.worker.PyWorker'): worker = Worker("test-service") yield worker @pytest.mark.asyncio async def test_worker_registration(mock_worker): # Test component registration mock_worker._register_components() # Verify functions are registered from agnt5.decorators import get_registered_functions functions = get_registered_functions() assert "test_function" in functions @pytest.mark.asyncio async def test_worker_asgi_mode(): app = Worker("test-service", runtime="asgi") assert callable(app) # ASGI callable interface ``` ## Production Deployment ### Container Deployment ```dockerfile FROM python:3.11-slim WORKDIR /app # Install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # Copy application code COPY . . 
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Run worker
CMD ["python", "worker.py"]
```

### Environment Configuration

```bash
# Production environment variables
AGNT5_SERVICE_NAME=production-service
AGNT5_SERVICE_VERSION=1.2.0
AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
AGNT5_LOG_LEVEL=INFO
AGNT5_LOG_FORMAT=json
```

### Health Monitoring

```python
import asyncio
import logging
import os
import time

from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def health_check() -> dict:
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "production-service",
        "timestamp": time.time()
    }

async def main():
    worker = Worker(
        service_name="production-service",
        service_version=os.getenv("SERVICE_VERSION", "1.0.0")
    )

    logger.info("Starting production worker")
    try:
        await worker.run()
    except Exception as e:
        logger.error(f"Worker failed: {e}")
        raise
    finally:
        logger.info("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
```

## Best Practices

### Service Design

1. **Service Naming** - Use consistent, descriptive service names
2. **Version Management** - Use semantic versioning for service versions
3. **Resource Management** - Clean up resources in shutdown handlers
4. **Error Handling** - Handle exceptions gracefully in workers
5. **Health Checks** - Implement health check functions for monitoring

### Performance

1. **Connection Pooling** - Reuse database and HTTP connections
2. **Async Operations** - Use async functions for I/O operations
3. **Resource Limits** - Configure appropriate memory and CPU limits
4. **Scaling** - Deploy multiple worker instances for high throughput
5. **Monitoring** - Track worker performance and error rates

### Security

1. **Input Validation** - Validate all function inputs
2. **Error Messages** - Don't expose sensitive information in errors
3. 
**Authentication** - Use proper authentication for coordinator connections 4. **Network Security** - Use secure connections (HTTPS/TLS) in production 5. **Secrets Management** - Use environment variables for sensitive configuration ## Next Steps ### Core Primitives - [Functions](functions) - Stateless operations with retries - [Entities](entity) - Stateful components - [Workflows](workflows) - Multi-step orchestration - [Context API](context) - Full API reference ### Agent Development Kit - [Agents](agent) - Autonomous LLM-driven systems - [Tools](tool) - Extend agent capabilities - [Sessions](session) - Conversation management - [Memory](memory) - Long-term knowledge storage ### Resources - [Examples](examples/basic-worker) - Worker deployment examples --- ## Workflows _Source: https://agnt5.com/sdk/python/workflows_ > Multi-step orchestration and durable execution patterns Workflows enable durable, multi-step orchestration with automatic recovery, state persistence, and complex dependency management. Built on the AGNT5 orchestration plane for exactly-once execution guarantees. 
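The dependency model is what drives scheduling: a step runs as soon as every step it depends on has completed, and steps with no unmet dependencies run concurrently. That resolution order can be illustrated with a standalone sketch using Python's standard `graphlib` — a conceptual illustration of the ordering only, not the AGNT5 API (the step names mirror the parallel-processing example shown later in this section):

```python
from graphlib import TopologicalSorter

# Step -> dependencies, as declared via dependencies=[...] on each step
deps = {
    "process_a": set(),
    "process_b": set(),
    "process_c": set(),
    "merge_results": {"process_a", "process_b", "process_c"},
}

ts = TopologicalSorter(deps)
ts.prepare()

waves = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # steps whose dependencies are all satisfied
    waves.append(ready)             # each wave could execute in parallel
    ts.done(*ready)

print(waves)  # [['process_a', 'process_b', 'process_c'], ['merge_results']]
```

The three independent steps surface together in the first wave; `merge_results` only becomes ready once all of them are marked done — the same ordering the orchestration plane derives from a step's declared dependencies.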
## Basic Workflow ### Simple Sequential Workflow ```python from agnt5 import workflow, task_step from agnt5.workflows import FlowDefinition @workflow() def data_pipeline() -> FlowDefinition: return FlowDefinition([ task_step( name="extract", service_name="etl-service", handler_name="extract_data" ), task_step( name="transform", service_name="etl-service", handler_name="transform_data", dependencies=["extract"] ), task_step( name="load", service_name="etl-service", handler_name="load_data", dependencies=["transform"] ) ]) ``` ### Parallel Execution Steps without dependencies execute in parallel: ```python @workflow() def parallel_processing() -> FlowDefinition: return FlowDefinition([ # These three steps run in parallel task_step( name="process_a", service_name="service", handler_name="process_type_a" ), task_step( name="process_b", service_name="service", handler_name="process_type_b" ), task_step( name="process_c", service_name="service", handler_name="process_type_c" ), # This step waits for all three to complete task_step( name="merge_results", service_name="service", handler_name="merge_results", dependencies=["process_a", "process_b", "process_c"] ) ]) ``` ## Step Types ### Task Steps Execute function handlers on services: ```python from agnt5.workflows import task_step # Basic task step step = task_step( name="unique_step_name", service_name="my-service", handler_name="my_handler" ) # Task with dependencies and input data step = task_step( name="dependent_step", service_name="my-service", handler_name="process_data", dependencies=["previous_step"], input_data={"config": "production", "batch_size": 100} ) # Task with object keys (Phase 2) step = task_step( name="object_step", service_name="object-service", handler_name="update_state", dependencies=["init_step"], object_keys=["user:123", "cart:456"] ) ``` ### Wait Signal Steps Pause execution until external signals: ```python from agnt5.workflows import wait_signal_step # Basic signal wait step = 
wait_signal_step( name="wait_for_approval", signal_name="approval_granted", dependencies=["review_step"] ) # Signal wait with timeout step = wait_signal_step( name="wait_with_timeout", signal_name="user_action", dependencies=["prompt_user"], timeout_ms=300000, # 5 minutes on_timeout="timeout_handler_step" ) ``` ### Wait Timer Steps Scheduled delays and cron-based execution: ```python from agnt5.workflows import wait_timer_step # Fixed delay step = wait_timer_step( name="delay_before_retry", timer_key="retry_delay", delay_ms=30000, # 30 seconds dependencies=["failed_step"] ) # Cron schedule step = wait_timer_step( name="nightly_batch", timer_key="nightly", cron_expr="0 2 * * *", # 2 AM daily dependencies=["prep_step"] ) # Timer with retries step = wait_timer_step( name="retry_with_backoff", timer_key="exponential_backoff", delay_ms=5000, max_retries=3, dependencies=["error_step"] ) ``` ## Workflow Examples ### ETL Pipeline ```python @workflow() def nightly_etl() -> FlowDefinition: return FlowDefinition([ # Start with data validation task_step( name="validate_sources", service_name="etl-service", handler_name="validate_data_sources" ), # Extract from multiple sources in parallel task_step( name="extract_database", service_name="etl-service", handler_name="extract_from_database", dependencies=["validate_sources"] ), task_step( name="extract_api", service_name="etl-service", handler_name="extract_from_api", dependencies=["validate_sources"] ), task_step( name="extract_files", service_name="etl-service", handler_name="extract_from_files", dependencies=["validate_sources"] ), # Wait for all extractions to complete task_step( name="merge_extracted_data", service_name="etl-service", handler_name="merge_data", dependencies=["extract_database", "extract_api", "extract_files"] ), # Transform data task_step( name="clean_data", service_name="etl-service", handler_name="clean_and_normalize", dependencies=["merge_extracted_data"] ), task_step( name="enrich_data", 
service_name="etl-service", handler_name="enrich_with_metadata", dependencies=["clean_data"] ), # Wait for maintenance window wait_timer_step( name="wait_for_maintenance_window", timer_key="maintenance", cron_expr="0 3 * * *", # 3 AM dependencies=["enrich_data"] ), # Load data task_step( name="load_to_warehouse", service_name="etl-service", handler_name="load_data_warehouse", dependencies=["wait_for_maintenance_window"] ), # Generate reports task_step( name="generate_reports", service_name="reporting-service", handler_name="generate_daily_reports", dependencies=["load_to_warehouse"] ) ]) ``` ### Approval Workflow ```python @workflow() def document_approval() -> FlowDefinition: return FlowDefinition([ # Submit document for review task_step( name="submit_document", service_name="doc-service", handler_name="submit_for_review", input_data={"priority": "normal"} ), # Notify reviewers task_step( name="notify_reviewers", service_name="notification-service", handler_name="send_review_notifications", dependencies=["submit_document"] ), # Wait for approval (with timeout) wait_signal_step( name="wait_for_approval", signal_name="document_approved", dependencies=["notify_reviewers"], timeout_ms=172800000, # 48 hours on_timeout="escalate_approval" ), # Escalation path task_step( name="escalate_approval", service_name="doc-service", handler_name="escalate_to_manager", # No dependencies - triggered by timeout ), # Wait for escalated approval wait_signal_step( name="wait_escalated_approval", signal_name="escalated_approval", dependencies=["escalate_approval"], timeout_ms=86400000 # 24 hours ), # Publish approved document task_step( name="publish_document", service_name="doc-service", handler_name="publish_approved_document", dependencies=["wait_for_approval", "wait_escalated_approval"] ) ]) ``` ## Workflow Registration ### Using the Decorator ```python from agnt5 import workflow from agnt5.workflows import get_registered_workflows @workflow() def my_workflow() -> FlowDefinition: 
return FlowDefinition([...]) @workflow("custom_name") def workflow_with_custom_name() -> FlowDefinition: return FlowDefinition([...]) # Inspect registered workflows workflows = get_registered_workflows() print(f"Registered workflows: {list(workflows.keys())}") ``` ### Manual Registration ```python from agnt5.workflows import register_workflow def create_workflow_definition() -> FlowDefinition: return FlowDefinition([ task_step("step1", service_name="service", handler_name="handler1"), task_step("step2", service_name="service", handler_name="handler2", dependencies=["step1"]) ]) # Manual registration register_workflow("manual_workflow", create_workflow_definition()) ``` ## Workflow Data Classes ### FlowDefinition Container for workflow steps with serialization: ```python from agnt5.workflows import FlowDefinition # Create definition flow = FlowDefinition([ task_step("step1", service_name="svc", handler_name="h1"), task_step("step2", service_name="svc", handler_name="h2", dependencies=["step1"]) ]) # Serialize to dictionary flow_dict = flow.to_dict() # Serialize to JSON string flow_json = flow.to_json() ``` ### WorkflowStep Individual step configuration: ```python from agnt5.workflows import WorkflowStep, StepType # Manual step creation (usually use helper functions instead) step = WorkflowStep( name="custom_step", step_type=StepType.TASK, service_name="my-service", handler_name="my_handler", dependencies=["previous_step"], input_data={"key": "value"} ) ``` ### Configuration Classes ```python from agnt5.workflows import SignalConfig, TimerConfig # Signal configuration signal_config = SignalConfig( name="approval_signal", timeout_ms=3600000, # 1 hour on_timeout="timeout_step" ) # Timer configuration timer_config = TimerConfig( key="batch_timer", delay_ms=60000, # 1 minute max_retries=5 ) # Cron timer configuration cron_timer = TimerConfig( key="daily_job", cron_expr="0 0 * * *", # Daily at midnight max_retries=3 ) ``` ## Validation and Error Handling ### Workflow 
Validation AGNT5 validates workflows during registration: ```python from agnt5.workflows import register_workflow, FlowDefinition, task_step # This will raise ValueError: Missing dependencies try: invalid_flow = FlowDefinition([ task_step("step2", service_name="svc", handler_name="h2", dependencies=["step1"]), # step1 doesn't exist task_step("step3", service_name="svc", handler_name="h3", dependencies=["step2"]) ]) register_workflow("invalid", invalid_flow) except ValueError as e: print(f"Validation error: {e}") ``` ### Validation Rules - At least one step must be defined - Step names must be unique within the workflow - Dependencies must reference existing steps - Dependencies must appear earlier in the definition (causal order) - Required fields must be populated based on step type ## Testing Workflows ### Workflow Definition Testing ```python import pytest from agnt5.workflows import FlowDefinition, task_step, get_registered_workflows def test_workflow_definition(): # Test workflow structure flow = FlowDefinition([ task_step("extract", service_name="etl", handler_name="extract"), task_step("transform", service_name="etl", handler_name="transform", dependencies=["extract"]) ]) # Verify serialization flow_dict = flow.to_dict() assert len(flow_dict["steps"]) == 2 assert flow_dict["steps"][1]["dependencies"] == ["extract"] def test_workflow_registration(): @workflow() def test_workflow() -> FlowDefinition: return FlowDefinition([ task_step("test_step", service_name="test", handler_name="test") ]) # Verify registration workflows = get_registered_workflows() assert "test_workflow" in workflows ``` ### Integration Testing Test workflows with a local development environment: ```python import asyncio from agnt5 import Client async def test_workflow_execution(): client = Client("http://localhost:8080") # Trigger workflow result = await client.start_workflow( workflow_name="data_pipeline", input_data={"source": "test_data"} ) workflow_id = result["workflow_id"] # Poll for 
completion while True: status = await client.get_workflow_status(workflow_id) if status["state"] in ["completed", "failed"]: break await asyncio.sleep(1) assert status["state"] == "completed" ``` ## Best Practices ### Design Patterns 1. **Idempotent Steps** - Design steps to be safely retryable 2. **Small Steps** - Break complex operations into smaller, focused steps 3. **Clear Dependencies** - Make step relationships explicit 4. **Meaningful Names** - Use descriptive names for steps and workflows 5. **Error Handling** - Plan for failure and recovery scenarios ### Performance 1. **Parallel Execution** - Remove unnecessary dependencies to enable parallelism 2. **Batch Operations** - Group related operations into single steps 3. **Resource Management** - Consider resource usage when designing workflows 4. **State Minimization** - Keep workflow state as small as possible ### Monitoring 1. **Structured Logging** - Add logging to workflow steps 2. **Progress Tracking** - Use meaningful step names and descriptions 3. **Metrics Collection** - Track workflow success rates and durations 4. **Error Alerting** - Set up alerts for workflow failures ## Next Steps - [Worker Runtime](worker) - Configure and deploy workers - [API Reference](api/workflows) - Complete workflows API reference - [Examples](examples/workflow-example) - Real-world workflow patterns # CLI Reference --- ## Authentication Commands _Source: https://agnt5.com/cli_ > Authentication and identity management for AGNT5 CLI Authentication is required before you can create projects, deploy, or interact with the Control Plane. Credentials are stored in `~/.agnt5/config.yaml` and read on every invocation. 
- **Commands**: `agnt5 auth login [--api-key <key>]`, `agnt5 auth logout`, `agnt5 auth status`
- **Credential store**: `~/.agnt5/config.yaml` (persisted on login, read on every invocation)
- **Env overrides**: `AGNT5_API_KEY` bypasses the login command; `AGNT5_API_URL` overrides the Control Plane URL
- **Auth flow**: browser-based OAuth via PropelAuth; the CLI polls the Control Plane until an API key is issued

### `agnt5 auth login`
Launches the authentication flow. By default the CLI requests a short-lived session ID from the Control Plane, opens your browser to the PropelAuth login screen, and polls until an API key is issued. The generated key is persisted to `~/.agnt5/config.yaml` so subsequent commands can reuse it.

### Syntax

```bash
agnt5 auth login [options]
```

### Options

| Flag | Description |
| --- | --- |
| `--api-key <key>` | Skip the browser flow and provide an existing API key. The CLI validates the key by fetching the current user before saving it locally. |

The CLI respects `AGNT5_API_URL` and other context settings when constructing the OAuth URLs, so ensure your context is correct before logging in. You can also provide `AGNT5_API_KEY` in the environment to bypass the login command entirely.
**OAuth flow**

```bash
agnt5 auth login
```

```
Starting authentication flow...
✓ Session ID requested from Control Plane
Opening browser for authentication...
→ https://auth.agnt5.com/propelauth/login
Waiting for authentication to complete...
✓ Authentication successful
✓ API key saved to ~/.agnt5/config.yaml

You are now authenticated as: user@example.com
```

**With API key**

```bash
agnt5 auth login --api-key agnt5_sk_abc123...
```

```
Validating provided API key...
✓ API key is valid
✓ User verified: user@example.com
✓ API key saved to ~/.agnt5/config.yaml

Authentication complete.
```

**Success response**

```bash
agnt5 auth login
```

```
✓ Already authenticated as: user@example.com
✓ API key is valid and active

Current context: production
API URL: https://api.agnt5.com

No action needed.
```
### `agnt5 auth logout`
Clears any stored API keys and tokens from `~/.agnt5/config.yaml`. You can re-authenticate later with `agnt5 auth login`.

### Syntax

```bash
agnt5 auth logout
```
Logout Already logged out ```bash agnt5 auth logout ``` ``` ✓ API key cleared from ~/.agnt5/config.yaml ✓ Authentication tokens removed You have been logged out successfully. Use 'agnt5 auth login' to authenticate again. ``` ```bash agnt5 auth logout ``` ``` No authentication credentials found. Already logged out. ```
### `agnt5 auth status`
Displays whether you are authenticated, printing the email associated with the stored key along with the active environment and API base URL. If the saved key is invalid, the command reports the failure and suggests `agnt5 auth login`.

### Syntax

```bash
agnt5 auth status
```
Authenticated Invalid key Not authenticated ```bash agnt5 auth status ``` ``` ✓ Authenticated as: user@example.com ✓ API key is valid Environment: production API URL: https://api.agnt5.com Context: production Authentication status: Active ``` ```bash agnt5 auth status ``` ``` ✗ Authentication failed ✗ Stored API key is invalid or expired Please run 'agnt5 auth login' to re-authenticate. ``` ```bash agnt5 auth status ``` ``` ✗ Not authenticated No API key found in ~/.agnt5/config.yaml Run 'agnt5 auth login' to authenticate. ```
### `agnt5 whoami`
Prints the email address tied to the stored credentials. This command is also available as `agnt5 auth whoami` and returns `Not authenticated` when no valid key is present.

### Syntax

```bash
agnt5 whoami
```
Authenticated Not authenticated ```bash agnt5 whoami ``` ``` user@example.com ``` ```bash agnt5 whoami ``` ``` Not authenticated ```
### `agnt5 version`
Displays CLI build metadata, including the version string, Go runtime, and platform.

### Syntax

```bash
agnt5 version
```

Pair `agnt5 version` with `agnt5 whoami` to confirm both your binary and your credentials before deploying.
Version info With whoami ```bash agnt5 version ``` ``` AGNT5 CLI v1.2.3 Build Information: Version: 1.2.3 Go version: go1.21.0 Platform: darwin/amd64 Build date: 2024-01-15 10:30:45 Git commit: abc1234 ``` ```bash agnt5 version && agnt5 whoami ``` ``` AGNT5 CLI v1.2.3 Build Information: Version: 1.2.3 Go version: go1.21.0 Platform: darwin/amd64 Build date: 2024-01-15 10:30:45 Git commit: abc1234 user@example.com ```
---

## Commands Reference

_Source: https://agnt5.com/cli_

> Complete reference for all AGNT5 CLI commands and their options

**Top-level commands**: `agnt5 init`, `agnt5 run`, `agnt5 deploy`, `agnt5 logs`, `agnt5 config`, `agnt5 auth`, `agnt5 project`, `agnt5 context`, `agnt5 list`

**Invocation shape**: `agnt5 <command> [args] [options]`

**Help**: `agnt5 --help` for top-level; `agnt5 <command> --help` per subcommand

### `agnt5 init`

Initialize a new AGNT5 project with templates, configuration, and best practices.

```bash
agnt5 init <project-name> [options]
```

**Options:**

- `--template <name>` - Project template (python, typescript, go, minimal)
- `--minimal` - Create a minimal project structure
- `--no-install` - Skip dependency installation
- `--git` - Initialize git repository (default: true)

**Examples:**

```bash
agnt5 init my-project
agnt5 init my-ai-agent --template python
agnt5 init simple-workflow --minimal
agnt5 init my-project --no-install --no-git
```

### `agnt5 run`

Execute workflows locally or remotely with comprehensive logging and error handling.

```bash
agnt5 run <workflow> [options]
```

**Options:**

- `--input <file>` - Input data file (JSON)
- `--env <environment>` - Target environment (local, dev, staging, prod)
- `--remote` - Execute on remote AGNT5 runtime
- `--watch` - Watch for file changes and re-run
- `--debug` - Enable debug output

**Examples:**

```bash
agnt5 run my-workflow
agnt5 run data-processor --input data.json
agnt5 run my-workflow --remote --env staging
agnt5 run my-workflow --watch --debug
```

## Deployment Commands

### `agnt5 deploy`

Deploy your workflows to production with zero-downtime deployments and automatic scaling.
```bash
agnt5 deploy [options]
```

**Options:**

- `--env <environment>` - Target environment (staging, production)
- `--watch` - Watch deployment status
- `--no-build` - Skip build step
- `--force` - Force deployment even with warnings
- `--rollback` - Rollback to previous deployment

**Examples:**

```bash
agnt5 deploy
agnt5 deploy --env staging
agnt5 deploy --watch --env production
agnt5 deploy --rollback
```

### `agnt5 build`

Build your project for deployment.

```bash
agnt5 build [options]
```

**Options:**

- `--env <environment>` - Build for specific environment
- `--output <dir>` - Output directory (default: dist)
- `--minify` - Minify output files
- `--sourcemap` - Generate source maps

**Examples:**

```bash
agnt5 build
agnt5 build --env production --minify
agnt5 build --output ./build --sourcemap
```

## Monitoring Commands

### `agnt5 logs`

Stream real-time logs from your workflows with filtering, search, and export capabilities.

```bash
agnt5 logs [options]
```

**Options:**

- `--follow, -f` - Follow log output in real-time
- `--filter <level>` - Filter by log level (error, warn, info, debug)
- `--grep <pattern>` - Filter logs by pattern
- `--tail <n>` - Show last N lines (default: 100)
- `--env <environment>` - Target environment
- `--export <file>` - Export logs to file

**Examples:**

```bash
agnt5 logs
agnt5 logs --follow --filter error
agnt5 logs --grep "workflow-123" --tail 50
agnt5 logs --env production --export logs.json
```

### `agnt5 status`

Check the status of your workflows, deployments, and system health across environments.

```bash
agnt5 status [options]
```

**Options:**

- `--env <environment>` - Target environment
- `--watch, -w` - Watch status in real-time
- `--json` - Output in JSON format
- `--verbose` - Show detailed status information

**Examples:**

```bash
agnt5 status
agnt5 status --env production --watch
agnt5 status --json --verbose
```

## Configuration Commands

### `agnt5 config`

Manage CLI configuration and settings.
```bash
agnt5 config <subcommand> [options]
```

**Subcommands:**

- `get <key>` - Get configuration value
- `set <key> <value>` - Set configuration value
- `list` - List all configuration values
- `reset` - Reset configuration to defaults

**Options:**

- `--global` - Modify global configuration
- `--env <environment>` - Environment-specific config

**Examples:**

```bash
agnt5 config get api-key
agnt5 config set api-key your-api-key
agnt5 config set timeout 30000 --env production
agnt5 config list --global
agnt5 config reset
```

### `agnt5 auth`

Manage authentication with AGNT5 services.

```bash
agnt5 auth <subcommand>
```

**Subcommands:**

- `login` - Authenticate with AGNT5
- `logout` - Remove authentication
- `whoami` - Show current user
- `token` - Manage API tokens

**Examples:**

```bash
agnt5 auth login
agnt5 auth whoami
agnt5 auth logout
```

## Environment Commands

### `agnt5 env`

Manage deployment environments and their configuration.

```bash
agnt5 env <subcommand> [options]
```

**Subcommands:**

- `list` - List all environments
- `create <name>` - Create new environment
- `delete <name>` - Delete environment
- `set <key> <value>` - Set environment variable
- `unset <key>` - Remove environment variable

**Examples:**

```bash
agnt5 env list
agnt5 env create staging
agnt5 env set DATABASE_URL postgres://... --env staging
agnt5 env unset DEBUG --env production
agnt5 env delete old-staging
```

## Utility Commands

### `agnt5 validate`

Validate project configuration and workflow definitions.

```bash
agnt5 validate [file] [options]
```

**Options:**

- `--schema` - Validate against specific schema
- `--fix` - Attempt to fix validation errors
- `--strict` - Use strict validation rules

**Examples:**

```bash
agnt5 validate
agnt5 validate workflow.yaml
agnt5 validate --strict --fix
```

### `agnt5 doctor`

Diagnose common issues with your AGNT5 setup.
```bash agnt5 doctor [options] ``` **Options:** - `--verbose` - Show detailed diagnostic information - `--fix` - Attempt to fix detected issues **Examples:** ```bash agnt5 doctor agnt5 doctor --verbose --fix ``` ## Global Options These options are available for all commands: - `--help, -h` - Show help information - `--version, -v` - Show CLI version - `--verbose` - Enable verbose output - `--quiet, -q` - Suppress non-error output - `--config ` - Use specific config file --- ## Configuration _Source: https://agnt5.com/cli_ > Configure the AGNT5 CLI for your development environment and deployment targets Configure the AGNT5 CLI for your development environment, API authentication, and deployment targets. **Config file locations**: project root (`agnt5.config.js`, `agnt5.config.json`, or `agnt5.yaml`); user home (`~/.agnt5/config.yaml` for credentials, `~/.agnt5/context.yaml` for environment) **Config formats**: JavaScript (`agnt5.config.js`) or JSON (`agnt5.config.json`); manifests use YAML (`agnt5.yaml`) **Precedence**: CLI flags > env vars (`AGNT5_*`) > project config > user config > built-in defaults ## Project Configuration The CLI uses a configuration file in your project root. 
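The precedence order stated above (CLI flags over `AGNT5_*` env vars over project config over user config over built-in defaults) boils down to "first defined value wins". A minimal sketch of that resolution, illustrative only and not the CLI's actual implementation:

```python
def resolve(name, flag=None, env=None, project=None, user=None, default=None):
    """Return the first defined value following the CLI's precedence:
    flags > env vars (AGNT5_*) > project config > user config > defaults."""
    for source in (flag, env, project, user):
        if source is not None:
            return source
    return default

# An env var beats the project config but would lose to an explicit flag
print(resolve("timeout", env="30000", project="10000", default="60000"))  # → 30000
```

When debugging unexpected settings, walk this chain top-down: a stray `AGNT5_TIMEOUT` in your shell silently overrides every config file.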
The CLI supports both JavaScript and JSON formats: ### `agnt5.config.js` ```javascript module.exports = { // Project settings name: 'my-project', version: '1.0.0', description: 'My AI workflow project', // Development server dev: { port: 3000, host: 'localhost', watch: ['src/**/*', 'workflows/**/*'], reload: true, open: true }, // Build settings build: { outDir: 'dist', minify: true, sourcemap: false, target: 'node16' }, // Deployment settings deploy: { environment: 'production', region: 'us-east-1', timeout: 300000, retries: 3 }, // Workflow configuration workflows: { timeout: 60000, retries: 2, concurrency: 10 } }; ``` ### `agnt5.config.json` ```json { "name": "my-project", "version": "1.0.0", "dev": { "port": 3000, "host": "localhost", "watch": ["src/**/*"] }, "deploy": { "environment": "production", "region": "us-east-1" } } ``` ## Environment Variables Configure the CLI using environment variables. These can be set in your shell or in a `.env` file: ### Authentication ```bash # API authentication AGNT5_API_KEY=your-api-key AGNT5_BASE_URL=https://api.agnt5.com # Alternative: use auth token AGNT5_AUTH_TOKEN=your-jwt-token ``` ### Runtime Configuration ```bash # Default environment AGNT5_ENVIRONMENT=development # Logging AGNT5_LOG_LEVEL=info # error, warn, info, debug AGNT5_LOG_FORMAT=pretty # pretty, json # Timeouts (in milliseconds) AGNT5_TIMEOUT=30000 AGNT5_CONNECT_TIMEOUT=5000 # Development settings AGNT5_DEV_PORT=3000 AGNT5_DEV_HOST=localhost AGNT5_HOT_RELOAD=true ``` ### Deployment Configuration ```bash # Default deployment environment AGNT5_DEPLOY_ENV=production # Runtime settings AGNT5_RUNTIME_REGION=us-east-1 AGNT5_RUNTIME_MEMORY=512 AGNT5_RUNTIME_TIMEOUT=300 ``` ## Global CLI Configuration Manage global CLI settings that persist across all projects: ### View Current Configuration ```bash agnt5 config list --global ``` ### Set Global Configuration ```bash # API settings agnt5 config set api-key your-api-key --global agnt5 config set base-url 
https://api.agnt5.com --global # Default preferences agnt5 config set log-level info --global agnt5 config set editor vscode --global agnt5 config set auto-update true --global ``` ### Configuration File Location Global configuration is stored in: - **macOS/Linux**: `~/.config/agnt5/config.json` - **Windows**: `%APPDATA%\agnt5\config.json` ## Environment-Specific Configuration Configure different settings for different environments: ### Development Environment ```bash agnt5 config set timeout 10000 --env development agnt5 config set log-level debug --env development agnt5 config set hot-reload true --env development ``` ### Staging Environment ```bash agnt5 config set base-url https://staging-api.agnt5.com --env staging agnt5 config set timeout 30000 --env staging agnt5 config set log-level info --env staging ``` ### Production Environment ```bash agnt5 config set base-url https://api.agnt5.com --env production agnt5 config set timeout 60000 --env production agnt5 config set log-level warn --env production ``` ## API Keys and Authentication ### Setting Up API Keys 1. **Get your API key** from the AGNT5 dashboard 2. **Set it globally** for all projects: ```bash agnt5 config set api-key your-api-key --global ``` 3. **Or set per environment:** ```bash agnt5 config set api-key your-dev-key --env development agnt5 config set api-key your-prod-key --env production ``` ### Using Environment Files Create `.env` files for each environment: #### `.env.development` ```bash AGNT5_API_KEY=dev_api_key_here AGNT5_BASE_URL=https://dev-api.agnt5.com AGNT5_LOG_LEVEL=debug ``` #### `.env.production` ```bash AGNT5_API_KEY=prod_api_key_here AGNT5_BASE_URL=https://api.agnt5.com AGNT5_LOG_LEVEL=warn ``` ### Authentication Methods The CLI supports multiple authentication methods in order of precedence: 1. **Command-line flags**: `--api-key your-key` 2. **Environment variables**: `AGNT5_API_KEY` 3. **Project config file**: `agnt5.config.js` 4. 
**Global config**: `~/.config/agnt5/config.json` 5. **Interactive login**: `agnt5 auth login` ## Configuration Validation Validate your configuration to ensure everything is set up correctly: ```bash # Validate current configuration agnt5 config validate # Validate specific environment agnt5 config validate --env production # Show configuration sources agnt5 config validate --verbose ``` ## Configuration Schema The complete configuration schema: ```typescript interface AgntConfig { // Project metadata name?: string; version?: string; description?: string; // API configuration apiKey?: string; baseUrl?: string; timeout?: number; // Development server dev?: { port?: number; host?: string; watch?: string[]; reload?: boolean; open?: boolean; }; // Build configuration build?: { outDir?: string; minify?: boolean; sourcemap?: boolean; target?: string; }; // Deployment settings deploy?: { environment?: string; region?: string; timeout?: number; retries?: number; }; // Workflow settings workflows?: { timeout?: number; retries?: number; concurrency?: number; }; // Logging configuration logging?: { level?: 'error' | 'warn' | 'info' | 'debug'; format?: 'pretty' | 'json'; }; } ``` ## Best Practices ### Security - **Never commit API keys** to version control - **Use environment-specific keys** for different deployment targets - **Rotate keys regularly** and update configuration - **Use `.env` files** for local development ### Organization - **Use project config files** for team-shared settings - **Use global config** for personal preferences - **Document environment variables** in your project README - **Validate configuration** in CI/CD pipelines ### Performance - **Set appropriate timeouts** for your use case - **Configure concurrency limits** based on your resources - **Use region-specific endpoints** for better latency - **Enable caching** where appropriate --- ## Context Switching Commands (Advanced) _Source: https://agnt5.com/cli_ > Switch between local development 
endpoints and hosted environments

Advanced users can switch between local development endpoints and hosted environments with the `agnt5 context` command group. Contexts influence which Control Plane and Gateway URLs the CLI uses, and they can also seed environment-specific configuration files.

**Commands**: `agnt5 context` (show current), `agnt5 context set <context>`, `agnt5 context list`

**Context store**: `~/.agnt5/context.yaml`

**Default context**: `production` (API URL `https://api.agnt5.com`)

**Hidden from `--help`**: yes, but invokable directly

Although the commands are marked hidden in `--help`, you can still run them directly. These are advanced commands primarily used for development and testing against different AGNT5 environments.

### `agnt5 context`
Prints the currently active context (defaults to `production` when none is stored) along with the API base URL pulled from `~/.agnt5/context.yaml`.

### Syntax

```bash
agnt5 context
```
```bash # Show current context agnt5 context ``` ``` Current context: production API URL: https://api.agnt5.com ```
### `agnt5 context list`
Lists the built-in contexts (`local`, `staging`, `production`) and marks the active one with `*`.

### Syntax

```bash
agnt5 context list
```
```bash # List all available contexts agnt5 context list ``` ``` Available contexts: local http://localhost:34181 staging https://api.agnt5.xyz * production https://api.agnt5.com ```
### `agnt5 context set`
Writes the chosen context to `~/.agnt5/context.yaml`, updates related settings, and copies `~/.agnt5/config.<context>.yaml` into `config.yaml` when present. The command also prints the final API and gateway URLs so you can confirm the change.

### Syntax

```bash
agnt5 context set <context>
```

### Available Contexts

| Context | API URL | Gateway URL | Notes |
| --- | --- | --- | --- |
| `local` | `http://localhost:34181` | `http://localhost:34183` | Dev stack with local OAuth |
| `staging` | `https://api.agnt5.xyz` | `https://gw.agnt5.xyz` | Pre-production testing |
| `production` | `https://api.agnt5.com` | `https://gw.agnt5.com` | Live environment (default) |
Local development Staging Production ```bash # Switch to local development agnt5 context set local ``` ``` ✓ Context switched to: local ✓ Configuration copied from config.local.yaml Active endpoints: API URL: http://localhost:34181 Gateway: http://localhost:34183 gRPC Gateway: localhost:34184 Auth URL: https://86934364.propelauthtest.com You are now targeting the local development stack. ``` ```bash # Switch to staging environment agnt5 context set staging ``` ``` ✓ Context switched to: staging ✓ Configuration copied from config.staging.yaml Active endpoints: API URL: https://api.agnt5.xyz Gateway: https://gw.agnt5.xyz Auth URL: https://auth.agnt5.xyz/propelauth You are now targeting the staging environment. ``` ```bash # Switch back to production agnt5 context set production ``` ``` ✓ Context switched to: production ✓ Configuration copied from config.production.yaml Active endpoints: API URL: https://api.agnt5.com Gateway: https://gw.agnt5.com Auth URL: https://auth.agnt5.com/propelauth You are now targeting the production environment. ```
## Context Configuration Every context also updates `agnt5_env` to a descriptive value (`agnt5-local`, `agnt5-staging`, `agnt5-production`). Commands like `agnt5 auth login` and `agnt5 deploy` pick up these values the next time they run, ensuring you are talking to the correct control plane. ### Environment-Specific Configuration Keep environment-specific credentials in `~/.agnt5/config.local.yaml`, `config.staging.yaml`, etc. Switching contexts copies the relevant file over `config.yaml`, so your API keys stay in sync with the environment you just selected. ### Configuration Files by Context | File | Purpose | | --- | --- | | `~/.agnt5/context.yaml` | Stores the currently active context | | `~/.agnt5/config.yaml` | Main configuration file, updated when contexts switch | | `~/.agnt5/config.local.yaml` | Local environment-specific settings | | `~/.agnt5/config.staging.yaml` | Staging environment-specific settings | | `~/.agnt5/config.production.yaml` | Production environment-specific settings | ### Context Workflow ```bash # 1. Check current context agnt5 context # 2. List available contexts agnt5 context list # 3. Switch to local development agnt5 context set local # 4. Authenticate against local environment agnt5 auth login # 5. Work with local stack agnt5 project list # 6. Switch back to production agnt5 context set production ``` ## Use Cases ### Local Development Switch to `local` context when working with the development stack: ```bash agnt5 context set local agnt5 auth login ``` ### Staging Testing Use `staging` context for pre-production testing: ```bash agnt5 context set staging agnt5 auth login agnt5 deploy --staging ``` ### Production Operations Default `production` context for live deployments: ```bash agnt5 context set production agnt5 auth login agnt5 deploy --prod ``` **Important:** Always verify your context before performing sensitive operations like production deployments. Use `agnt5 context` to confirm you're targeting the correct environment. 
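The file handling described above can be illustrated with plain shell. This is a simplified sketch of what `agnt5 context set staging` does on disk, using a scratch directory in place of the real `~/.agnt5` so it is safe to run:

```shell
#!/bin/sh
set -eu
AGNT5_DIR="$(mktemp -d)"   # stands in for ~/.agnt5

# Pretend staging credentials already exist
printf 'api_key: staging-key\n' > "$AGNT5_DIR/config.staging.yaml"

# 1. Record the newly active context
printf 'context: staging\n' > "$AGNT5_DIR/context.yaml"

# 2. Copy the environment-specific config over the main one
cp "$AGNT5_DIR/config.staging.yaml" "$AGNT5_DIR/config.yaml"

cat "$AGNT5_DIR/context.yaml" "$AGNT5_DIR/config.yaml"
```

The point of the copy step is that whatever reads `config.yaml` (every other CLI command) automatically picks up the credentials matching the active context.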
--- ## Deployment Commands _Source: https://agnt5.com/cli_ > Build, push, and deploy AGNT5 projects with comprehensive deployment pipeline `agnt5 deploy` is the all-in-one command that builds your project, pushes the resulting image to the configured registry, validates the target workspace, and creates (or updates) a deployment. It intentionally mirrors the Vercel CLI's single entry point: run `agnt5 deploy` from the project root and the CLI orchestrates the rest. **Command**: `agnt5 deploy [options]` **Required**: authenticated session (`agnt5 auth login`); project binding (`.agnt5/project-ref`); deployment manifest (`agnt5.yaml`); running Docker daemon **Key flags**: `--environment`, `--prod`, `--staging`, `--dry-run`, `--build-only`, `--platform`, `--replicas`, `--cpu`, `--memory`, `--push-remote` **Side effects**: builds Docker image; pushes to local registry (and remote if `--push-remote`); creates/updates Control Plane deployment **Stages**: dry-run plan (optional) → build → push → deploy ## Prerequisites - You must be authenticated (`agnt5 auth login`) so the CLI can talk to the Control Plane and container registries - The working directory needs a project binding via `.agnt5/project-ref` and a deployment manifest (`agnt5.yaml`). `agnt5 project create` scaffolds both for Python projects - Docker must be installed and running; the CLI performs connectivity checks and will error out with guidance if Docker or the registry cannot be reached ### `agnt5 deploy`
The complete deployment workflow with build, push, and deployment orchestration.

### Syntax

```bash
agnt5 deploy [options]
```

### Command Workflow

1. **Dry run (optional)** – With `--dry-run` the CLI prints a deployment plan and exits without building or deploying
2. **Build** – Creates a Docker context, optionally generates a Dockerfile for Python projects, and uses the local Docker daemon to build the image
3. **Push** – Tags are pushed to the local registry by default, and optionally mirrored to a remote registry when `--push-remote` is set
4. **Deploy** – The Control Plane client validates or creates the workspace, resumes it if paused, and issues a deployment request

### Options

**Common Options**

| Flag | Description |
| --- | --- |
| `--project-dir` | Use a different directory as the build context (default: `.`) |
| `--environment` | Name of the deployment environment (default: `development`) |
| `--prod` | Shorthand flag that forces `environment` to `production` |
| `--staging` | Shorthand flag that forces `environment` to `staging` |
| `--dry-run` | Show the plan without building or deploying |
| `--build-only` | Stop after a successful build/push so you can deploy later |

**Build Options**

| Flag | Description |
| --- | --- |
| `--dockerfile` | Path to the Dockerfile (defaults to `./Dockerfile`) |
| `--no-cache` | Disable Docker layer caching |
| `--clean-cache` | Prune Docker build cache before building |
| `--pull` | Always attempt to pull newer base images |
| `--platform` | Target platforms for multi-arch builds |
| `--build-args` | Additional build arguments |
| `--target` | Build a specific stage from a multi-stage Dockerfile |
| `--tags` | Append extra image tags |

**Registry Options**

| Flag | Description |
| --- | --- |
| `--push` | Push to local registry (default: `true`) |
| `--push-remote` | Mirror to remote registry |

**Deployment Options**

| Flag | Description |
| --- | --- |
| `--replicas` | Number of replicas (default: `1`) |
| `--cpu` | CPU limit |
| `--memory` | Memory limit |
Basic deployment Build customization Production Dry run ```bash # Deploy to development environment agnt5 deploy ``` ``` Starting deployment... ✓ Docker connectivity check passed ✓ Project reference found: acme/my-project ✓ Building image: localhost:5001/acme/my-project:latest Building Docker image... [+] Building 45.2s (12/12) FINISHED ✓ Image built successfully ✓ Pushing to local registry ✓ Validating workspace (development) ✓ Deploying to workspace Deployment successful! Next steps: curl http://localhost:8090/call -d '{"serviceName":"my-project"}' agnt5 logs ``` ```bash # Deploy to staging agnt5 deploy --staging ``` ```bash # Clean build without cache agnt5 deploy --clean-cache ``` ``` Starting deployment with clean cache... ✓ Pruning Docker build cache ✓ Pulling fresh base images Building Docker image... [+] Building 120.5s (12/12) FINISHED ✓ Clean build completed ``` ```bash # Multi-platform build agnt5 deploy --platform linux/amd64,linux/arm64 ``` ```bash # Build only (no deployment) agnt5 deploy --build-only ``` ```bash # Full production deployment agnt5 deploy --prod --replicas 3 --cpu 1 --memory 1Gi --tags v1.2.0 ``` ``` Starting production deployment... ✓ Environment: production ✓ Replicas: 3 ✓ Resources: 1 CPU, 1Gi memory ✓ Additional tags: v1.2.0 Building Docker image... [+] Building 52.1s (12/12) FINISHED ✓ Image tagged: v1.2.0 ✓ Pushing to local registry ✓ Mirroring to remote registry ✓ Validating production workspace ✓ Deploying with 3 replicas Production deployment successful! Deployment ID: dep-prod-abc123 Status: running (3/3 replicas ready) ``` ```bash # See deployment plan agnt5 deploy --dry-run ``` ``` Deployment Plan (DRY RUN): Project: acme/my-project Environment: development Image: localhost:5001/acme/my-project:latest Replicas: 1 Resources: default Build Steps: 1. Validate Docker connectivity 2. Build image from ./Dockerfile 3. Push to localhost:5001 Deploy Steps: 1. Validate/create development workspace 2. Deploy with 1 replica 3. 
Wait for ready status No changes will be made (dry run mode). ``` ```bash # Plan production deployment agnt5 deploy --prod --dry-run --replicas 3 ```
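After issuing a deployment, the CLI waits for the replicas to report ready (the `running (3/3 replicas ready)` line in the production example above). That wait is a plain poll loop, sketched below for illustration; `get_status` stands in for the Control Plane HTTP call and is not a real SDK function:

```python
import time

def wait_for_deployment(get_status, interval=5.0, timeout=300.0, sleep=time.sleep):
    """Poll a status callable until the deployment reports running/ready,
    failing fast on a failed deployment and timing out otherwise."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()  # in the real CLI: an HTTP status request
        if status in ("running", "ready"):
            return status
        if status == "failed":
            raise RuntimeError("deployment failed")
        sleep(interval)
    raise TimeoutError("deployment not ready within timeout")

# Stubbed example: the status progresses across successive polls
statuses = iter(["pending", "provisioning", "running"])
print(wait_for_deployment(lambda: next(statuses), sleep=lambda _: None))
```

Injecting `sleep` keeps the loop testable without real delays; the same pattern applies if you script deployments against the Control Plane API directly.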
## Build Stage Details The build stage performs several safeguards before issuing a Docker build: - Validates Docker connectivity by pinging the daemon and pulling a tiny `hello-world` image to confirm registry access - Ensures Docker is installed, the project directory exists, and a Dockerfile is available (generating one for Python projects when needed) - Reads defaults from `config.yaml`/`agnt5.yaml` to name the image after your project reference and merges in any additional tags or build arguments you provided If you set `--clean-cache` or `--no-cache`, the CLI prunes the Docker build cache before starting the build and forces base image pulls. Progress updates stream directly to the terminal (including the current Docker build step). ## Registry Pushes After a successful build, the CLI tags the image for the configured registry: ### Local Registry - Local pushes go to `localhost:5001` by default (or whatever `deploy.registry.local_url` resolves to) - Authentication uses your AGNT5 API key ### Remote Registry - Remote pushes use `deploy.registry.remote_url` when defined; otherwise the CLI falls back to the default remote registry (`iarun-agnt5-cr.protoml.dev`) - Credentials come from `config.yaml` or, if omitted, reuse your API key - When both `--push` and `--push-remote` are enabled the CLI pushes to the local registry first, then mirrors the tag to the remote endpoint You can opt out of pushing entirely with `--push=false`, but keep in mind that `agnt5 deploy` still expects to deploy the latest image tag. 
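The registry endpoints above come from the deployment configuration. A sketch of the relevant keys in `config.yaml`/`agnt5.yaml` follows; only `deploy.registry.local_url` and `deploy.registry.remote_url` are named on this page, and the surrounding structure is an assumption for illustration:

```yaml
# Illustrative fragment (assumed layout; only the two registry keys
# are documented above)
deploy:
  registry:
    local_url: localhost:5001               # default local registry
    remote_url: iarun-agnt5-cr.protoml.dev  # fallback remote registry
```

Leaving `remote_url` unset makes the CLI fall back to the default remote registry, as noted above.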
## Workspace Validation and Deployment The deployment step uses the Control Plane API to ensure your workspace is ready: ### Workspace States | State | Action | | --- | --- | | **No workspace** | CLI creates one automatically and waits (up to 10 minutes) for it to reach the `ready` phase | | **Paused** | Workspaces are resumed before deployment | | **Pending/Provisioning** | Triggers a wait loop until ready | | **Failed** | Produces actionable error messages and stops the deployment | ### Deployment Process When the workspace is ready the CLI constructs a deployment payload with your image reference, replica count, and resource hints, then polls every five seconds (for up to five minutes) for the deployment to reach a `running`/`ready` status. Upon success it prints handy follow-up steps, including a `curl` example and a reminder to inspect logs via `agnt5 logs`. ## Complete Example Workflow ```bash # 1. Check deployment plan agnt5 deploy --prod --dry-run --replicas 3 # 2. Build and deploy to production agnt5 deploy --prod --replicas 3 --cpu 1 --memory 1Gi --tags v1.2.0 # 3. Monitor deployment (see deployment visibility commands) agnt5 list --environment production agnt5 logs ``` Use `--dry-run` first to verify the deployment plan, then run the full command. Combine with `agnt5 list` and `agnt5 logs` to monitor the rollout. **Programmatic equivalents**: the CLI calls Control Plane HTTP endpoints — see [API reference](/api-reference/create-contact.md) for the request shapes --- ## Deployment Visibility Commands _Source: https://agnt5.com/cli_ > Monitor and inspect deployment history and logs Two top-level commands surface deployment information. They mirror Vercel's `vercel list` and `vercel logs`, but the current implementation is still a work in progress. Expect behavior to evolve as the Control Plane endpoints mature. 
**Commands**: `agnt5 list [--environment <env>] [--limit <n>] [--status <status>] [--all]` (alias `agnt5 ls`); `agnt5 logs [options]`

**Status**: work-in-progress; both commands return a placeholder error today (`list functionality not yet implemented`) — use the Control Plane UI/API directly until the integration ships

**Inspection alternatives**: query the Control Plane HTTP API or browse Studio

**Work in Progress**

Both commands are currently under development. They return placeholder errors today but will provide full deployment visibility once the Control Plane endpoints are stabilized.

### `agnt5 list`
Displays a list of deployments for the current project. The command accepts filters that will be wired to the Control Plane once the backing API stabilizes.

### Syntax

```bash
agnt5 list [options]
agnt5 ls [options]
```

### Options

| Flag | Description |
| --- | --- |
| `--environment` | Environment to inspect (default: `development`) |
| `--limit` | Maximum number of deployments to show (default: `10`) |
| `--status` | Filter by deployment status |
| `--all` | Include deployments from every environment |

**Current Status:** Returns a placeholder error today (`list functionality not yet implemented`). Use the Control Plane UI or APIs directly for detailed history until the CLI integration ships.
Basic listing Filtered results Environment-specific ```bash # List recent deployments agnt5 list ``` ``` Error: list functionality not yet implemented Use the Control Plane UI for deployment history. ``` ```bash # Short alias agnt5 ls ``` ```bash # Filter by status agnt5 list --status running --limit 20 ``` ``` Error: list functionality not yet implemented Expected output: ID | Status | Environment | Created | Image ------------|---------|-------------|------------|------- dep-abc123 | running | production | 2 min ago | v1.2.0 dep-def456 | running | staging | 1 hr ago | v1.1.8 ``` ```bash # List all deployments across environments agnt5 list --all ``` ```bash # Show production deployments agnt5 list --environment production ``` ``` Error: list functionality not yet implemented Expected output for production: ID | Status | Created | Image | Replicas ------------|---------|------------|--------|---------- dep-prod-01 | running | 2 min ago | v1.2.0 | 3/3 dep-prod-02 | stopped | 1 hr ago | v1.1.9 | 0/3 ```
### `agnt5 logs`
Streams logs for a specific deployment. When an ID is omitted the command will eventually default to the latest deployment for the current project.

### Syntax

```bash
agnt5 logs [deployment-id] [options]
```

### Options

| Flag | Description |
| --- | --- |
| `--follow`, `-f` | Follow log output |
| `--tail` | Number of lines to show from the end of the logs (default: `100`) |
| `--since` | Timestamp or duration filter (e.g., `2h`, `30m`) |

**Current Status:** Returns a placeholder error until log streaming is implemented. The CLI currently surfaces a message reminding you that a deployment ID is required.
**Basic logs**

```bash
# Show logs for latest deployment
agnt5 logs
```

```
Error: deployment ID required
Log streaming not yet implemented.
Use the Control Plane UI for deployment logs.
```

```bash
# Show last 50 lines from recent logs
agnt5 logs --tail 50
```

**Live streaming**

```bash
# Stream logs for specific deployment
agnt5 logs deploy-abc123 --follow
```

```
Error: deployment ID required

Expected behavior:
2024-01-15T10:30:45Z [INFO] Starting service...
2024-01-15T10:30:46Z [INFO] Handler registered: greet_user
2024-01-15T10:30:47Z [INFO] Service ready on port 8080
2024-01-15T10:30:48Z [INFO] Received request: greet_user
... (streaming continues)
```

```bash
# Follow logs with short flag
agnt5 logs deploy-abc123 -f
```

**Time filtering**

```bash
# Show logs from last 2 hours
agnt5 logs --since 2h
```

```
Error: deployment ID required

Expected filtered output:
2024-01-15T08:30:45Z [INFO] Service healthy
2024-01-15T08:35:12Z [INFO] Request processed
2024-01-15T09:15:33Z [WARN] High memory usage
2024-01-15T09:45:21Z [INFO] Memory usage normal
```

```bash
# Follow logs with time filter
agnt5 logs deploy-abc123 -f --since 30m
```
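The `--since` flag accepts duration shorthand such as `2h` and `30m`. A minimal sketch of how such a filter could be parsed client-side; the function name and the exact set of supported units are assumptions, not the CLI's actual implementation:

```python
import re
from datetime import timedelta

# Hypothetical helper: parse "--since" shorthand like "2h" or "30m".
# The supported units (s/m/h/d) are an assumption for illustration.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}

def parse_since(value: str) -> timedelta:
    """Turn a duration string like '30m' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", value)
    if not match:
        raise ValueError(f"invalid duration: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})
```

With this, `parse_since("2h")` yields a two-hour window cutoff that a log client could subtract from the current time.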
## Planned Integration

Once these commands are complete they will work hand-in-hand with `agnt5 deploy` to provide an end-to-end workflow:

### Example Future Workflow

```bash
# 1. Deploy your project
agnt5 deploy --prod

# 2. List deployments to see the new one
agnt5 list --environment production

# 3. Monitor logs for the deployment
agnt5 logs deploy-abc123 --follow

# 4. Check deployment status
agnt5 list --status running
```

### Integration with Deployment Pipeline

The visibility commands will integrate seamlessly with the deployment workflow:

- **Post-deployment monitoring**: After `agnt5 deploy` completes, use `agnt5 logs` to monitor the new deployment
- **Historical analysis**: Use `agnt5 list` to compare deployment performance over time
- **Debugging**: Filter logs by time ranges to troubleshoot specific deployment issues
- **Multi-environment visibility**: Compare deployments across development, staging, and production

### Expected Features

When implementation is complete, expect:

- **Rich filtering**: Filter deployments by status, environment, time range, and more
- **Real-time updates**: Live log streaming with automatic reconnection
- **Deployment details**: Full metadata about each deployment including build info, resource usage, and health status
- **Integration hooks**: Commands will automatically detect the current project context and default to relevant deployments

Until these commands are implemented, you can monitor deployments through the Control Plane UI or use the direct APIs for programmatic access.
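For programmatic access in the meantime, a deployment-listing call against the Control Plane might look roughly like the sketch below. The endpoint path, query parameters, and response shape are assumptions for illustration; check the Control Plane API reference for the real contract. The HTTP transport is injected so the shape of the call is visible without committing to a client library:

```python
from typing import Any, Callable

# Fetch takes a path and query params and returns parsed JSON.
Fetch = Callable[[str, dict[str, Any]], dict[str, Any]]

def list_deployments(fetch: Fetch, environment: str = "development",
                     limit: int = 10) -> list[dict[str, Any]]:
    """List deployments via an injected HTTP fetcher.

    The "/v1/deployments" path and the "deployments" response key are
    assumptions, not the documented Control Plane API.
    """
    params = {"environment": environment, "limit": limit}
    body = fetch("/v1/deployments", params)
    return body.get("deployments", [])
```

In practice the fetcher would wrap `urllib.request` or `requests` and attach the API key obtained from `agnt5 auth login`.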
---

## CLI Overview

_Source: https://agnt5.com/cli_

> Command-line interface for developing, testing, and deploying AI workflows locally and in production

**Binary**: `agnt5`
**Install (macOS)**: `brew install agnt5/tap/agnt5`
**Install (Linux)**: `curl -LsSf https://agnt5.com/cli.sh | bash`
**Core commands**: `agnt5 init`, `agnt5 deploy`, `agnt5 logs`, `agnt5 config`, `agnt5 auth`, `agnt5 project`
**Config home**: `~/.agnt5/` (credentials, context)

## Core Commands

- `agnt5 init` - Scaffold new projects from templates
- `agnt5 deploy` - Push to production or staging environments
- `agnt5 logs` - Stream logs from any environment
- `agnt5 config` - Manage environment variables and settings

## Installation

**macOS**

Install the CLI using Homebrew:

```bash
brew install agnt5/tap/agnt5
```

**Linux**

Install using the install script:

```bash
curl -LsSf https://agnt5.com/cli.sh | bash
```

**Configure your PATH**

Add the CLI to your PATH after installation:

```bash
echo 'export PATH="$HOME/.agnt5/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
```

Use `~/.zshrc` if you're using zsh instead of bash.

### Verify Installation

```bash
agnt5 --version
```

## Quickstart

```bash
# Initialize a new project
agnt5 init my-workflow
cd my-workflow

# Authenticate and deploy
agnt5 auth login
agnt5 deploy
```

## Upgrading

**Homebrew**

```bash
brew upgrade agnt5
```

**CLI**

```bash
agnt5 upgrade
```

---

## Project Management Commands

_Source: https://agnt5.com/cli_

> Create and manage AGNT5 projects with Control Plane integration

The `agnt5 project` command family helps you create Control Plane projects, sync local configuration, and explore existing projects. Many operations require an API key because they call the Control Plane directly.
**Commands**: `agnt5 project create [name] [--organization-id <id>]` (alias `agnt5 create`); other `agnt5 project` subcommands manage existing projects
**Languages supported**: `python`, `typescript`
**Required**: API key (`agnt5 auth login` first)
**Side effects**: creates project in Control Plane; for Python projects scaffolds `app.py`, `src/<project_name>/functions.py`, etc.; writes `.agnt5/project-ref` for subsequent CLI calls

### `agnt5 project create`
Creates a new project remotely and, for Python projects, scaffolds a ready-to-run repository on disk. The command is also available as a top-level shortcut, `agnt5 create`.

### Syntax

```bash
agnt5 project create [project-name] [options]
agnt5 create [project-name] [options]
```

### Options

| Flag | Description |
| --- | --- |
| `--organization-id` | Control Plane organization to associate with the new project. |

### Flow Summary

1. Prompt (or accept a CLI argument) for the project name
2. Prompt for a language (`python` or `typescript`)
3. Resolve the organization ID or prompt to choose
4. Call the Control Plane to create the project
5. Generate the full project skeleton for Python projects
6. Write `.agnt5/project-ref` for future CLI commands
7. Print next steps

### Python Project Scaffolding

For Python projects, the CLI generates:

- `app.py`, `src/<project_name>/functions.py`
- `tests/__init__.py`, `tests/test_functions.py`
- `pyproject.toml`, `agnt5.yaml`, `Dockerfile`
- `.gitignore`, `README.md`

Non-Python languages currently skip the scaffolding step but still create the remote project and update your local metadata.
**Interactive create**

```bash
agnt5 project create
```

```
? Project name: my-ai-workflow
? Select language:
❯ python
  typescript
? Python version: 3.12
? Select organization:
❯ Acme Corp (org-abc123)
  Personal (org-def456)

Creating project...
✓ Project created successfully

Project Details:
  ID:        proj-xyz789
  Name:      my-ai-workflow
  Slug:      my-ai-workflow
  Reference: acme/my-ai-workflow
  Status:    active

Next steps:
  cd my-ai-workflow
  pip install -r requirements.txt
  agnt5 deploy
```

**Python project**

```bash
agnt5 create python-workflow
```

```
Creating project 'python-workflow'...
? Select language: python
? Python version: 3.12
? Organization: Acme Corp (org-abc123)
✓ Project created in Control Plane
✓ Generating Python project structure...

Generated files:
  ✓ app.py
  ✓ src/python_workflow/__init__.py
  ✓ src/python_workflow/functions.py
  ✓ tests/test_functions.py
  ✓ pyproject.toml
  ✓ agnt5.yaml
  ✓ Dockerfile
  ✓ .gitignore
  ✓ README.md
  ✓ .agnt5/project-ref created

Project ready! Next steps:
  cd python-workflow
  pip install -e .
  agnt5 deploy
```

**With organization**

```bash
agnt5 create my-project --organization-id org-abc123
```

```
Creating project 'my-project'...
Using organization: org-abc123
? Select language:
❯ python
  typescript
✓ Project created successfully

Project Details:
  ID:           proj-123456
  Organization: org-abc123
  Reference:    acme/my-project

Project directory created with Python scaffolding.
```
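After generating or cloning a project, a quick sanity check that the expected scaffold files exist can save a failed deploy later. A minimal sketch: the file list mirrors the scaffolding section above, but the helper itself is illustrative and not part of the CLI or SDK:

```python
from pathlib import Path

# Expected scaffold paths, taken from the project-create docs above.
# The helper is a hypothetical pre-flight check, not an AGNT5 API.
SCAFFOLD = [
    "app.py",
    "pyproject.toml",
    "agnt5.yaml",
    "Dockerfile",
    ".agnt5/project-ref",
]

def missing_scaffold_files(root: str) -> list[str]:
    """Return scaffold paths that are absent under the project root."""
    base = Path(root)
    return [rel for rel in SCAFFOLD if not (base / rel).exists()]
```

An empty return value means the directory looks deployable; anything else names exactly what to restore before running `agnt5 deploy`.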
### `agnt5 project list`
Lists projects visible to your API key in a simple table. Pagination, search, and ordering options are available.

### Syntax

```bash
agnt5 project list [options]
```

### Options

| Flag | Description |
| --- | --- |
| `--page`, `-p` | Page number to request (default: `1`) |
| `--page-size` | Number of projects per page (default: `20`) |
| `--search`, `-s` | Filter by name, ref, or slug |
| `--language`, `-l` | Filter by language (e.g., `python`) |
| `--status` | Filter by status returned from the Control Plane |
| `--order-by` | Sort column: `created`, `updated`, `name` (default: `updated`) |
| `--order-dir` | Sort direction: `asc` or `desc` (default: `desc`) |

The output includes truncated IDs, names, references, languages (with versions when available), statuses, and creation dates. Pagination metadata at the bottom shows how many results are displayed relative to the total.
**Basic list**

```bash
agnt5 project list
```

```
Projects (showing 3 of 3 total):

ID        | Name             | Reference           | Language | Status  | Updated
----------|------------------|---------------------|----------|---------|----------
proj-abc  | my-ai-workflow   | acme/my-ai-workflow | python   | active  | 2 days ago
proj-def  | data-processor   | acme/data-processor | python   | active  | 1 week ago
proj-ghi  | api-service      | acme/api-service    | node     | paused  | 2 weeks ago

Page 1 of 1 (3 projects total)
```

**Search & filter**

```bash
agnt5 project list --search "workflow" --language python
```

```
Projects matching 'workflow' (language: python):

ID        | Name             | Reference           | Language | Status  | Updated
----------|------------------|---------------------|----------|---------|----------
proj-abc  | my-ai-workflow   | acme/my-ai-workflow | python   | active  | 2 days ago
proj-xyz  | batch-workflow   | acme/batch-workflow | python   | active  | 5 days ago

Page 1 of 1 (2 projects found, 2 total)
```

**Pagination**

```bash
agnt5 project list --page 2 --page-size 10 --order-by created --order-dir asc
```

```
Projects (page 2, ordered by created date):

ID        | Name             | Reference            | Language | Status  | Created
----------|------------------|----------------------|----------|---------|---------
proj-klm  | analytics-api    | acme/analytics-api   | python   | active  | Jan 15
proj-nop  | webhook-handler  | acme/webhook-handler | node     | active  | Jan 18
proj-qrs  | ml-pipeline      | acme/ml-pipeline     | python   | active  | Jan 20

Page 2 of 5 (30 projects total, showing 11-20)
```
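The pagination footer ("showing 11-20") follows directly from `--page` and `--page-size`. A sketch of the arithmetic, assuming 1-indexed pages as the flag defaults suggest:

```python
def page_window(page: int, page_size: int, total: int) -> tuple[int, int]:
    """Return the 1-indexed (first, last) item numbers shown on a page.

    Assumes pages are 1-indexed, matching --page's default of 1.
    """
    first = (page - 1) * page_size + 1
    last = min(page * page_size, total)
    return first, last
```

For example, `page_window(2, 10, 30)` returns `(11, 20)`, matching the "showing 11-20" line in the example output above; the last page clamps to the total.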
### `agnt5 project info`
**Work in Progress**

Placeholder command that will eventually print detailed metadata about the current project or a provided ID. For now it simply acknowledges the request. Use `agnt5 project list` to locate IDs until the implementation is complete.

### Syntax

```bash
agnt5 project info [project-id]
```

This command will eventually provide comprehensive project details including configuration, deployment status, environment variables, and resource usage metrics.
**Current project**

```bash
agnt5 project info
```

```
Command acknowledged but not yet implemented.
Use 'agnt5 project list' to view available projects.
```

**Specific project**

```bash
agnt5 project info proj-123
```

```
Command acknowledged but not yet implemented.
Use 'agnt5 project list' to view available projects.
```
### `agnt5 project init`
**Work in Progress**

Reserved for initializing an existing directory as an AGNT5 project. The command currently prints a stub message.

### Syntax

```bash
agnt5 project init
```

This command will eventually initialize an existing directory as an AGNT5 project by creating the necessary configuration files and project metadata.
```bash
agnt5 project init
```

```
Command acknowledged but not yet implemented.
Use 'agnt5 project create' to create new projects.
```
## Local Project Metadata

Most project-aware commands rely on two files created during `project create`:

| File | Purpose |
| --- | --- |
| `.agnt5/project-ref` | Binds the working directory to a Control Plane project reference |
| `agnt5.yaml` | Holds language, environment, deploy, and variable configuration for the deployment pipeline |

If you clone an existing project, make sure both files are present (or run the `SaveProjectRef` / `SaveDeploymentProjectConfig` helper functions) before using `agnt5 deploy`.

### Project Reference File

The `.agnt5/project-ref` file contains the Control Plane project reference that links your local directory to the remote project. This file is automatically created during `agnt5 project create` and is required for deployment commands.

### Deployment Configuration

The `agnt5.yaml` file contains project-specific configuration used by the deployment pipeline:

```yaml
# Example agnt5.yaml structure
language: python
version: "3.12"
environment:
  development:
    # Development-specific config
  production:
    # Production-specific config
deploy:
  # Deployment configuration
variables:
  # Environment variables
```

---

## Troubleshooting

_Source: https://agnt5.com/cli_

> Common issues and solutions for the AGNT5 CLI

Common issues and solutions when using the AGNT5 CLI.

**Diagnostic commands**: `agnt5 --version`, `agnt5 auth whoami`, `agnt5 context`
**Common failure modes**: missing CLI on PATH (`command not found`); missing/expired credentials; Docker daemon unreachable; wrong context for the target environment
**Reset paths**: re-run `agnt5 auth login` to refresh credentials; `agnt5 context use <context>` to switch environment; reinstall via the platform's install method to fix PATH issues

## Installation Issues

### Command Not Found

**Problem**: `agnt5: command not found` after installation.

**Solutions**:

1. **Check if npm global packages are in PATH**:

   ```bash
   npm config get prefix
   echo $PATH
   ```

2.
**Reinstall the CLI**:

   ```bash
   npm uninstall -g agnt5
   npm install -g agnt5
   ```

3. **Use npx as an alternative**:

   ```bash
   npx agnt5 --version
   ```

4. **Add npm global bin to PATH** (if missing):

   ```bash
   # Add to ~/.bashrc or ~/.zshrc
   export PATH=$PATH:$(npm config get prefix)/bin
   ```

### Permission Denied

**Problem**: Permission errors during installation on macOS/Linux.

**Solutions**:

1. **Use a Node version manager** (recommended):

   ```bash
   # Install nvm
   curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash
   nvm install 18
   nvm use 18
   npm install -g agnt5
   ```

2. **Change npm's default directory**:

   ```bash
   mkdir ~/.npm-global
   npm config set prefix '~/.npm-global'
   echo 'export PATH=~/.npm-global/bin:$PATH' >> ~/.bashrc
   source ~/.bashrc
   npm install -g agnt5
   ```

3. **Use sudo** (not recommended):

   ```bash
   sudo npm install -g agnt5
   ```

### Windows Installation Issues

**Problem**: Installation fails on Windows.

**Solutions**:

1. **Run as Administrator**:
   - Right-click Command Prompt/PowerShell
   - Select "Run as Administrator"
   - Run the installation command

2. **Use Windows Subsystem for Linux (WSL)**:

   ```bash
   wsl --install
   # Then install in the WSL environment
   ```

3. **Use Chocolatey**:

   ```bash
   choco install nodejs
   npm install -g agnt5
   ```

## Authentication Issues

### Invalid API Key

**Problem**: "Invalid API key" or "Unauthorized" errors.

**Solutions**:

1. **Verify your API key**:

   ```bash
   agnt5 config get api-key
   ```

2. **Reset and re-enter the API key**:

   ```bash
   agnt5 config set api-key your-new-api-key
   ```

3. **Use interactive login**:

   ```bash
   agnt5 auth login
   ```

4. **Check environment variables**:

   ```bash
   echo $AGNT5_API_KEY
   ```

### Token Expired

**Problem**: Authentication token has expired.

**Solutions**:

1. **Re-authenticate**:

   ```bash
   agnt5 auth logout
   agnt5 auth login
   ```

2. **Generate a new API key** from the AGNT5 dashboard

3.
**Update configuration**:

   ```bash
   agnt5 config set api-key your-new-key
   ```

## Deployment Issues

### Deployment Timeout

**Problem**: Deployments fail due to timeout.

**Solutions**:

1. **Increase the timeout**:

   ```bash
   agnt5 deploy --timeout 600000  # 10 minutes
   ```

2. **Configure it in the project config**:

   ```javascript
   // agnt5.config.js
   module.exports = {
     deploy: {
       timeout: 600000
     }
   };
   ```

3. **Check deployment status**:

   ```bash
   agnt5 status --env production --verbose
   ```

4. **Use incremental deployment**:

   ```bash
   agnt5 deploy --incremental
   ```

### Build Failures

**Problem**: The build process fails during deployment.

**Solutions**:

1. **Check build logs**:

   ```bash
   agnt5 build --verbose
   ```

2. **Clean the build cache**:

   ```bash
   agnt5 build --clean
   ```

3. **Test the build locally**:

   ```bash
   agnt5 build --env production
   ```

4. **Check dependencies**:

   ```bash
   npm audit
   npm update
   ```

## Network Issues

### Connection Timeout

**Problem**: Commands fail with a connection timeout.

**Solutions**:

1. **Check network connectivity**:

   ```bash
   ping api.agnt5.com
   ```

2. **Increase the timeout**:

   ```bash
   agnt5 config set timeout 60000
   ```

3. **Configure a proxy** (if behind a corporate firewall):

   ```bash
   npm config set proxy http://proxy.company.com:8080
   npm config set https-proxy http://proxy.company.com:8080
   ```

4. **Use a different base URL**:

   ```bash
   agnt5 config set base-url https://api-eu.agnt5.com
   ```

### SSL Certificate Issues

**Problem**: SSL/TLS certificate errors.

**Solutions**:

1. **Update Node.js** to the latest stable version

2. **Use a different registry** (temporary):

   ```bash
   npm config set registry https://registry.npmjs.org/
   ```

3. **Disable SSL verification** (not recommended for production):

   ```bash
   npm config set strict-ssl false
   ```

4. **Update CA certificates**:

   ```bash
   # macOS
   brew update && brew upgrade ca-certificates

   # Ubuntu/Debian
   sudo apt-get update && sudo apt-get upgrade ca-certificates
   ```

## Performance Issues

### Slow Commands

**Problem**: CLI commands are running slowly.
**Solutions**:

1. **Enable caching**:

   ```bash
   agnt5 config set cache.enabled true
   ```

2. **Increase concurrency**:

   ```bash
   agnt5 config set concurrency 20
   ```

3. **Use the local runtime** for development:

   ```bash
   agnt5 run workflow --local
   ```

4. **Profile command execution**:

   ```bash
   agnt5 --verbose --profile run workflow
   ```

### Memory Issues

**Problem**: The CLI process uses too much memory.

**Solutions**:

1. **Increase the Node.js memory limit**:

   ```bash
   export NODE_OPTIONS="--max-old-space-size=4096"
   agnt5 command
   ```

2. **Reduce concurrency**:

   ```bash
   agnt5 config set workflows.concurrency 5
   ```

3. **Clear the cache**:

   ```bash
   agnt5 cache clear
   ```

## Configuration Issues

### Configuration Not Loading

**Problem**: The CLI ignores configuration files.

**Solutions**:

1. **Verify the config file location**:

   ```bash
   agnt5 config validate --verbose
   ```

2. **Check the config file syntax**:

   ```bash
   node -c agnt5.config.js
   ```

3. **Use an explicit config file**:

   ```bash
   agnt5 --config ./my-config.js command
   ```

4. **Reset configuration**:

   ```bash
   agnt5 config reset
   ```

## Getting Help

### Enable Debug Mode

For detailed troubleshooting information:

```bash
agnt5 --debug command
agnt5 --verbose command
```

### Check System Information

```bash
agnt5 doctor --verbose
```

### Validate Setup

```bash
agnt5 config validate
agnt5 auth whoami
agnt5 status --verbose
```

### Contact Support

If you're still experiencing issues:

1. **Check the logs**:

   ```bash
   agnt5 logs --level error --tail 100
   ```

2. **Create a minimal reproduction case**

3. **Include system information**:

   ```bash
   agnt5 --version
   node --version
   npm --version
   echo $AGNT5_API_KEY | cut -c1-8  # First 8 chars only
   ```

4.
**Report the issue** with full error messages and steps to reproduce

# Changelog

---

## Agent Memory and Context Management

_Source: https://agnt5.com/changelog_

> Persistent conversation memory with automatic context window management

Agents now maintain durable conversation history across sessions with intelligent context window management. No more context loss when conversations span days or exceed token limits.

## Automatic Context Summarization

When conversation history approaches the LLM's token limit, AGNT5 automatically summarizes older messages while preserving recent exchanges verbatim:

```python
@agent()
class SupportAgent:
    async def handle_message(self, user_id: str, message: str):
        # Context automatically managed
        response = await self.chat(message)
        return response
```

The agent maintains full conversation history in durable storage. Recent messages stay intact for immediate context. Older messages get compressed through summarization. The LLM sees a seamless conversation thread that fits within token limits.

## Why Context Matters

Long-running agent conversations (customer support, research assistants, coding copilots) require persistent memory. Users expect agents to remember previous interactions, not restart from scratch each session.

With automatic context management, your agents scale to conversations of any length. The complexity of token counting, summarization, and history management becomes invisible.

[Read the agent documentation](/docs/fundamentals/agents.md) for implementation details.

---

## Python SDK: Type-Safe Entity State

_Source: https://agnt5.com/changelog_

> TypedDict support for entity state with full autocomplete and validation

Entity state management now supports Python's `TypedDict`, bringing full type safety and IDE autocomplete to your durable entities.

### The Problem

Previously, entity state was untyped: a plain dictionary that could hold any structure.
This worked, but required manual validation and provided no IDE support:

```python
@entity()
class UserSession:
    async def update_preferences(self, key: str, value):
        # What fields exist in self.state? No autocomplete to help.
        self.state[key] = value
```

### The Solution

Define your state structure with `TypedDict`, and the SDK enforces it at runtime:

```python
from typing import TypedDict

class SessionState(TypedDict):
    user_id: str
    preferences: dict[str, str]
    last_active: int

@entity()
class UserSession:
    state: SessionState

    async def update_preference(self, key: str, value: str):
        # Full autocomplete on self.state["preferences"]
        self.state["preferences"][key] = value
```

Type checking happens automatically. Invalid state updates fail fast with clear error messages. Your IDE provides autocomplete for all state fields.

Read the [entity documentation](/docs/fundamentals/entities.md) to learn more about type-safe state management.

---

## Improved Workflow State Persistence

_Source: https://agnt5.com/changelog_

> Enhanced checkpoint recovery and deterministic replay for long-running workflows

When workflows span hours or days, state persistence becomes critical. This release strengthens AGNT5's checkpoint recovery system to handle complex state transitions more reliably.

## What Changed

We've redesigned how workflow state gets persisted during execution. Previously, checkpoints were created after each function invocation. Now, checkpoints capture the complete workflow context, including local variables, pending tasks, and execution history.

```python
@workflow()
async def research_pipeline(topic: str):
    # Checkpoint created here with full context
    sources = await gather_sources(topic)

    # If failure occurs here, workflow resumes with sources intact
    summaries = await summarize_sources(sources)

    return await synthesize_report(summaries)
```

This means when a workflow resumes after a failure, it picks up exactly where it left off. No re-execution of completed steps.
No lost progress.

## Why This Matters

Long-running AI workflows often fail mid-execution: API timeouts, rate limits, infrastructure issues. With enhanced checkpoint recovery, these failures no longer mean starting over.

Your workflows become truly durable. Pause them. Resume them. Replay them with different code. The execution history is the source of truth.

Learn more in the [workflow documentation](/docs/fundamentals/workflows.md).
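The record-then-replay idea behind checkpoint recovery can be sketched in a few lines. This is a toy illustration of step journaling, not the AGNT5 runtime's actual implementation; the class and method names are assumptions:

```python
class Journal:
    """Toy step journal: record each step's result once, replay it after."""

    def __init__(self):
        self._entries: dict[str, object] = {}

    def step(self, step_id: str, fn):
        # First execution: run the (possibly non-deterministic or
        # expensive) work and record its result. On replay: return
        # the recorded value without running the work again.
        if step_id not in self._entries:
            self._entries[step_id] = fn()
        return self._entries[step_id]
```

A workflow body that calls `journal.step("gather", gather_sources)` sees the same value on every replay, which is how completed steps avoid re-execution after a failure.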