AGNT5 Python SDK

Build AI agents and durable workflows with the AGNT5 Python SDK


Build AI agents and reliable workflows with automatic recovery. AGNT5 combines agent orchestration and fault-tolerant execution in one lightweight framework.

Primitives comparison

Function
  • What: Stateless operation with retries
  • State: None
  • Durability: Automatic retries, checkpointing
  • Best for: Document analysis, embeddings generation, LLM API calls

Entity
  • What: Stateful component with unique key
  • State: Isolated per entity key
  • Durability: Persistent state across runs
  • Best for: AI chat sessions, agent memory, conversation history

Workflow
  • What: Multi-step orchestrated process
  • State: Isolated per workflow instance
  • Durability: Checkpointed steps, resume on failure
  • Best for: RAG pipelines, content generation with review, AI evals

Agent
  • What: LLM with instructions and tools
  • State: Conversation history via Entity
  • Durability: Context preserved in Entity
  • Best for: Customer support, research assistants, code review

Tool
  • What: Python function LLMs can call
  • State: None
  • Durability: Runs within agent context
  • Best for: Vector search, knowledge base queries, API integrations

Key Features

  • Automatic recovery from failures with configurable retry policies
  • Checkpointing resumes from exact failure point
  • Multi-agent coordination via handoffs and composition
  • Python-native - decorators, async/await, type hints
  • Multi-provider - OpenAI, Anthropic, Groq, Azure, Bedrock, OpenRouter
  • Built-in tracing for debugging and monitoring
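The retry behavior in the first bullet is commonly implemented as exponential backoff. The sketch below is a generic, standard-library illustration of that idea, not AGNT5's retry-policy API; `retry_with_backoff` and its `max_attempts`, `initial_delay`, `multiplier`, and `sleep` parameters are hypothetical names chosen for the example.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    initial_delay: float = 0.5,
    multiplier: float = 2.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call fn until it succeeds, waiting initial_delay * multiplier**n between attempts."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await sleep(delay)
            delay *= multiplier
    raise ValueError("max_attempts must be at least 1")


# Usage: a call that fails twice with a transient error, then succeeds
attempts = 0
recorded_delays: List[float] = []


async def flaky() -> str:
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient failure")
    return "ok"


async def record_sleep(seconds: float) -> None:
    recorded_delays.append(seconds)  # record the delay instead of actually waiting


value = asyncio.run(retry_with_backoff(flaky, initial_delay=1.0, sleep=record_sleep))
print(value, attempts, recorded_delays)  # ok 3 [1.0, 2.0]
```

Injecting `sleep` keeps the example fast and makes the delay schedule observable; a production policy would usually also cap the delay and add jitter.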

Installation

pip install agnt5

Quick example

from agnt5 import Agent, workflow, tool, Context, WorkflowContext

# Define a tool for the agent
@tool(auto_schema=True)
async def search_docs(ctx: Context, query: str) -> str:
    """Search documentation for answers."""
    # Your search logic here
    return f"Found documentation about: {query}"

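# Placeholder persistence helper (hypothetical: replace with your own storage)
async def save_answer(question: str, answer) -> None:
    pass

# Create an AI agent with tools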
agent = Agent(
    name="assistant",
    model="openai/gpt-4o-mini",
    instructions="You are a helpful assistant. Search docs when needed.",
    tools=[search_docs]
)

# Create a durable workflow that orchestrates the agent
@workflow
async def process_question(ctx: WorkflowContext, question: str) -> dict:
    """Durable workflow for processing questions."""

    # Step 1: Get answer from agent (checkpointed)
    answer = await ctx.step("get_answer", agent.run(question))

    # Step 2: Store result (checkpointed)
    await ctx.step("store", save_answer(question, answer))

    return {"question": question, "answer": answer}

# If the process crashes after step 1, the workflow resumes on restart:
# step 1's checkpointed result is reused and execution continues with step 2

Note: Set your OPENAI_API_KEY environment variable before running.
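To picture what "checkpointed" means in the example above, here is a toy, in-memory model of step replay. It is not AGNT5's implementation: `StepJournal` is a hypothetical class, its results live in a dict rather than durable storage, and in this sketch steps are passed as zero-argument callables so that a replayed step is skipped rather than re-run.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class StepJournal:
    """Toy stand-in for a durable step journal (hypothetical, not the AGNT5 API)."""

    def __init__(self) -> None:
        self.results: Dict[str, Any] = {}  # step name -> recorded result

    async def step(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        # Replay: if this step completed in an earlier attempt, reuse its result
        if name in self.results:
            return self.results[name]
        result = await fn()
        self.results[name] = result  # checkpoint before moving on
        return result


calls = {"answer": 0, "store": 0}


async def get_answer(question: str) -> str:
    calls["answer"] += 1
    return f"answer to {question!r}"


async def store(question: str, answer: str) -> None:
    calls["store"] += 1


async def process_question(journal: StepJournal, question: str, crash: bool = False) -> dict:
    answer = await journal.step("get_answer", lambda: get_answer(question))
    if crash:
        raise RuntimeError("simulated crash after step 1")
    await journal.step("store", lambda: store(question, answer))
    return {"question": question, "answer": answer}


async def demo() -> dict:
    journal = StepJournal()
    try:
        await process_question(journal, "What is AGNT5?", crash=True)
    except RuntimeError:
        pass  # the first attempt dies after step 1 was checkpointed
    # Second attempt with the same journal: step 1 is replayed, step 2 runs once
    return await process_question(journal, "What is AGNT5?")


result = asyncio.run(demo())
print(result, calls)
```

The key property is visible in `calls`: the LLM-style step ran once even though the workflow body executed twice.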

Next Steps

Getting Started

Core Primitives

  • Functions - Stateless operations with retries
  • Entities - Stateful components with unique keys
  • Workflows - Multi-step orchestration patterns
  • Context API - Orchestration, state, AI, and observability APIs

Agent Development Kit (ADK)

  • Agents - Autonomous LLM-driven systems
  • Sessions - Conversation containers and multi-agent coordination
  • Tools - Callable capabilities that extend agent abilities
  • Memory - Long-term knowledge storage with semantic search

Examples



Getting Started

Get up and running with the AGNT5 Python SDK in minutes. This guide covers installation, your first worker, and local development setup.

Installation

System Requirements

  • Python 3.8 or higher
  • pip or uv package manager
  • Docker (for the local development server)

Install from PyPI

pip install agnt5

Development Installation

For development or contributing to the SDK:

git clone https://github.com/agnt5/agnt5
cd agnt5/sdk/sdk-python
pip install -e .

Verify Installation

import agnt5
print(agnt5.__version__)

First Worker

Create a file named worker.py with a worker that registers two functions:

import asyncio
from agnt5 import Worker, function

@function()
def greet(name: str) -> str:
    """Greet a user by name."""
    return f"Hello, {name}!"

@function("math_add")
def add_numbers(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Local Development Setup

Start the AGNT5 Development Server

The development server runs all AGNT5 services locally in Docker:

Run Your Worker

In a new terminal, run your worker:

python worker.py

You should see output like:

INFO:agnt5.worker:Starting worker for service: hello-service
INFO:agnt5.worker:Registered function: greet
INFO:agnt5.worker:Registered function: math_add
INFO:agnt5.worker:Worker running, waiting for tasks...

Test Your Functions

Using HTTP API

Test your functions using the Gateway HTTP API:

# Test the greet function
curl -X POST http://localhost:8080/call \
  -H "Content-Type: application/json" \
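  -d '{
    "serviceName": "hello-service",
    "handlerName": "greet",
    "inputData": "IkFsaWNlIg=="
  }'

The inputData field is base64-encoded JSON. For the name "Alice", the JSON value is the quoted string "Alice", which encodes to "IkFsaWNlIg==" (the same bytes the Python client below produces with json.dumps).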
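If you build requests by hand, a small standard-library helper keeps the encoding consistent with the Python client: serialize the payload to JSON, then base64-encode the UTF-8 bytes. `encode_input` and `decode_input` are hypothetical helper names, not part of the SDK. Note that the JSON encoding of a string includes its quotes, so it differs from base64 of the raw text.

```python
import base64
import json
from typing import Any


def encode_input(payload: Any) -> str:
    """Serialize a payload to JSON and base64-encode it for the inputData field."""
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")


def decode_input(data: str) -> Any:
    """Reverse of encode_input: base64-decode, then parse the JSON."""
    return json.loads(base64.b64decode(data))


print(encode_input("Alice"))            # IkFsaWNlIg==
print(encode_input({"a": 5, "b": 3}))   # inputData for math_add
print(base64.b64encode(b"Alice").decode("ascii"))  # QWxpY2U= (raw text, not JSON)
```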

Using Python Client

import asyncio
import json
import base64
from agnt5 import Client

async def test_functions():
    client = Client("http://localhost:8080")

    # Test greet function
    name = "Alice"
    input_data = base64.b64encode(json.dumps(name).encode()).decode()

    result = await client.call(
        service_name="hello-service",
        handler_name="greet",
        input_data=input_data
    )

    print(f"Greeting result: {result}")

    # Test add function
    numbers = {"a": 5, "b": 3}
    input_data = base64.b64encode(json.dumps(numbers).encode()).decode()

    result = await client.call(
        service_name="hello-service",
        handler_name="math_add",
        input_data=input_data
    )

    print(f"Addition result: {result}")

asyncio.run(test_functions())

ASGI Integration

For web applications, use the ASGI runtime. Save the following as asgi_app.py (the module name used in the uvicorn command below):

from agnt5 import Worker, function

@function()
def api_handler(request: dict) -> dict:
    return {
        "message": "Hello from AGNT5!",
        "received": request
    }

# Create ASGI application
app = Worker("web-service", runtime="asgi")
app.enable_cors()  # Enable CORS for browser access

Run with uvicorn:

pip install uvicorn
uvicorn asgi_app:app --reload --port 8000

Test the ASGI endpoints:

# Health check
curl http://localhost:8000/health

# List available functions
curl http://localhost:8000/functions

# Call a function
curl -X POST http://localhost:8000/invoke/api_handler \
  -H "Content-Type: application/json" \
  -d '{"test": "data"}'

Configuration

Environment Variables

Configure your worker using environment variables:

export AGNT5_COORDINATOR_ENDPOINT=http://localhost:9091
export AGNT5_SERVICE_NAME=my-service
export AGNT5_LOG_LEVEL=DEBUG

python worker.py
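If you want the same variables available in your own startup code, you can read them with `os.environ` and fall back to defaults. This is a sketch: the `WorkerSettings` dataclass, `load_settings`, and the fallback values are assumptions for illustration, and the SDK may also read these variables itself.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class WorkerSettings:
    coordinator_endpoint: str
    service_name: str
    log_level: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> WorkerSettings:
    """Read AGNT5_* variables, using illustrative defaults when they are unset."""
    env = os.environ if env is None else env
    return WorkerSettings(
        coordinator_endpoint=env.get("AGNT5_COORDINATOR_ENDPOINT", "http://localhost:9091"),
        service_name=env.get("AGNT5_SERVICE_NAME", "my-service"),
        log_level=env.get("AGNT5_LOG_LEVEL", "INFO").upper(),
    )


# Passing a mapping instead of os.environ makes the function easy to test
settings = load_settings({"AGNT5_SERVICE_NAME": "billing", "AGNT5_LOG_LEVEL": "debug"})
print(settings)
```

The resulting values can be passed to the `service_name` and `coordinator_endpoint` keyword arguments of `Worker`, as in the in-code configuration example.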

Configuration in Code

import logging
from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging

# Configure logging
logging.basicConfig(level=logging.INFO)
install_opentelemetry_logging()

# Create worker with custom configuration
worker = Worker(
    service_name="configured-service",
    service_version="1.2.0",
    coordinator_endpoint="http://localhost:9091",
    runtime="standalone"
)

Error Handling

Handle errors gracefully in your functions:

from agnt5 import function
import logging

logger = logging.getLogger(__name__)

@function()
def safe_divide(a: float, b: float) -> dict:
    try:
        if b == 0:
            return {"error": "Division by zero", "result": None}

        result = a / b
        logger.info(f"Division successful: {a} / {b} = {result}")

        return {"result": result, "error": None}

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return {"error": str(e), "result": None}

Development Tips

Hot Reload

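During development, stop the worker with Ctrl+C and start it again after code changes. This entry point exits cleanly when interrupted:

import asyncio
import sys

from agnt5 import Worker

# Import or define your @function handlers here

async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()

# Exit cleanly on Ctrl+C so the worker can be restarted after code changes
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nWorker stopped")
        sys.exit(0)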

Debugging

Enable debug logging to see detailed execution information:

import logging
logging.basicConfig(level=logging.DEBUG)

from agnt5.logging import install_opentelemetry_logging
install_opentelemetry_logging(level=logging.DEBUG)

Testing Functions

Test your functions locally without the full platform:

from agnt5.decorators import execute_component

# Test function directly
result = execute_component("greet", b'{"name": "Alice"}')
print(result)
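execute_component takes a handler name and a JSON-encoded byte payload and returns JSON bytes. To reason about that contract without the SDK installed, the sketch below models it with a plain dict registry: decode the JSON object, pass its keys as keyword arguments, and JSON-encode the return value. `REGISTRY`, `register`, and `invoke_local` are hypothetical; the SDK's own dispatch may differ.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical registry: handler name -> plain Python callable
REGISTRY: Dict[str, Callable[..., Any]] = {}


def register(name: str, fn: Callable[..., Any]) -> None:
    REGISTRY[name] = fn


def invoke_local(name: str, payload: bytes) -> bytes:
    """Decode a JSON object payload, call the handler with it as kwargs, return JSON bytes."""
    kwargs = json.loads(payload.decode("utf-8"))
    result = REGISTRY[name](**kwargs)
    return json.dumps(result).encode("utf-8")


def greet(name: str) -> str:
    return f"Hello, {name}!"


register("greet", greet)
print(invoke_local("greet", b'{"name": "Alice"}'))  # b'"Hello, Alice!"'
```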

Next Steps

Core Primitives

Agent Development Kit

  • Agents - Autonomous LLM-driven systems
  • Tools - Extend agent capabilities
  • Sessions - Conversation management
  • Memory - Long-term knowledge storage

Configuration


Functions

Functions are the core building blocks of AGNT5 applications. Use the @function decorator to register Python callables as invokable components that can be discovered and executed by the platform.

Basic Usage

Simple Function

from agnt5 import function

@function()
def greet_user(name: str) -> str:
    """Greet a user by name."""
    return f"Hello, {name}!"

Named Function

Override the registered name:

@function("math.add")
def add_numbers(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

Function with Context

Access execution metadata through the context parameter:

from agnt5 import function
from agnt5.components import ExecutionContext

@function()
def context_aware(ctx: ExecutionContext, data: dict) -> dict:
    """Process data with execution context."""
    return {
        "invocation_id": ctx.invocation_id,
        "service_name": ctx.metadata.get("service_name"),
        "processed_data": data,
        "component_type": ctx.component_type.value
    }

Decorator Parameters

function(name=None)

Parameter   Type         Description
name        str | None   Override the registered function name. Defaults to the original function name.

# Uses function name "process_data"
@function()
def process_data(data: dict) -> dict:
    return data

# Uses custom name "data_processor"
@function("data_processor")
def process_data(data: dict) -> dict:
    return data

Handler Signatures

AGNT5 supports flexible function signatures to accommodate different use cases.

Without Context

For simple stateless functions:

@function()
def calculate_tax(amount: float, rate: float) -> float:
    return amount * rate

@function()
def format_message(template: str, **kwargs) -> str:
    return template.format(**kwargs)

With Context

When you need access to invocation metadata:

import logging
import time

from agnt5 import function
from agnt5.components import ExecutionContext

logger = logging.getLogger(__name__)

@function()
def audit_handler(ctx: ExecutionContext, action: str, data: dict) -> dict:
    """Handler that logs audit information."""
    logger.info(f"Audit: {action} from {ctx.invocation_id}")

    return {
        "action": action,
        "invocation_id": ctx.invocation_id,
        "data": data,
        "timestamp": time.time()
    }
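Supporting both signatures means a runtime has to decide whether to pass a context object. One plausible approach, sketched here with the standard `inspect` module, is to check whether the first parameter is named `ctx` or annotated with the context type. This is an assumption about how such detection could work, not a description of AGNT5's internals; `FakeContext`, `wants_context`, and `call_handler` are hypothetical.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeContext:
    """Hypothetical stand-in for ExecutionContext."""
    invocation_id: str


def wants_context(fn: Callable[..., Any]) -> bool:
    params = list(inspect.signature(fn).parameters.values())
    if not params:
        return False
    first = params[0]
    return first.name == "ctx" or first.annotation is FakeContext


def call_handler(fn: Callable[..., Any], ctx: FakeContext, **kwargs: Any) -> Any:
    # Inject the context only for handlers that declare it
    if wants_context(fn):
        return fn(ctx, **kwargs)
    return fn(**kwargs)


def calculate_tax(amount: float, rate: float) -> float:
    return amount * rate


def whoami(ctx: FakeContext) -> str:
    return ctx.invocation_id


ctx = FakeContext(invocation_id="inv-1")
print(call_handler(calculate_tax, ctx, amount=100.0, rate=0.25))  # 25.0
print(call_handler(whoami, ctx))  # inv-1
```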

Async Functions

AGNT5 supports both synchronous and asynchronous functions:

Async Handler

import asyncio
from datetime import datetime, timezone

import httpx
from agnt5 import function

@function()
async def async_processor(data: dict) -> dict:
    """Async processing with I/O operations."""
    # Simulate async I/O
    await asyncio.sleep(0.1)

    # Async HTTP request example (requires the httpx package)
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.example.com/process",
            json=data
        )
        response.raise_for_status()  # fail loudly on HTTP error responses
        external_result = response.json()

    return {
        "original": data,
        "external": external_result,
        "processed_at": datetime.now(timezone.utc).isoformat()
    }

Async with Context

@function()
async def async_context_handler(ctx: ExecutionContext, query: str) -> dict:
    """Async handler with context access."""

    # Use context for correlation
    correlation_id = ctx.metadata.get("correlation_id", ctx.invocation_id)

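    # Async database query. `database` is a placeholder for your async DB client;
    # the parameter placeholder style (%s, $1, ?) depends on that driver.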
    result = await database.execute(
        "SELECT * FROM items WHERE name LIKE %s",
        f"%{query}%"
    )

    return {
        "correlation_id": correlation_id,
        "query": query,
        "results": [dict(row) for row in result]
    }
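Because handlers may be plain functions or coroutine functions, a caller has to await the latter and can offload the former to a thread so blocking code does not stall the event loop. A minimal sketch using `inspect.iscoroutinefunction` and `loop.run_in_executor` (chosen over `asyncio.to_thread`, which needs Python 3.9+); `run_any` is a hypothetical helper, not part of the SDK.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


async def run_any(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Await coroutine functions; run plain functions in the default thread pool."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))


def add(a: int, b: int) -> int:
    return a + b


async def slow_echo(value: str) -> str:
    await asyncio.sleep(0.01)  # simulate async I/O
    return value


async def main() -> list:
    # Both kinds of handler run concurrently under one gather
    return await asyncio.gather(run_any(add, 2, 3), run_any(slow_echo, "hi"))


print(asyncio.run(main()))  # [5, 'hi']
```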

Streaming Functions

For functions that need to return multiple responses over time:

Basic Streaming

import asyncio
import time

from agnt5 import function

@function(streaming=True)
async def stream_data(count: int):
    """Stream multiple data chunks."""
    for i in range(count):
        yield {
            "chunk": i,
            "data": f"Data chunk {i}",
            "timestamp": time.time()
        }
        await asyncio.sleep(0.1)  # Simulate processing time

Streaming with Context

@function(streaming=True)
async def stream_with_context(ctx: ExecutionContext, query: str):
    """Stream search results progressively."""
    search_id = ctx.invocation_id

    # Stream results as they're found (`search_engine` is a placeholder for your async search client)
    async for result in search_engine.stream_search(query):
        yield {
            "search_id": search_id,
            "result": result,
            "timestamp": time.time()
        }
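On the consuming side, a streaming handler is an async generator, so a caller iterates it with `async for`. The sketch below uses an undecorated generator shaped like stream_data (so it runs without the SDK) and collects its chunks; how the AGNT5 client exposes streamed results is not covered here, and `collect` is a hypothetical helper.

```python
import asyncio
import time
from typing import AsyncIterator, Dict, List


async def stream_data(count: int) -> AsyncIterator[Dict]:
    """Undecorated copy of the streaming handler above, for local experimentation."""
    for i in range(count):
        yield {"chunk": i, "data": f"Data chunk {i}", "timestamp": time.time()}
        await asyncio.sleep(0)  # hand control back to the event loop between chunks


async def collect(count: int) -> List[Dict]:
    chunks = []
    async for chunk in stream_data(count):
        chunks.append(chunk)  # a real consumer would forward each chunk as it arrives
    return chunks


chunks = asyncio.run(collect(3))
print([c["data"] for c in chunks])  # ['Data chunk 0', 'Data chunk 1', 'Data chunk 2']
```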

Function Metadata

The SDK automatically captures and provides metadata about registered functions:

Inspecting Functions

from agnt5 import function
from agnt5.decorators import get_registered_functions, get_function_metadata

# Get all registered functions
functions = get_registered_functions()
print(f"Registered functions: {list(functions.keys())}")

# Get metadata for a specific function
@function()
def sample_function(name: str, age: int = 25) -> dict:
    return {"name": name, "age": age}

metadata = get_function_metadata(sample_function)
print(metadata)

Output:

{
    "name": "sample_function",
    "type": "function",
    "parameters": [
        {"name": "name", "type": "str", "required": True},
        {"name": "age", "type": "int", "required": False, "default": 25}
    ],
    "return_type": "dict"
}
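Metadata in this shape can be derived from a function signature with the standard `inspect` module. The sketch below reproduces the output format shown above for simple annotations; `describe_function` is a hypothetical helper, and get_function_metadata is not necessarily implemented this way.

```python
import inspect
from typing import Any, Callable, Dict


def _type_name(annotation: Any) -> str:
    if annotation is inspect.Parameter.empty:
        return "Any"  # unannotated parameter or return value
    return getattr(annotation, "__name__", str(annotation))


def describe_function(fn: Callable[..., Any]) -> Dict[str, Any]:
    sig = inspect.signature(fn)
    params = []
    for p in sig.parameters.values():
        entry: Dict[str, Any] = {
            "name": p.name,
            "type": _type_name(p.annotation),
            "required": p.default is inspect.Parameter.empty,
        }
        if p.default is not inspect.Parameter.empty:
            entry["default"] = p.default
        params.append(entry)
    return {
        "name": fn.__name__,
        "type": "function",
        "parameters": params,
        "return_type": _type_name(sig.return_annotation),
    }


def sample_function(name: str, age: int = 25) -> dict:
    return {"name": name, "age": age}


print(describe_function(sample_function))
```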

Runtime Annotations

The decorator adds runtime annotations to functions:

@function()
def annotated_function(data: str) -> str:
    return data.upper()

# Check annotations
print(annotated_function._agnt5_handler_name)  # "annotated_function"
print(annotated_function._agnt5_is_function)   # True

Error Handling

Basic Error Handling

@function()
def safe_divider(a: float, b: float) -> dict:
    """Safely divide two numbers."""
    try:
        if b == 0:
            return {"error": "Division by zero", "result": None}

        result = a / b
        return {"result": result, "error": None}

    except Exception as e:
        return {"error": str(e), "result": None}

Context-Aware Error Handling

import logging
from agnt5 import function
from agnt5.components import ExecutionContext

@function()
def robust_handler(ctx: ExecutionContext, data: dict) -> dict:
    """Handler with comprehensive error handling."""
    logger = logging.getLogger(__name__)

    try:
        # Log the invocation
        logger.info(f"Processing invocation {ctx.invocation_id}")

        # Validate input
        if not isinstance(data, dict):
            raise ValueError("Input must be a dictionary")

        required_fields = ["id", "name"]
        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")

        # Process data
        result = {
            "processed": True,
            "id": data["id"],
            "name": data["name"].upper(),
            "invocation_id": ctx.invocation_id
        }

        logger.info(f"Successfully processed {data['id']}")
        return result

    except ValueError as e:
        logger.warning(f"Validation error in {ctx.invocation_id}: {e}")
        return {"error": f"Validation error: {e}", "result": None}

    except Exception as e:
        logger.error(f"Unexpected error in {ctx.invocation_id}: {e}")
        return {"error": "Internal error", "result": None}

Type Annotations

Use Python type hints for better documentation and validation:

Basic Types

from typing import Any, Dict, List, Optional, Union

@function()
def typed_handler(
    name: str,
    age: int,
    tags: List[str],
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Union[str, int, List[str]]]:
    """Handler with comprehensive type annotations."""
    return {
        "name": name,
        "age": age,
        "tags": tags,
        "has_metadata": metadata is not None
    }

Pydantic Models

For complex data validation:

import uuid
from datetime import datetime, timezone
from typing import Optional

from pydantic import BaseModel, Field
from agnt5 import function

class UserRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    # pattern= is the Pydantic v2 keyword (Pydantic v1 used regex=)
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    age: Optional[int] = Field(None, ge=0, le=150)

class UserResponse(BaseModel):
    id: str
    name: str
    email: str
    age: Optional[int]
    created_at: str

@function()
def create_user(request: UserRequest) -> UserResponse:
    """Create a user with validation."""
    user_id = str(uuid.uuid4())  # stand-in for your own ID generator

    return UserResponse(
        id=user_id,
        name=request.name,
        email=request.email,
        age=request.age,
        created_at=datetime.now(timezone.utc).isoformat()
    )

Testing Functions

Direct Testing

Test functions directly without the full platform:

import pytest
from agnt5.decorators import execute_component
from agnt5.components import ExecutionContext, ComponentType

def test_greet_function():
    # Test with execute_component
    result = execute_component(
        "greet_user",
        b'{"name": "Alice"}',
        context=None
    )

    # Result is JSON bytes
    import json
    parsed = json.loads(result.decode())
    assert parsed == "Hello, Alice!"

def test_context_function():
    # Create mock context
    ctx = ExecutionContext(
        invocation_id="test-123",
        component_type=ComponentType.FUNCTION
    )

    # Test directly
    result = context_aware(ctx, {"test": "data"})
    assert result["invocation_id"] == "test-123"

Async Testing

import pytest  # @pytest.mark.asyncio requires the pytest-asyncio plugin

@pytest.mark.asyncio
async def test_async_function():
    result = await async_processor({"test": "data"})
    assert "original" in result
    assert "processed_at" in result

Mock Context Testing

from unittest.mock import Mock

def test_with_mock_context():
    # Create mock context
    mock_ctx = Mock(spec=ExecutionContext)
    mock_ctx.invocation_id = "mock-123"
    mock_ctx.component_type = ComponentType.FUNCTION
    mock_ctx.metadata = {"service_name": "test-service"}

    # Test function
    result = context_aware(mock_ctx, {"test": "data"})
    assert result["invocation_id"] == "mock-123"
    assert result["service_name"] == "test-service"

Function Registry

Registry Management

from agnt5 import function
from agnt5.decorators import (
    get_registered_functions,
    clear_registry,
    get_function_metadata
)

# Get all registered functions
functions = get_registered_functions()

# Clear registry (useful for testing)
clear_registry()

# Re-register functions
@function()
def new_function(data: str) -> str:
    return data.upper()

# Inspect metadata
metadata = get_function_metadata(new_function)

Custom Registration

For advanced use cases, register functions manually:

from agnt5.decorators import register_function

def my_handler(data: str) -> str:
    return data.lower()

# Manual registration
register_function("custom_handler", my_handler)

Best Practices

Function Design

  1. Keep functions focused - Each function should have a single responsibility
  2. Use type hints - Improve documentation and enable validation
  3. Handle errors gracefully - Return error information rather than raising exceptions
  4. Log appropriately - Use structured logging for debugging and monitoring

Performance

  1. Minimize imports - Import only what you need
  2. Use async for I/O - Async functions for database queries and API calls
  3. Cache expensive operations - Use local caching for repeated computations
  4. Batch operations - Process multiple items together when possible

Testing

  1. Test functions directly - Unit test without the platform
  2. Mock external dependencies - Use mocks for databases, APIs, etc.
  3. Test error conditions - Ensure error handling works correctly
  4. Use fixtures - Share common test data and setup

Next Steps


Entities

Entities are stateful components identified by unique keys. Use entities to model AI agents with conversation history, workflow orchestrators, or any business object that maintains state across interactions.

Key Characteristics

  • Unique Key - Each instance identified by a unique key (e.g., agent-conv-123)
  • Private State - Built-in key-value storage per instance
  • Single-Writer - Automatic consistency: only one write operation per key runs at a time
  • Durable - State survives crashes and restarts
  • Scalable - Different keys execute in parallel

Implementation Status

Entities are being implemented in Phase 2 of AGNT5 (Target: Q1 2025). The API shown represents the planned design. Check current SDK status for availability.

Basic Usage

Creating an Entity

from agnt5 import entity

# Create entity type
agent = entity("ConversationAgent")

# Write method (exclusive access per key)
@agent.write
async def send_message(ctx, message: str) -> dict:
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})

    # call_llm is a placeholder for your LLM client
    response = await call_llm(history)
    history.append({"role": "assistant", "content": response})

    ctx.set("history", history)
    return {"response": response}

# Shared method (read-only, concurrent)
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

Calling Entities

Call entity methods from functions:

from agnt5 import function

@function
async def chat(ctx, conv_id: str, msg: str):
    # Call entity method with unique key
    return await ctx.entity("ConversationAgent", conv_id).send_message(msg)

Entity API

Core Methods

API                              Description
entity("name")                   Create entity type
@entity.write                    Write method (exclusive per key)
@entity.shared                   Shared method (read-only, concurrent)
ctx.get(key, default)            Get state value
ctx.set(key, value)              Set state value
ctx.delete(key)                  Delete state key
ctx.entity(type, key).method()   Call entity from function
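Because the entity API is still planned, it can help to exercise entity method bodies against a local stand-in. The class below mimics the state calls in the table, with get awaited and set called without await as in the examples (delete follows set). `InMemoryEntityState` and `fake_llm` are hypothetical, and the stand-in provides none of the durability or single-writer guarantees.

```python
import asyncio
import copy
from typing import Any, Dict


class InMemoryEntityState:
    """Local stand-in for an entity context: one dict, no durability or locking."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    async def get(self, key: str, default: Any = None) -> Any:
        # Return a copy so changes only persist through set(), as with a real store
        return copy.deepcopy(self._data.get(key, default))

    def set(self, key: str, value: Any) -> None:
        self._data[key] = copy.deepcopy(value)

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


async def fake_llm(history: list) -> str:
    return f"echo: {history[-1]['content']}"  # placeholder for a real model call


async def send_message(ctx: InMemoryEntityState, message: str) -> dict:
    # Same body as the ConversationAgent write method, with fake_llm for call_llm
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})
    response = await fake_llm(history)
    history.append({"role": "assistant", "content": response})
    ctx.set("history", history)
    return {"response": response}


async def demo() -> list:
    state = InMemoryEntityState()
    await send_message(state, "hi")
    await send_message(state, "again")
    return await state.get("history", [])


history = asyncio.run(demo())
print(len(history))  # 4
```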

State Operations

Common Patterns

Conversational AI Agent

agent = entity("ChatAgent")

@agent.write
async def send_message(ctx, message: str) -> dict:
    """Handle conversational turns with LLM."""
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})

    # Generate response
    response = await ctx.llm.generate(
        prompt=history,
        model="gpt-4"
    )
    history.append({"role": "assistant", "content": response.text})

    # Keep last 20 messages
    if len(history) > 20:
        history = history[-20:]

    ctx.set("history", history)
    return {"response": response.text}

@agent.shared
async def get_history(ctx) -> list:
    """Get conversation history (read-only)."""
    return await ctx.get("history", [])

@agent.shared
async def get_message_count(ctx) -> int:
    """Get total message count."""
    history = await ctx.get("history", [])
    return len(history)

Usage:

@function
async def chat_endpoint(ctx, conversation_id: str, message: str):
    # Call entity with unique conversation ID
    return await ctx.entity("ChatAgent", conversation_id).send_message(message)

Research Agent

from datetime import datetime

research_agent = entity("ResearchAgent")

@research_agent.write
async def start_research(ctx, topic: str) -> dict:
    """Initialize research task."""
    ctx.set("topic", topic)
    ctx.set("findings", [])
    ctx.set("status", "in_progress")
    return {"status": "started", "topic": topic}

@research_agent.write
async def add_finding(ctx, finding: str, source: str) -> dict:
    """Add research finding."""
    findings = await ctx.get("findings", [])
    findings.append({
        "content": finding,
        "source": source,
        "timestamp": datetime.now().isoformat()
    })
    ctx.set("findings", findings)
    return {"count": len(findings)}

@research_agent.write
async def synthesize(ctx) -> dict:
    """Generate summary from findings."""
    findings = await ctx.get("findings", [])
    topic = await ctx.get("topic")

    # Use LLM to synthesize
    summary = await ctx.llm.generate(
        prompt=f"Synthesize these findings about {topic}: {findings}",
        model="gpt-4"
    )

    ctx.set("summary", summary.text)
    ctx.set("status", "completed")
    return {"summary": summary.text}

@research_agent.shared
async def get_progress(ctx) -> dict:
    """Check research progress."""
    return {
        "status": await ctx.get("status"),
        "topic": await ctx.get("topic"),
        "findings_count": len(await ctx.get("findings", []))
    }

Workflow Orchestrator

# Named `orchestrator` so it does not shadow the `workflow` decorator from agnt5
orchestrator = entity("WorkflowOrchestrator")

@orchestrator.write
async def start(ctx, steps: list) -> dict:
    """Start workflow execution."""
    ctx.set("steps", steps)
    ctx.set("current_step", 0)
    ctx.set("results", [])
    ctx.set("status", "running")
    return {"status": "started", "total_steps": len(steps)}

@orchestrator.write
async def complete_step(ctx, result: dict) -> dict:
    """Mark step as complete and store result."""
    results = await ctx.get("results", [])
    results.append(result)
    ctx.set("results", results)

    current = len(results)
    ctx.set("current_step", current)

    # Check if workflow is done
    steps = await ctx.get("steps", [])
    if current >= len(steps):
        ctx.set("status", "completed")

    return {"completed": current, "total": len(steps)}

@orchestrator.shared
async def get_progress(ctx) -> dict:
    """Get workflow progress."""
    return {
        "current_step": await ctx.get("current_step", 0),
        "total_steps": len(await ctx.get("steps", [])),
        "status": await ctx.get("status", "unknown")
    }

Consistency & Concurrency

Single-Writer Per Key

Only one write operation per entity key executes at a time:

# Same key = serial execution (consistency guaranteed)
await ctx.entity("agent", "conv-1").send_message("msg1")  # Runs first
await ctx.entity("agent", "conv-1").send_message("msg2")  # Runs second

# No race conditions, no lost updates

Parallel Execution Across Keys

Different entity keys execute in parallel:

# Different keys = parallel execution (scales horizontally)
# Issue the calls concurrently; awaiting them one by one would run them in sequence
await asyncio.gather(
    ctx.entity("agent", "conv-1").send_message(msg),
    ctx.entity("agent", "conv-2").send_message(msg),
    ctx.entity("agent", "conv-3").send_message(msg),
)
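The single-writer rule can be modeled locally with one `asyncio.Lock` per key: writes that share a key queue behind each other, while writes to different keys proceed concurrently. This is an illustrative, single-process sketch (`KeyedWriter` and `fake_write` are hypothetical); in AGNT5 the platform enforces the real guarantee.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict


class KeyedWriter:
    """Serialize writes per key; different keys are independent."""

    def __init__(self) -> None:
        self._locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def write(self, key: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        async with self._locks[key]:  # only one writer per key at a time
            return await fn()


active_by_key: Dict[str, int] = defaultdict(int)
peak_by_key: Dict[str, int] = defaultdict(int)
totals = {"active": 0, "peak": 0}


async def fake_write(key: str) -> None:
    active_by_key[key] += 1
    totals["active"] += 1
    peak_by_key[key] = max(peak_by_key[key], active_by_key[key])
    totals["peak"] = max(totals["peak"], totals["active"])
    await asyncio.sleep(0.01)  # simulate an LLM call inside the write
    active_by_key[key] -= 1
    totals["active"] -= 1


async def demo() -> None:
    writer = KeyedWriter()
    await asyncio.gather(
        writer.write("conv-1", lambda: fake_write("conv-1")),
        writer.write("conv-1", lambda: fake_write("conv-1")),
        writer.write("conv-2", lambda: fake_write("conv-2")),
    )


asyncio.run(demo())
print(dict(peak_by_key), totals["peak"])  # {'conv-1': 1, 'conv-2': 1} 2
```

Each key never had more than one writer active, yet two writes (one per key) overlapped in time.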

Shared Methods for Reads

Use @entity.shared for read-only operations that can run concurrently:

# Multiple shared calls can run in parallel for same key
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

# These execute concurrently
await asyncio.gather(
    ctx.entity("agent", "conv-1").get_history(),
    ctx.entity("agent", "conv-1").get_history(),
)

Best Practices

1. Choose Stable, Meaningful Keys

Use unique, stable identifiers for entity keys:
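Derive keys from durable identifiers such as a conversation, user, or session ID rather than per-request values like timestamps, so repeated calls reach the same entity instance. A small helper keeps the format consistent across call sites. This sketch follows the prefix-{id} pattern from the Entity Use Cases table; `entity_key` and its allowed-character rule are hypothetical conventions, not SDK requirements.

```python
import re

# Hypothetical rule: letters, digits, and a few separators only
_ID_PATTERN = re.compile(r"^[A-Za-z0-9_.:-]+$")


def entity_key(prefix: str, identifier: object) -> str:
    """Build a stable key such as 'agent-conv-123' from a fixed prefix and a durable ID."""
    ident = str(identifier).strip()
    if not ident or not _ID_PATTERN.match(ident):
        raise ValueError(f"unsuitable entity id: {ident!r}")
    return f"{prefix}-{ident}"


print(entity_key("agent-conv", 123))    # agent-conv-123
print(entity_key("cart", "sess_9f2c"))  # cart-sess_9f2c
```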

2. Design for Concurrency

Choose key granularity for optimal parallelism:

# ✓ Good - One entity per conversation
await ctx.entity("ChatAgent", f"conv-{conv_id}").send_message(msg)

# ✗ Bad - Single global entity (serializes everything)
await ctx.entity("ChatAgent", "global").send_message(msg)

3. Use Shared for Read Operations

Enable concurrent reads with @entity.shared:

# Write methods - exclusive access
@agent.write
async def update_state(ctx, data: dict):
    ctx.set("state", data)

# Read methods - concurrent access
@agent.shared
async def get_state(ctx) -> dict:
    return await ctx.get("state", {})

4. Keep State Minimal

Store only what you need:

# ✓ Good - Essential state only
ctx.set("history", recent_messages[-20:])
ctx.set("summary", summary_text)

# ✗ Avoid - Excessive state
ctx.set("full_transcript", all_messages)  # Could be huge
ctx.set("raw_responses", all_llm_responses)  # Redundant
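Bounding state is easiest to enforce with one helper that every write method calls before ctx.set. A minimal sketch, assuming a message-count limit like the 20-message window in the ChatAgent example (`trim_history` is hypothetical):

```python
from typing import Dict, List

Message = Dict[str, str]


def trim_history(history: List[Message], max_messages: int = 20) -> List[Message]:
    """Return at most the last max_messages entries, preserving order."""
    if max_messages <= 0:
        return []  # history[-0:] would return the whole list, not an empty one
    return history[-max_messages:]


history = [{"role": "user", "content": f"m{i}"} for i in range(25)]
trimmed = trim_history(history)
print(len(trimmed), trimmed[0]["content"])  # 20 m5
```

The explicit check for non-positive limits matters because a slice with `-0` as its start is the same as `0`.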

Entity Use Cases

Use Case               Entity Key           State Stored
AI Chat Agent          agent-conv-{id}      Conversation history, context
Research Task          research-{task_id}   Findings, sources, summary
Workflow Orchestrator  workflow-{run_id}    Step progress, results
User Context           user-{user_id}       Preferences, personalization
Shopping Cart          cart-{session_id}    Items, totals, discounts
Game Session           game-{session_id}    Player state, score, progress

Functions vs Entities

Aspect       Functions                   Entities
State        Stateless                   Stateful (KV store)
Identity     No identity                 Unique key per instance
Concurrency  Parallel by default         Serial per key, parallel across keys
Consistency  No consistency needed       Single-writer guarantee
Use Case     Transformations, API calls  Stateful AI agents, workflows

When to use Functions:

  • Stateless operations
  • Independent requests
  • Data transformations
  • API integrations

When to use Entities:

  • Stateful AI agents with memory
  • Workflow orchestration
  • User sessions and context
  • Any state that needs consistency

Next Steps


Workflows

Workflows enable durable, multi-step orchestration with automatic recovery, state persistence, and complex dependency management. Built on the AGNT5 orchestration plane for exactly-once execution guarantees.

Basic Workflow

Simple Sequential Workflow

from agnt5 import workflow, task_step
from agnt5.workflows import FlowDefinition

@workflow()
def data_pipeline() -> FlowDefinition:
    return FlowDefinition([
        task_step(
            name="extract",
            service_name="etl-service",
            handler_name="extract_data"
        ),
        task_step(
            name="transform",
            service_name="etl-service",
            handler_name="transform_data",
            dependencies=["extract"]
        ),
        task_step(
            name="load",
            service_name="etl-service",
            handler_name="load_data",
            dependencies=["transform"]
        )
    ])

Parallel Execution

Steps that do not depend on each other execute in parallel; a step starts only after all of its dependencies complete:

@workflow()
def parallel_processing() -> FlowDefinition:
    return FlowDefinition([
        # These three steps run in parallel
        task_step(
            name="process_a",
            service_name="service",
            handler_name="process_type_a"
        ),
        task_step(
            name="process_b",
            service_name="service",
            handler_name="process_type_b"
        ),
        task_step(
            name="process_c",
            service_name="service",
            handler_name="process_type_c"
        ),
        # This step waits for all three to complete
        task_step(
            name="merge_results",
            service_name="service",
            handler_name="merge_results",
            dependencies=["process_a", "process_b", "process_c"]
        )
    ])
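The scheduling rule (a step becomes runnable once all of its dependencies finish) amounts to layering a dependency graph. The sketch below computes those layers with a level-by-level variant of Kahn's topological sort over plain dicts, rejecting unknown dependencies and cycles. It models the rule only and does not use FlowDefinition or task_step; `execution_waves` is a hypothetical name.

```python
from typing import Dict, List


def execution_waves(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group steps into waves; every step in a wave depends only on earlier waves."""
    for step, required in deps.items():
        missing = [d for d in required if d not in deps]
        if missing:
            raise ValueError(f"{step} depends on unknown steps: {missing}")
    remaining = {step: set(required) for step, required in deps.items()}
    waves: List[List[str]] = []
    while remaining:
        ready = sorted(step for step, required in remaining.items() if not required)
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        waves.append(ready)
        for step in ready:
            del remaining[step]
        for required in remaining.values():
            required.difference_update(ready)  # these dependencies are now satisfied
    return waves


waves = execution_waves({
    "process_a": [],
    "process_b": [],
    "process_c": [],
    "merge_results": ["process_a", "process_b", "process_c"],
})
print(waves)  # [['process_a', 'process_b', 'process_c'], ['merge_results']]
```

Waves are a simplification: a real scheduler can start each step as soon as its own dependencies finish instead of waiting for the whole previous wave.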

Step Types

Task Steps

Execute function handlers on services:

from agnt5.workflows import task_step

# Basic task step
step = task_step(
    name="unique_step_name",
    service_name="my-service",
    handler_name="my_handler"
)

# Task with dependencies and input data
step = task_step(
    name="dependent_step",
    service_name="my-service",
    handler_name="process_data",
    dependencies=["previous_step"],
    input_data={"config": "production", "batch_size": 100}
)

# Task with object keys (Phase 2)
step = task_step(
    name="object_step",
    service_name="object-service",
    handler_name="update_state",
    dependencies=["init_step"],
    object_keys=["user:123", "cart:456"]
)

Wait Signal Steps

Pause execution until an external signal arrives:

from agnt5.workflows import wait_signal_step

# Basic signal wait
step = wait_signal_step(
    name="wait_for_approval",
    signal_name="approval_granted",
    dependencies=["review_step"]
)

# Signal wait with timeout
step = wait_signal_step(
    name="wait_with_timeout",
    signal_name="user_action",
    dependencies=["prompt_user"],
    timeout_ms=300000,  # 5 minutes
    on_timeout="timeout_handler_step"
)

Wait Timer Steps

Schedule fixed delays or cron-based execution:

from agnt5.workflows import wait_timer_step

# Fixed delay
step = wait_timer_step(
    name="delay_before_retry",
    timer_key="retry_delay",
    delay_ms=30000,  # 30 seconds
    dependencies=["failed_step"]
)

# Cron schedule
step = wait_timer_step(
    name="nightly_batch",
    timer_key="nightly",
    cron_expr="0 2 * * *",  # 2 AM daily
    dependencies=["prep_step"]
)

# Timer with retries
step = wait_timer_step(
    name="retry_with_backoff",
    timer_key="exponential_backoff",
    delay_ms=5000,
    max_retries=3,
    dependencies=["error_step"]
)
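The cron expressions above use the standard five-field format (minute, hour, day of month, month, day of week), so `"0 2 * * *"` fires at 02:00 every day. The sketch below computes the next fire time for that daily pattern only. `next_daily_fire` is a hypothetical helper for reasoning about when such a timer step would resume. It is not how AGNT5 evaluates cron, and it ignores time zones and any non-wildcard day, month, or weekday fields.

```python
from datetime import datetime, timedelta


def next_daily_fire(cron_expr: str, now: datetime) -> datetime:
    """Next fire time strictly after `now` for a daily "M H * * *" expression.

    Only fixed minute/hour fields with wildcards elsewhere are supported.
    """
    minute, hour, dom, month, dow = cron_expr.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("only daily 'M H * * *' expressions are supported")
    candidate = now.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    # If today's slot is not strictly in the future, the next one is tomorrow
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


print(next_daily_fire("0 2 * * *", datetime(2024, 1, 1, 1, 30)))  # 2024-01-01 02:00:00
print(next_daily_fire("0 2 * * *", datetime(2024, 1, 1, 2, 0)))   # 2024-01-02 02:00:00
```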

Workflow Examples

ETL Pipeline

from agnt5 import workflow
from agnt5.workflows import FlowDefinition, task_step, wait_timer_step

@workflow()
def nightly_etl() -> FlowDefinition:
    return FlowDefinition([
        # Start with data validation
        task_step(
            name="validate_sources",
            service_name="etl-service",
            handler_name="validate_data_sources"
        ),

        # Extract from multiple sources in parallel
        task_step(
            name="extract_database",
            service_name="etl-service",
            handler_name="extract_from_database",
            dependencies=["validate_sources"]
        ),
        task_step(
            name="extract_api",
            service_name="etl-service",
            handler_name="extract_from_api",
            dependencies=["validate_sources"]
        ),
        task_step(
            name="extract_files",
            service_name="etl-service",
            handler_name="extract_from_files",
            dependencies=["validate_sources"]
        ),

        # Wait for all extractions to complete
        task_step(
            name="merge_extracted_data",
            service_name="etl-service",
            handler_name="merge_data",
            dependencies=["extract_database", "extract_api", "extract_files"]
        ),

        # Transform data
        task_step(
            name="clean_data",
            service_name="etl-service",
            handler_name="clean_and_normalize",
            dependencies=["merge_extracted_data"]
        ),
        task_step(
            name="enrich_data",
            service_name="etl-service",
            handler_name="enrich_with_metadata",
            dependencies=["clean_data"]
        ),

        # Wait for maintenance window
        wait_timer_step(
            name="wait_for_maintenance_window",
            timer_key="maintenance",
            cron_expr="0 3 * * *",  # 3 AM
            dependencies=["enrich_data"]
        ),

        # Load data
        task_step(
            name="load_to_warehouse",
            service_name="etl-service",
            handler_name="load_data_warehouse",
            dependencies=["wait_for_maintenance_window"]
        ),

        # Generate reports
        task_step(
            name="generate_reports",
            service_name="reporting-service",
            handler_name="generate_daily_reports",
            dependencies=["load_to_warehouse"]
        )
    ])

Approval Workflow

from agnt5 import workflow
from agnt5.workflows import FlowDefinition, task_step, wait_signal_step

@workflow()
def document_approval() -> FlowDefinition:
    return FlowDefinition([
        # Submit document for review
        task_step(
            name="submit_document",
            service_name="doc-service",
            handler_name="submit_for_review",
            input_data={"priority": "normal"}
        ),

        # Notify reviewers
        task_step(
            name="notify_reviewers",
            service_name="notification-service",
            handler_name="send_review_notifications",
            dependencies=["submit_document"]
        ),

        # Wait for approval (with timeout)
        wait_signal_step(
            name="wait_for_approval",
            signal_name="document_approved",
            dependencies=["notify_reviewers"],
            timeout_ms=172800000,  # 48 hours
            on_timeout="escalate_approval"
        ),

        # Escalation path
        task_step(
            name="escalate_approval",
            service_name="doc-service",
            handler_name="escalate_to_manager",
            # No dependencies - triggered by timeout
        ),

        # Wait for escalated approval
        wait_signal_step(
            name="wait_escalated_approval",
            signal_name="escalated_approval",
            dependencies=["escalate_approval"],
            timeout_ms=86400000  # 24 hours
        ),

        # Publish approved document
        task_step(
            name="publish_document",
            service_name="doc-service",
            handler_name="publish_approved_document",
            dependencies=["wait_for_approval", "wait_escalated_approval"]
        )
    ])

Workflow Registration

Using the Decorator

from agnt5 import workflow
from agnt5.workflows import FlowDefinition, get_registered_workflows

@workflow()
def my_workflow() -> FlowDefinition:
    return FlowDefinition([...])

@workflow("custom_name")
def workflow_with_custom_name() -> FlowDefinition:
    return FlowDefinition([...])

# Inspect registered workflows
workflows = get_registered_workflows()
print(f"Registered workflows: {list(workflows.keys())}")
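Both decorator forms above, `@workflow()` and `@workflow("custom_name")`, match a common Python pattern: a decorator factory that records the function in a module-level registry, keyed either by an explicit name or by the function's `__name__`. The sketch below shows that pattern in isolation. `_REGISTRY`, `workflow_sketch`, and `registered` are hypothetical names, and AGNT5's real decorator may store more than the bare function (for example the built `FlowDefinition`).

```python
from typing import Callable, Optional

# Hypothetical module-level registry: workflow name -> definition function
_REGISTRY: dict[str, Callable] = {}


def workflow_sketch(name: Optional[str] = None) -> Callable[[Callable], Callable]:
    """Decorator factory: register the function under `name` or its __name__."""
    def decorator(fn: Callable) -> Callable:
        _REGISTRY[name or fn.__name__] = fn
        return fn  # the decorated function itself is returned unchanged
    return decorator


def registered() -> dict[str, Callable]:
    """Return a copy so callers cannot mutate the registry directly."""
    return dict(_REGISTRY)


@workflow_sketch()
def my_workflow():
    return ["step1"]


@workflow_sketch("custom_name")
def workflow_with_custom_name():
    return ["step1"]


print(sorted(registered()))  # ['custom_name', 'my_workflow']
```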

Manual Registration

from agnt5.workflows import FlowDefinition, register_workflow, task_step

def create_workflow_definition() -> FlowDefinition:
    return FlowDefinition([
        task_step("step1", service_name="service", handler_name="handler1"),
        task_step("step2", service_name="service", handler_name="handler2",
                 dependencies=["step1"])
    ])

# Manual registration
register_workflow("manual_workflow", create_workflow_definition())

Workflow Data Classes

FlowDefinition

Container for workflow steps with serialization:

from agnt5.workflows import FlowDefinition, task_step

# Create definition
flow = FlowDefinition([
    task_step("step1", service_name="svc", handler_name="h1"),
    task_step("step2", service_name="svc", handler_name="h2", dependencies=["step1"])
])

# Serialize to dictionary
flow_dict = flow.to_dict()

# Serialize to JSON string
flow_json = flow.to_json()
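The definition-testing example on this page expects `to_dict()` to return a dictionary with a `"steps"` list whose entries carry a `"dependencies"` key. A minimal model with that shape can be built from the standard library's `dataclasses.asdict` and `json.dumps`, as sketched below. `StepSketch` and `FlowSketch` are hypothetical classes, and the real serialized form may contain additional keys (such as a step type) that this sketch omits.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class StepSketch:
    """Hypothetical task-step record using the field names from this page."""
    name: str
    service_name: str
    handler_name: str
    dependencies: list[str] = field(default_factory=list)
    input_data: dict = field(default_factory=dict)


@dataclass
class FlowSketch:
    """Hypothetical container mirroring the to_dict/to_json methods above."""
    steps: list[StepSketch]

    def to_dict(self) -> dict:
        # asdict recurses into nested dataclasses and copies lists and dicts
        return asdict(self)

    def to_json(self) -> str:
        return json.dumps(self.to_dict(), sort_keys=True)


flow = FlowSketch([
    StepSketch("step1", "svc", "h1"),
    StepSketch("step2", "svc", "h2", dependencies=["step1"]),
])
flow_dict = flow.to_dict()
print(flow_dict["steps"][1]["dependencies"])  # ['step1']
print(json.loads(flow.to_json()) == flow_dict)  # True
```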

WorkflowStep

Individual step configuration:

from agnt5.workflows import WorkflowStep, StepType

# Manual step creation (usually use helper functions instead)
step = WorkflowStep(
    name="custom_step",
    step_type=StepType.TASK,
    service_name="my-service",
    handler_name="my_handler",
    dependencies=["previous_step"],
    input_data={"key": "value"}
)

Configuration Classes

from agnt5.workflows import SignalConfig, TimerConfig

# Signal configuration
signal_config = SignalConfig(
    name="approval_signal",
    timeout_ms=3600000,  # 1 hour
    on_timeout="timeout_step"
)

# Timer configuration
timer_config = TimerConfig(
    key="batch_timer",
    delay_ms=60000,  # 1 minute
    max_retries=5
)

# Cron timer configuration
cron_timer = TimerConfig(
    key="daily_job",
    cron_expr="0 0 * * *",  # Daily at midnight
    max_retries=3
)

Validation and Error Handling

Workflow Validation

AGNT5 validates workflows during registration:

from agnt5.workflows import register_workflow, FlowDefinition, task_step

# This will raise ValueError: Missing dependencies
try:
    invalid_flow = FlowDefinition([
        task_step("step2", service_name="svc", handler_name="h2",
                 dependencies=["step1"]),  # step1 doesn't exist
        task_step("step3", service_name="svc", handler_name="h3",
                 dependencies=["step2"])
    ])
    register_workflow("invalid", invalid_flow)
except ValueError as e:
    print(f"Validation error: {e}")

Validation Rules

  • At least one step must be defined
  • Step names must be unique within the workflow
  • Dependencies must reference existing steps
  • Dependencies must appear earlier in the definition (causal order)
  • Required fields must be populated based on step type
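Because dependencies must point to steps defined earlier, the first four rules can be checked in a single pass over the step list. The sketch below applies them to plain `(name, dependencies)` pairs and raises `ValueError`, as the registration example does. `validate_steps` is a hypothetical helper, not the SDK's validator, and it skips the step-type-specific field checks.

```python
def validate_steps(steps: list[tuple[str, list[str]]]) -> None:
    """Raise ValueError if the (name, dependencies) list breaks a validation rule."""
    if not steps:
        raise ValueError("Workflow must define at least one step")
    all_names = {name for name, _ in steps}
    seen: set[str] = set()  # names defined so far, in definition order
    for name, dependencies in steps:
        if name in seen:
            raise ValueError(f"Duplicate step name: {name!r}")
        for dep in dependencies:
            if dep not in all_names:
                raise ValueError(f"Step {name!r} depends on missing step {dep!r}")
            if dep not in seen:
                # The dependency exists but is defined later: not causal order
                raise ValueError(f"Step {name!r} depends on later step {dep!r}")
        seen.add(name)


validate_steps([("step1", []), ("step2", ["step1"])])  # valid: no exception

try:
    validate_steps([("step2", ["step1"]), ("step3", ["step2"])])
except ValueError as e:
    print(f"Validation error: {e}")
# Validation error: Step 'step2' depends on missing step 'step1'
```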

Testing Workflows

Workflow Definition Testing

import pytest
from agnt5 import workflow
from agnt5.workflows import FlowDefinition, task_step, get_registered_workflows

def test_workflow_definition():
    # Test workflow structure
    flow = FlowDefinition([
        task_step("extract", service_name="etl", handler_name="extract"),
        task_step("transform", service_name="etl", handler_name="transform",
                 dependencies=["extract"])
    ])

    # Verify serialization
    flow_dict = flow.to_dict()
    assert len(flow_dict["steps"]) == 2
    assert flow_dict["steps"][1]["dependencies"] == ["extract"]

def test_workflow_registration():
    @workflow()
    def test_workflow() -> FlowDefinition:
        return FlowDefinition([
            task_step("test_step", service_name="test", handler_name="test")
        ])

    # Verify registration
    workflows = get_registered_workflows()
    assert "test_workflow" in workflows

Integration Testing

Test workflows with a local development environment:

import asyncio
from agnt5 import Client

async def test_workflow_execution():
    client = Client("http://localhost:8080")

    # Trigger workflow
    result = await client.start_workflow(
        workflow_name="data_pipeline",
        input_data={"source": "test_data"}
    )

    workflow_id = result["workflow_id"]

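    # Poll for completion, failing the test if it takes longer than 60 seconds
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 60
    while True:
        status = await client.get_workflow_status(workflow_id)
        if status["state"] in ("completed", "failed"):
            break
        if loop.time() > deadline:
            raise TimeoutError(f"Workflow {workflow_id} did not finish within 60 seconds")
        await asyncio.sleep(1)

    assert status["state"] == "completed"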

Best Practices

Design Patterns

  1. Idempotent Steps - Design steps to be safely retryable
  2. Small Steps - Break complex operations into smaller, focused steps
  3. Clear Dependencies - Make step relationships explicit
  4. Meaningful Names - Use descriptive names for steps and workflows
  5. Error Handling - Plan for failure and recovery scenarios
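Idempotency matters because a retried step can run again after it has already applied its side effect, for example when the process crashes before the step's completion is recorded. One common technique is to derive an idempotency key from the step's inputs and skip work whose key was already applied. The sketch below illustrates that with hypothetical names (`PaymentLedger`, `idempotency_key`); a real system would keep the applied keys in durable storage rather than in memory.

```python
import hashlib
import json


class PaymentLedger:
    """Hypothetical side-effecting service that deduplicates by idempotency key."""

    def __init__(self) -> None:
        self.applied_keys: set[str] = set()  # would be durable storage in practice
        self.balance = 0

    def charge(self, key: str, amount: int) -> int:
        # A retry with the same key is a no-op, so re-running the step is safe
        if key not in self.applied_keys:
            self.balance += amount
            self.applied_keys.add(key)
        return self.balance


def idempotency_key(step_name: str, payload: dict) -> str:
    """Stable key derived from the step name and its JSON-serialized inputs."""
    raw = json.dumps({"step": step_name, "payload": payload}, sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()


ledger = PaymentLedger()
key = idempotency_key("charge_customer", {"order": "A-1", "amount": 50})
ledger.charge(key, 50)
ledger.charge(key, 50)  # simulated retry of the same step
print(ledger.balance)  # 50
```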

Performance

  1. Parallel Execution - Remove unnecessary dependencies to enable parallelism
  2. Batch Operations - Group related operations into single steps
  3. Resource Management - Consider resource usage when designing workflows
  4. State Minimization - Keep workflow state as small as possible

Monitoring

  1. Structured Logging - Add logging to workflow steps
  2. Progress Tracking - Use meaningful step names and descriptions
  3. Metrics Collection - Track workflow success rates and durations
  4. Error Alerting - Set up alerts for workflow failures

Context API

The Context (ctx) is the execution environment provided to all AGNT5 components. It provides APIs for orchestration, state management, LLM interactions, coordination, and observability.

Core Capabilities

  • Orchestration - Execute tasks, spawn functions, parallel execution
  • State Management - Get/set/delete state for entities
  • Coordination - Signals, timers, human approvals
  • AI Integration - LLM calls, tool registration
  • Observability - Logging, metrics, tracing

Orchestration APIs

Task Execution

Execute functions and wait for results (workflows only):

@workflow
async def process_workflow(ctx):
    # Execute a task
    result = await ctx.task(
        service_name="analytics",
        handler_name="process_data",
        input={"dataset": "users"}
    )

    return result

Parallel Execution

Run multiple tasks concurrently:
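The API reference on this page lists `ctx.parallel(*tasks)` and `ctx.gather(**tasks)` for this. Their fan-out/fan-in behavior resembles the standard library's `asyncio.gather`, which the sketch below uses, with a hypothetical `fake_task` coroutine standing in for `ctx.task(...)` and a hypothetical `gather_named` helper returning results keyed by name. It shows the concurrency pattern only and says nothing about how AGNT5 checkpoints parallel results.

```python
import asyncio
import time


async def fake_task(name: str, seconds: float) -> str:
    """Hypothetical stand-in for ctx.task(...): waits, then returns a result."""
    await asyncio.sleep(seconds)
    return f"{name} done"


async def gather_named(**tasks):
    """Run awaitables concurrently and return their results keyed by name."""
    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks.keys(), results))


async def main() -> None:
    start = time.perf_counter()
    results = await gather_named(
        a=fake_task("a", 0.2),
        b=fake_task("b", 0.2),
        c=fake_task("c", 0.2),
    )
    elapsed = time.perf_counter() - start
    print(results)  # {'a': 'a done', 'b': 'b done', 'c': 'c done'}
    # The three 0.2 s waits overlap, so the total is close to 0.2 s, not 0.6 s
    print(f"elapsed ~{elapsed:.1f}s")


asyncio.run(main())
```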

Async Invocation

Spawn child functions without waiting:

@function
async def batch_processor(ctx, items: list):
    # Spawn child invocations
    handles = []
    for item in items:
        handle = ctx.spawn(process_item, item, key=f"item-{item['id']}")
        handles.append(handle)

    # Continue other work...

    # Wait for results later if needed
    results = [await h.result() for h in handles]
    return {"processed": len(results)}

Checkpointing

Checkpoint expensive operations (functions only):

@function
async def process_pipeline(ctx, data_id: str):
    # Each step is checkpointed
    raw = await ctx.step("extract", lambda: extract_data(data_id))
    cleaned = await ctx.step("clean", lambda: clean_data(raw))
    result = await ctx.step("analyze", lambda: analyze(cleaned))

    # If crash occurs, resumes from last completed step
    return result
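Resuming from the last completed step works because each step's result is recorded under its name; on a retry, the recorded result is returned instead of re-running the work. The sketch below models that with an in-memory `journal` dict and a simulated crash. `CheckpointContext` and `SimulatedCrash` are hypothetical, and the context is synchronous for brevity, whereas the `ctx.step` shown above is awaited and persists checkpoints durably.

```python
class SimulatedCrash(Exception):
    """Stands in for a worker dying mid-run."""


class CheckpointContext:
    """Hypothetical context whose journal survives across attempts."""

    def __init__(self, journal: dict) -> None:
        self.journal = journal

    def step(self, name: str, fn):
        # Replay: a completed step returns its recorded result without running fn
        if name in self.journal:
            return self.journal[name]
        result = fn()
        self.journal[name] = result  # checkpoint before moving on
        return result


calls: list[str] = []  # records which step bodies actually executed


def pipeline(ctx: CheckpointContext, crash_after_clean: bool) -> int:
    def extract():
        calls.append("extract")
        return [3, 1, 2]

    def clean():
        calls.append("clean")
        return sorted(raw)

    def analyze():
        calls.append("analyze")
        return sum(cleaned)

    raw = ctx.step("extract", extract)
    cleaned = ctx.step("clean", clean)
    if crash_after_clean:
        raise SimulatedCrash("worker died after 'clean'")
    return ctx.step("analyze", analyze)


journal: dict = {}  # stands in for durable checkpoint storage
try:
    pipeline(CheckpointContext(journal), crash_after_clean=True)
except SimulatedCrash:
    pass
result = pipeline(CheckpointContext(journal), crash_after_clean=False)
print(result, calls)  # 6 ['extract', 'clean', 'analyze']
```

On the second attempt, `extract` and `clean` are replayed from the journal, so each step body runs exactly once across both attempts.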

State Management (Entities)

Get, Set, Delete

Manage entity state:

from datetime import datetime

@entity.write
async def update_profile(ctx, name: str, age: int):
    # Get with default
    profile = await ctx.get("profile", {})

    # Update profile
    profile.update({"name": name, "age": age})

    # Set state
    ctx.set("profile", profile)
    ctx.set("last_updated", datetime.now().isoformat())

    # Delete temporary data
    ctx.delete("temp_cache")

    return {"status": "updated"}
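Per the primitives comparison, entity state is isolated per entity key: two instances of the same entity type with different keys never see each other's values. The sketch below models that isolation with one dictionary per `(entity type, key)` pair. `EntityStore` and `StateView` are hypothetical and synchronous, unlike the awaited `ctx.get` above, and they provide no durability or concurrency control.

```python
class StateView:
    """get/set/delete scoped to a single entity instance's state."""

    def __init__(self, state: dict) -> None:
        self._state = state

    def get(self, name: str, default=None):
        return self._state.get(name, default)

    def set(self, name: str, value) -> None:
        self._state[name] = value

    def delete(self, name: str) -> None:
        self._state.pop(name, None)  # deleting a missing key is a no-op here


class EntityStore:
    """Hypothetical in-memory store: (entity type, key) -> state dict."""

    def __init__(self) -> None:
        self._states: dict[tuple[str, str], dict] = {}

    def view(self, entity_type: str, key: str) -> StateView:
        return StateView(self._states.setdefault((entity_type, key), {}))


store = EntityStore()
alice = store.view("UserProfile", "user-1")
bob = store.view("UserProfile", "user-2")
alice.set("profile", {"name": "Alice"})
print(alice.get("profile"), bob.get("profile", {}))  # {'name': 'Alice'} {}
```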

Entity Method Calls

Call entity methods from functions:

@function
async def chat(ctx, conversation_id: str, message: str):
    # Call entity method
    response = await ctx.entity(
        "ChatAgent",
        conversation_id
    ).send_message(message)

    return response

Coordination APIs

Signals

Wait for external events:
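The API reference on this page describes `await ctx.signal(name, timeout_ms, default)` for waiting and `ctx.signal.emit(name, payload)` for sending. The sketch below models those semantics in memory with `asyncio.Event`, returning `default` when the timeout elapses. `SignalBus` is hypothetical; unlike a durable signal, it loses all waiting state if the process restarts.

```python
import asyncio
from typing import Any


class SignalBus:
    """Hypothetical in-memory signal registry (one payload per signal name)."""

    def __init__(self) -> None:
        self._events: dict[str, asyncio.Event] = {}
        self._payloads: dict[str, Any] = {}

    def _event(self, name: str) -> asyncio.Event:
        return self._events.setdefault(name, asyncio.Event())

    def emit(self, name: str, payload: Any) -> None:
        self._payloads[name] = payload
        self._event(name).set()

    async def wait(self, name: str, timeout_ms: int, default: Any = None) -> Any:
        try:
            await asyncio.wait_for(self._event(name).wait(), timeout_ms / 1000)
        except asyncio.TimeoutError:
            return default  # no signal in time: fall back to the default
        return self._payloads[name]


async def main() -> None:
    bus = SignalBus()
    # Emit the approval 50 ms from now, as an external system would
    asyncio.get_running_loop().call_later(0.05, bus.emit, "approval", {"approved": True})
    print(await bus.wait("approval", timeout_ms=1000))  # {'approved': True}
    print(await bus.wait("other", timeout_ms=50, default={"approved": False}))  # {'approved': False}


asyncio.run(main())
```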

Timers & Sleep

Add delays and scheduled execution:

@workflow
async def scheduled_job(ctx):
    # Wait 5 seconds
    await ctx.timer(delay_ms=5000)

    # Or use sleep (alternative syntax)
    await ctx.sleep(30)  # 30 seconds

    # Wait until specific time (cron)
    await ctx.timer(cron="0 0 * * *")  # Daily at midnight

    return {"status": "completed"}

Human-in-the-Loop

Request human approval:

from datetime import timedelta

@workflow
async def deployment_workflow(ctx, version: str):
    # Run tests
    test_results = await ctx.task("ci", "run_tests", input={"version": version})

    if test_results["passed"]:
        # Request human approval for production
        approval = await ctx.human.approval(
            "deploy_production",
            payload={"version": version, "tests": test_results},
            timeout=timedelta(minutes=30),
            required_roles=["admin", "devops"]
        )

        if approval.decision == "approved":
            await ctx.task("deploy", "to_production", input={"version": version})
            return {"status": "deployed"}

    return {"status": "cancelled"}

AI Integration

LLM Generation

Generate text or structured responses:

Streaming

Stream responses for real-time output:

@function
async def stream_story(ctx, topic: str):
    # Stream text generation
    async for chunk in await ctx.llm.stream(
        prompt=f"Write a story about {topic}",
        model="gpt-4o"
    ):
        if chunk.text:
            yield chunk.text  # Stream to client

Tool Registration

Register tools for LLM use:

@function
async def agent_with_tools(ctx, query: str):
    # Register search tool
    search_tool = ctx.tools.register(
        "web_search",
        handler=perform_search,
        description="Search the web for information",
        schema={
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "max_results": {"type": "integer"}
            }
        }
    )

    # Generate with tool
    response = await ctx.llm.generate(
        prompt=query,
        tools=[search_tool],
        model="gpt-4o"
    )

    return response

Observability

Logging

Structured logging with context:

@function
async def tracked_operation(ctx, data: dict):
    logger = ctx.log()

    logger.info("Processing started", extra={"data_size": len(data)})

    try:
        result = process(data)
        logger.info("Processing completed", extra={"result_size": len(result)})
        return result
    except Exception as e:
        logger.error("Processing failed", exc_info=True)
        raise

Metrics

Record custom metrics:

import time

@function
async def monitored_function(ctx, request: dict):
    metrics = ctx.metrics()

    # Increment counter
    metrics.increment("requests.count", service="api")

    # Record timing with a monotonic clock (unaffected by system clock changes)
    start = time.perf_counter()
    result = await process_request(request)
    duration = (time.perf_counter() - start) * 1000  # milliseconds

    metrics.observe("latency.ms", duration, endpoint="/api/process")

    return result

Distributed Tracing

Create spans for tracing:

@function
async def traced_operation(ctx, data: dict):
    # Create span for external API call
    with ctx.trace_span().start("external_api_call", service="payments"):
        result = await call_payment_api(data)

    # Create span for database operation
    with ctx.trace_span().start("database_query", service="postgres"):
        await save_to_db(result)

    return result

Configuration & Secrets

Secrets

Access secrets securely:

@function
async def api_call(ctx, endpoint: str):
    # Get API key from secrets
    api_key = ctx.secrets().get("openai_api_key")
    db_password = ctx.secrets().get("database_password")

    # Use secrets in API calls
    response = await make_request(endpoint, api_key=api_key)
    return response

Configuration

Feature flags and config:

@function
async def feature_gated_handler(ctx, data: dict):
    config = ctx.config()

    # A/B testing variant: checked before the feature flag so both paths are reachable
    variant = config.variant("experiment_group", default="control")
    if variant == "treatment":
        return await experimental_flow(data)

    # Check feature flag
    if config.get("new_feature_enabled", default=False):
        return await new_implementation(data)
    return await legacy_implementation(data)

Request Headers

Access incoming headers:

@function
async def header_aware(ctx, data: dict):
    headers = ctx.headers()

    user_agent = headers.get("user-agent", "unknown")
    correlation_id = headers.get("x-correlation-id")

    logger = ctx.log()
    logger.info(f"Request from {user_agent}", extra={"correlation_id": correlation_id})

    return {"processed": True}

Context Properties

Access execution metadata:

@function
async def introspective(ctx, data: dict):
    return {
        "run_id": ctx.run_id,              # Workflow/run identifier
        "step_id": ctx.step_id,            # Current step identifier
        "attempt": ctx.attempt,            # Retry attempt number
        "component_type": ctx.component_type,  # "function", "entity", "workflow"
        "object_id": ctx.object_id,        # Entity key (for entities)
        "method_name": ctx.method_name,    # Entity method name (for entities)
        "processed": data
    }

API Reference

Orchestration

API Description
ctx.task(service, handler, input) Execute function (workflows only)
ctx.parallel(*tasks) Run tasks in parallel
ctx.gather(**tasks) Parallel with named results
ctx.spawn(fn, *args, key) Async child invocation
ctx.step(name, fn) Checkpoint operation (functions)

State (Entities)

API Description
await ctx.get(key, default) Get state value
ctx.set(key, value) Set state value
ctx.delete(key) Delete state key
await ctx.entity(type, key).method() Call entity method

Coordination

API Description
await ctx.signal(name, timeout_ms, default) Wait for signal
await ctx.signal.emit(name, payload) Send signal
await ctx.timer(delay_ms) Wait with delay
await ctx.timer(cron) Wait until cron time
await ctx.sleep(seconds) Durable sleep
await ctx.human.approval(...) Request approval

AI Integration

API Description
await ctx.llm.generate(prompt, model) Generate text/JSON
await ctx.llm.stream(prompt, model) Stream generation
ctx.tools.register(name, handler, schema) Register tool

Observability

API Description
ctx.log() Get logger
ctx.metrics() Get metrics recorder
ctx.trace_span().start(name, service) Create trace span

Configuration

API Description
ctx.secrets().get(key) Get secret
ctx.config().get(key, default) Get config value
ctx.config().variant(key, default) Get A/B variant
ctx.headers() Get request headers

Common Patterns

Parallel with Error Handling

@workflow
async def robust_workflow(ctx):
    results = await ctx.gather(
        task1=ctx.task("svc", "task1"),
        task2=ctx.task("svc", "task2")
    )

    if results["task1"] and results["task2"]:
        return {"status": "success", "results": results}
    else:
        return {"status": "partial_failure"}

Conditional Signal Waiting

@workflow
async def conditional_approval(ctx, needs_approval: bool):
    if needs_approval:
        # Supply a default so a timeout is treated as a rejection
        approval = await ctx.signal("approval_signal", timeout_ms=60000, default={"approved": False})
        if not approval.get("approved"):
            return {"status": "rejected"}

    # Proceed with operation
    result = await ctx.task("service", "operation")
    return {"status": "completed", "result": result}

LLM with Tool Execution

@function
async def agent_handler(ctx, query: str):
    # Register tools
    search = ctx.tools.register("search", handler=search_web, ...)
    calc = ctx.tools.register("calculator", handler=calculate, ...)

    # Generate with tools
    response = await ctx.llm.generate(
        prompt=query,
        tools=[search, calc],
        model="gpt-4o"
    )

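    # Execute requested tool calls and collect their results
    tool_results = []
    if response.tool_calls:
        for tool_call in response.tool_calls:
            handler = ctx.tools.handler(tool_call.name)
            tool_results.append(await handler(**tool_call.arguments))

    return {"response": response, "tool_results": tool_results}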

Next Steps

  • Functions - Using context in functions
  • Entity - Using context in entities
  • Workflows - Using context in workflows
  • Agent - AI integration with context

Agents

Agents are autonomous, LLM-driven systems that reason, plan, and execute tasks using tools. They handle complex multi-step tasks by breaking a problem down, selecting the appropriate tools, and iterating until the task is complete.

Key Characteristics

  • LLM-Powered - Driven by language models for reasoning and decision-making
  • Tool Orchestration - Automatically selects and executes appropriate tools
  • Memory Integration - Maintains long-term knowledge across conversations
  • Session Aware - Uses sessions for conversation context
  • Streaming Support - Real-time event streaming for responsive UX
  • Durable - Built on AGNT5 primitives for automatic fault tolerance

Basic Usage

Simple Agent

import asyncio

from agnt5 import Agent, LanguageModel

lm = LanguageModel()

agent = Agent(
    name="assistant",
    model=lm,
    instructions="You are a helpful coding assistant."
)

async def main():
    # agent.run is a coroutine, so it must be awaited from async code
    result = await agent.run("Explain recursion")
    print(result.output)

asyncio.run(main())

Agent with Tools

from typing import Dict, List

from agnt5 import Agent, tool, LanguageModel

@tool.function(auto_schema=True)
def search_docs(query: str, language: str = "python") -> List[Dict]:
    """Search programming language documentation."""
    search_results: List[Dict] = []  # Replace with a real documentation search
    return search_results

@tool.function(auto_schema=True)
def run_code(code: str, language: str = "python") -> Dict[str, str]:
    """Execute code and return output."""
    result = ""  # Replace with real code execution
    return {"output": result}

lm = LanguageModel()
agent = Agent(
    name="coding_assistant",
    model=lm,
    instructions="""You are a coding assistant.
    Use search_docs to find API references.
    Use run_code to test code examples.""",
    tools=[search_docs, run_code]
)
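`auto_schema=True` suggests that each tool's parameter schema is generated from the function signature and docstring, rather than written by hand as in the `schema={...}` argument of the `ctx.tools.register` example earlier. The sketch below shows one way to derive such a schema with `inspect.signature` and `typing.get_type_hints`, mapping basic type hints to JSON Schema type names and marking parameters without defaults as required. `schema_from_function` is hypothetical and much simpler than what the SDK likely generates; for example, it maps `List[Dict]` to a plain `"array"` and treats unannotated parameters as strings.

```python
import inspect
import typing
from typing import Dict, List

# Basic Python types mapped to JSON Schema type names
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean",
               list: "array", dict: "object"}


def schema_from_function(fn) -> dict:
    """Build a JSON-Schema-style tool description from fn's signature and docstring."""
    hints = typing.get_type_hints(fn)
    properties: dict[str, dict] = {}
    required: list[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        hint = hints.get(name, str)  # unannotated parameters default to string
        # List[Dict] -> list, Dict[str, str] -> dict; plain types pass through
        origin = typing.get_origin(hint) or hint
        properties[name] = {"type": _JSON_TYPES.get(origin, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {"type": "object", "properties": properties, "required": required},
    }


def search_docs(query: str, language: str = "python") -> List[Dict]:
    """Search programming language documentation."""
    return []


print(schema_from_function(search_docs)["parameters"]["required"])  # ['query']
```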

result = await agent.run("How do I read a file in Python? Show me an example.")

Agent with Session and Memory

from agnt5 import Agent, Session, Memory, LanguageModel

# Create session
session = Session(
    id="tutoring-session-789",
    user_id="student-123",
    metadata={"subject": "mathematics"}
)

# Create memory
memory = Memory(service=VectorMemoryService())
await memory.store("student_level", "Advanced calculus")

# Create agent
lm = LanguageModel()
agent = Agent(
    name="math_tutor",
    model=lm,
    instructions="You are a patient math tutor. Adapt to student's level.",
    tools=[solve_equation_tool, plot_function_tool],
    session=session,
    memory=memory
)

result = await agent.run("Help me understand limits")

Agent Configuration

Parameters

Parameter Type Description
name str Unique agent name
model LanguageModel LLM to use for reasoning
instructions str System prompt and guidelines
tools List[Tool] Tools available to agent
session Session | None Session for conversation context
memory Memory | None Long-term knowledge storage
max_iterations int Max reasoning loops (default: 10)

Instructions

Write clear, actionable instructions:

Streaming Agents

Stream events in real-time:

async for event in agent.stream("Analyze this dataset", session=session):
    match event.type:
        case "thinking":
            print(f"🤔 {event.content}")
        case "tool_call":
            print(f"🔧 Calling {event.tool_name}({event.arguments})")
        case "tool_result":
            print(f"✓ Result: {event.result}")
        case "response":
            print(f"💬 {event.content}")
        case "error":
            print(f"❌ Error: {event.error}")

Agent Planning

Preview execution plan before running:

# Get plan without executing
plan = agent.plan("Analyze competitor pricing strategies")

print(f"Estimated steps: {len(plan.steps)}")
for step in plan.steps:
    print(f"- {step.type}: {step.description}")
    if step.tool:
        print(f"  Tool: {step.tool.name}")

# Review and execute if approved
if user_approves(plan):
    result = await agent.run("Analyze competitor pricing strategies")

Common Patterns

Research Agent

from typing import Dict, List

from agnt5 import Agent, LanguageModel, Memory, Session, tool

@tool.function(auto_schema=True)
def search_academic(query: str, year_from: int = 2020) -> List[Dict]:
    """Search academic papers."""
    pass

@tool.function(auto_schema=True)
def extract_insights(paper_text: str) -> Dict[str, List[str]]:
    """Extract key insights from paper."""
    pass

# Create research agent
session = Session(id="research-ai-safety-001", user_id="researcher-123")
memory = Memory(service=VectorMemoryService())

lm = LanguageModel()
research_agent = Agent(
    name="research_agent",
    model=lm,
    instructions="""You are a research assistant specializing in AI safety.

    Research process:
    1. Search for relevant recent papers
    2. Extract key insights from each paper
    3. Identify common themes and gaps
    4. Synthesize findings into comprehensive summary""",
    tools=[search_academic, extract_insights],
    session=session,
    memory=memory
)

result = await research_agent.run(
    "Survey the current state of AI alignment research"
)

# Store findings in memory
await memory.ingest_from_session(session, strategy="smart")

Multi-Agent Workflow

# Shared session for coordination
session = Session(id="product-launch-001", user_id="pm-456")

# Specialized agents
market_researcher = Agent(
    name="market_analyst",
    model=lm,
    tools=[market_data_tool, competitor_analysis_tool],
    session=session,
    instructions="Analyze market opportunities and competitive landscape."
)

product_designer = Agent(
    name="designer",
    model=lm,
    tools=[design_tool, user_research_tool],
    session=session,
    instructions="Design products based on market research and user needs."
)

technical_lead = Agent(
    name="tech_lead",
    model=lm,
    tools=[architecture_tool, feasibility_tool],
    session=session,
    instructions="Assess technical feasibility and propose architecture."
)

# Sequential execution with shared context
market_analysis = await market_researcher.run(
    "Analyze market for AI-powered code review tools"
)

product_specs = await product_designer.run(
    "Design product based on market analysis"
)

tech_assessment = await technical_lead.run(
    "Evaluate technical feasibility of proposed product"
)

Agent Handoff

from agnt5.tools import AgentTool

# Create specialized agents
billing_agent = Agent(
    name="billing_specialist",
    model=lm,
    tools=[payment_tool, invoice_tool, refund_tool],
    instructions="Handle billing, payments, and refunds."
)

technical_agent = Agent(
    name="tech_support",
    model=lm,
    tools=[diagnostic_tool, fix_tool],
    instructions="Diagnose and fix technical issues."
)

# Coordinator with handoff capability
coordinator = Agent(
    name="coordinator",
    model=lm,
    tools=[
        classify_request_tool,
        AgentTool(target_agent=billing_agent),
        AgentTool(target_agent=technical_agent)
    ],
    instructions="""You are a support coordinator.
    Classify requests and hand off to appropriate specialist.

    Hand off to:
    - billing_specialist: payment, invoice, refund questions
    - tech_support: technical issues, bugs, troubleshooting"""
)

session = Session(id="support-ticket-789", user_id="customer-123")
result = await coordinator.run(
    "I was charged twice for my subscription",
    session=session
)

Human-in-the-Loop Agent

@tool.function(auto_schema=True, confirmation=True)
def deploy_to_production(version: str) -> Dict[str, str]:
    """Deploy application to production.

    Warning: Requires human approval.
    """
    pass

deployment_agent = Agent(
    name="deployer",
    model=lm,
    tools=[run_tests_tool, deploy_to_production],
    instructions="""Run all tests before deploying.
    Always request human approval for production deployments."""
)

result = await deployment_agent.run("Deploy version 2.0 to production")
# Agent runs tests, then waits for human approval before deploying

Iterative Problem Solving

debugging_agent = Agent(
    name="debugger",
    model=lm,
    tools=[
        analyze_logs_tool,
        run_diagnostic_tool,
        apply_fix_tool,
        verify_fix_tool
    ],
    instructions="""You are a debugging assistant.

    Process:
    1. Analyze error logs to identify root cause
    2. Run diagnostics to confirm hypothesis
    3. Apply potential fix
    4. Verify fix works
    5. If not fixed, iterate (max 3 attempts)

    Always verify fixes before considering issue resolved."""
)

result = await debugging_agent.run(
    "Users are experiencing 500 errors on the checkout page"
)

Best Practices

1. Write Clear Instructions

Provide specific, actionable guidance:

# ✓ Good - Specific process
agent = Agent(
    name="analyst",
    instructions="""Analyze data systematically:
    1. Identify data patterns and anomalies
    2. Calculate key statistics
    3. Generate visualizations
    4. Provide actionable insights"""
)

# ✗ Bad - Too vague
agent = Agent(
    name="helper",
    instructions="Help with analysis"
)

2. Use Sessions for Coordination

Share context across agents:

# Create shared session
session = Session(id="project-workflow-123", user_id="user-456")

# Set shared context
session.set_state("project_name", "ai-safety-research")
session.set_state("deadline", "2024-12-31")

# All agents access shared context
agent1 = Agent(name="agent1", session=session, ...)
agent2 = Agent(name="agent2", session=session, ...)

3. Leverage Memory

Use memory for persistent knowledge:

# Store user preferences
await memory.store("user_expertise", "Expert in React and TypeScript")
await memory.store("coding_style", "Prefers functional programming")

# Agent recalls automatically
agent = Agent(
    name="assistant",
    model=lm,
    tools=[code_gen_tool],
    memory=memory
)

result = await agent.run("Help me build a component")
# Agent uses stored preferences

4. Limit Iterations

Prevent infinite loops:

agent = Agent(
    name="bounded_agent",
    model=lm,
    max_iterations=5,  # Stop after 5 reasoning loops
    instructions="Solve problems efficiently."
)
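A reasoning loop alternates between asking the model for its next action and executing any tools it requests. It stops when the model returns a final answer or when `max_iterations` is reached. The sketch below models that loop with a scripted fake model so it runs without an LLM. `run_agent_loop`, `ToolCall`, `ModelReply`, and the message format are hypothetical and do not describe AGNT5's internal planner.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToolCall:
    name: str
    arguments: dict


@dataclass
class ModelReply:
    """Either a final answer (text) or a list of tool calls to execute."""
    text: str = ""
    tool_calls: list[ToolCall] = field(default_factory=list)


def run_agent_loop(model: Callable[[list], ModelReply],
                   tools: dict[str, Callable],
                   prompt: str,
                   max_iterations: int = 10) -> str:
    messages: list = [{"role": "user", "content": prompt}]
    for _ in range(max_iterations):
        reply = model(messages)
        if not reply.tool_calls:
            return reply.text  # a final answer ends the loop
        for call in reply.tool_calls:
            result = tools[call.name](**call.arguments)
            # Feed each tool result back so the next model call can use it
            messages.append({"role": "tool", "name": call.name, "content": result})
    raise RuntimeError(f"No final answer after {max_iterations} iterations")


def scripted_model(messages: list) -> ModelReply:
    """Fake model: call the add tool once, then answer with its result."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return ModelReply(tool_calls=[ToolCall("add", {"a": 2, "b": 3})])
    return ModelReply(text=f"The sum is {tool_results[-1]['content']}")


print(run_agent_loop(scripted_model, {"add": lambda a, b: a + b}, "What is 2 + 3?"))
# The sum is 5
```

Raising an error when the limit is hit is a choice made for this sketch; a real agent may instead return a partial result.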

Agent Architecture

Agents orchestrate AGNT5 primitives:

  1. LLM Core - Language model for reasoning
  2. Tool Execution - Tools built on Function primitive
  3. State Management - Sessions use Entity for state
  4. Long-Term Storage - Memory uses Entity for persistence
  5. Orchestration - Workflow patterns for multi-step tasks
  6. Streaming - Real-time event emission

Agent
├── LanguageModel (reasoning)
├── Tools (actions via Function)
├── Session (context via Entity)
├── Memory (knowledge via Entity)
└── Planner (orchestration)

Comparison with Primitives

Aspect | Function | Workflow | Agent
Autonomy | None | Scripted | Autonomous
Decision Making | Pre-programmed | Control flow | LLM-driven
Tool Use | N/A | Explicit calls | Dynamic selection
Adaptability | Fixed | Fixed steps | Adaptive reasoning
Use Case | Single operation | Multi-step process | Complex tasks

When to use Function:

  • Single, deterministic operation
  • No decision-making needed

When to use Workflow:

  • Pre-defined multi-step process
  • Explicit control flow

When to use Agent:

  • Complex, open-ended tasks
  • Requires reasoning and adaptation
  • Dynamic tool selection needed

Sessions

Sessions are conversation containers built on the Entity primitive. They manage multi-turn interactions between users and AI agents, providing structured state management, message history, and audit trails.

Key Characteristics

  • Built on Entity - Inherits durability and consistency from Entity
  • Scoped State - Organize state with session/user/app/temp scopes
  • Message History - Automatic conversation tracking with metadata
  • Multi-Agent Ready - Share context across multiple agents
  • Audit Trail - Complete history of interactions for compliance
  • Flexible Retention - Configurable data retention policies

Basic Usage

Creating a Session

from agnt5 import Session

# Create session with user context
session = Session(
    id="conv-2024-001",
    app_name="research_assistant",
    user_id="user-123",
    metadata={"project": "ai-safety-research"}
)

Managing Messages

Scoped State Management

Session state uses four scopes for different persistence levels:

# Session scope - conversation-specific (default)
session.set_state("shopping_cart", ["item1", "item2"])
session.set_state("current_step", "checkout")

# User scope - persists across all user sessions
session.set_state("language", "English", scope="user")
session.set_state("timezone", "America/Los_Angeles", scope="user")

# App scope - application-wide global state
session.set_state("api_version", "v2", scope="app")
session.set_state("feature_flags", {"new_ui": True}, scope="app")

# Temp scope - temporary, limited to the current invocation
session.set_state("processing_step", "validation", scope="temp")
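To picture how the four scopes differ, here is a toy in-memory model. It assumes that session scope is keyed by the session ID, user scope by `user_id`, app scope by `app_name`, and that temp values are discarded after each invocation. `ScopedState` and `end_invocation` are hypothetical and do not describe how AGNT5 stores state internally.

```python
from typing import Any, Dict, Optional, Tuple

# (scope, owner_id, key) -> value; stands in for durable Entity storage
Store = Dict[Tuple[str, str, str], Any]


class ScopedState:
    """Hypothetical model of session/user/app/temp scoping over one shared store."""

    def __init__(self, store: Store, session_id: str, user_id: str, app_name: str) -> None:
        self.store = store
        # Each durable scope is keyed by a different owner identifier
        self.owners = {"session": session_id, "user": user_id, "app": app_name}
        self.temp: Dict[str, Any] = {}  # temp values never reach the shared store

    def set_state(self, key: str, value: Any, scope: str = "session") -> None:
        if scope == "temp":
            self.temp[key] = value
        else:
            self.store[(scope, self.owners[scope], key)] = value

    def get_state(self, key: str, scope: str = "session") -> Optional[Any]:
        if scope == "temp":
            return self.temp.get(key)
        return self.store.get((scope, self.owners[scope], key))

    def end_invocation(self) -> None:
        """Drop temp-scoped values once the current invocation finishes."""
        self.temp.clear()
```

With this model, two sessions for the same user and app share the `user` and `app` scopes but not the `session` scope.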

Integration with Agents

from agnt5 import Agent, Session, LanguageModel

# Create session
session = Session(
    id="support-ticket-456",
    user_id="customer-789",
    metadata={"ticket_type": "billing"}
)

# Create agent with session
lm = LanguageModel()
agent = Agent(
    name="support_agent",
    model=lm,
    instructions="You are a helpful customer support agent.",
    tools=[search_kb_tool, create_ticket_tool],
    session=session
)

# Agent automatically uses session for context
result = await agent.run("I need help with my recent charge")

# Session maintains full conversation history
history = await session.history()

Common Patterns

Multi-Agent Coordination

Share context across multiple specialized agents:

# Create shared session
session = Session(
    id="research-workflow-001",
    user_id="researcher-123",
    metadata={"project": "quantum-computing-review"}
)

# Set shared context
session.set_state("research_topic", "quantum error correction")
session.set_state("target_depth", "comprehensive")

# Multiple specialized agents work together
literature_agent = Agent(
    name="literature_reviewer",
    session=session,
    tools=[paper_search]
)

code_agent = Agent(
    name="code_analyzer",
    session=session,
    tools=[github_search]
)

synthesis_agent = Agent(
    name="synthesizer",
    session=session,
    tools=[document_tool]
)

# Execute research pipeline
papers = await literature_agent.run("Find recent papers")
implementations = await code_agent.run("Find implementations")
report = await synthesis_agent.run("Synthesize findings")

# All agents see shared context and each other's work
full_history = await session.history()

Agent Handoff Pattern

Seamlessly transfer conversations between specialized agents:

session = Session(id="customer-inquiry-789", user_id="customer-456")

# Coordinator routes to appropriate specialist
coordinator = Agent(
    name="router",
    session=session,
    tools=[classification_tool]
)
routing = await coordinator.run("How do I upgrade my subscription?")

if routing.category == "billing":
    # Billing agent gets full conversation context
    billing_agent = Agent(
        name="billing_specialist",
        session=session,
        tools=billing_tools
    )
    result = await billing_agent.run("Continue from coordinator's analysis")
    # billing_agent sees all previous messages

elif routing.category == "technical":
    tech_agent = Agent(
        name="tech_support",
        session=session,
        tools=tech_tools
    )
    result = await tech_agent.run("Handle technical inquiry")

Session State vs Memory

Session Export and Audit

# Create session with audit metadata
session = Session(
    id="compliance-audit-001",
    user_id="analyst-789",
    metadata={
        "regulation": "SOC2",
        "audit_period": "Q4-2024",
        "auditor": "external-firm"
    },
    retention={"ttl_days": 730}  # 2 years
)

# Conduct conversation with full tracking
agent = Agent(name="data_analyst", session=session)
await agent.run("Analyze user access patterns")

# Export session for compliance review
jsonl_export = await session.export(format="jsonl")
# Each line contains: timestamp, role, message, metadata, tool_calls

# Query specific events
recent_events = await session.events(since="2024-01-01", limit=100)

# Prune old messages while keeping metadata
await session.prune(strategy="keep_last_50")

Long-Running Sessions with Pruning

# Create a long-running session
session = Session(
    id="long-conversation-456",
    user_id="user-123",
    metadata={"type": "ongoing_project"}
)

# After many interactions, prune intelligently
await session.prune(strategy="keep_important")  # Uses LLM
await session.prune(strategy="sliding_window", window_size=100)
await session.prune(strategy="summarize_old", threshold=50)

# Pruning keeps the retained history bounded as the conversation grows
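As an illustration of what a `sliding_window` strategy does, the sketch below keeps only the newest `window_size` messages. Preserving a leading system message is a design choice made here for illustration, and `sliding_window` is a hypothetical helper; AGNT5's own pruning strategies may behave differently.

```python
from typing import Dict, List

Message = Dict[str, str]  # e.g. {"role": "user", "content": "..."}


def sliding_window(messages: List[Message], window_size: int) -> List[Message]:
    """Keep the newest `window_size` messages, preserving a leading system message."""
    if window_size <= 0:
        raise ValueError("window_size must be positive")
    # Keep the system prompt (if any) outside the window so instructions survive pruning
    head = messages[:1] if messages and messages[0]["role"] == "system" else []
    body = messages[len(head):]
    return head + body[-window_size:]
```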

Configuration

Session Parameters

Parameter   Type         Description
id          str          Unique session identifier
app_name    str | None   Application name
user_id     str | None   User identifier
metadata    dict | None  Session metadata
retention   dict | None  Retention policy configuration

Retention Policies

Configure data retention:
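Retention is configured with a plain `retention` dict on the Session, as in the examples on this page (for instance `{"ttl_days": 730}`). To make the `ttl_days` semantics concrete, the hypothetical `is_expired` helper below checks whether a session has outlived its TTL. It sketches what the policy means, not how AGNT5 enforces it.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def is_expired(created_at: datetime, retention: Dict[str, Any],
               now: Optional[datetime] = None) -> bool:
    """Return True if a session created at `created_at` is past its `ttl_days`."""
    ttl_days = retention.get("ttl_days")
    if ttl_days is None:
        return False  # no TTL configured: keep indefinitely
    now = now or datetime.now(timezone.utc)
    return now - created_at > timedelta(days=ttl_days)
```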

Best Practices

1. Use Appropriate State Scopes

Match state scope to persistence requirements:

# ✓ Session scope - conversation-specific
session.set_state("current_page", 3)
session.set_state("draft_document", content)

# ✓ User scope - user preferences
session.set_state("theme", "dark", scope="user")
session.set_state("notification_preference", "email", scope="user")

# ✓ App scope - global configuration
session.set_state("rate_limit", 1000, scope="app")
session.set_state("feature_flags", flags, scope="app")

# ✓ Temp scope - transient data
session.set_state("validation_step", "in_progress", scope="temp")

2. Design for Multi-Agent Coordination

Structure session state for agent collaboration:

# Good - Clear coordination structure
session.set_state("workflow_stage", "research")
session.set_state("agent_outputs", {
    "researcher": {"status": "completed", "findings": [...]},
    "analyzer": {"status": "in_progress"},
    "writer": {"status": "pending"}
})

# Agents can check dependencies
current_stage = session.get_state("workflow_stage")
researcher_output = session.get_state("agent_outputs")["researcher"]

3. Implement Retention Strategies

Manage session lifecycle appropriately:

# For regulated industries
session = Session(
    id="medical-consultation",
    retention={
        "ttl_days": 2555,  # ~7 years, e.g. a legal retention requirement
        "immutable": True
    }
)

# For ephemeral conversations
session = Session(
    id="temp-chat",
    retention={
        "ttl_days": 1,  # Delete after 1 day
        "auto_prune": True
    }
)

Entity vs Session

Aspect           Entity                      Session
Purpose          General stateful primitive  Conversation-specific
State Structure  Flexible key-value          Opinionated message + state
API              Low-level (get/set/delete)  High-level (send_message/history)
Scoping          Manual                      Built-in (session/user/app/temp)
Audit            Manual event tracking       Automatic conversation log
Use Case         Custom stateful components  AI agent conversations

When to use Entity:

  • Building custom stateful patterns
  • Need complete control over state structure
  • Non-conversation workloads

When to use Session:

  • AI agent conversations
  • Multi-agent coordination needed
  • Audit trails required
  • Standard conversation patterns

Next Steps

  • Entity - Underlying primitive for Session
  • Agent - Agents use Sessions for context
  • Memory - Long-term storage vs Session state
  • Context API - Session context operations

Tools

Tools are callable capabilities that extend what agents can do. Tools provide structured interfaces to functions, APIs, services, and other agents, with automatic schema extraction from Python code.

Key Characteristics

  • Automatic Schema - Extract input/output schemas from docstrings and type hints
  • Multiple Types - Function, Hosted, MCP, OpenAPI, and Agent tools
  • Built on Function - Inherits durability and retry logic
  • Confirmation Policies - Optional user approval for dangerous operations
  • Rich Metadata - Descriptions, examples, and parameter constraints

Basic Usage

Function Tools with Auto-Schema

The simplest way to create tools is with the @tool() decorator:

from typing import Dict, List

from agnt5 import tool

@tool(auto_schema=True)
def search_web(query: str, max_results: int = 10) -> List[Dict[str, str]]:
    """Search the web for information.

    Args:
        query: The search query string
        max_results: Maximum number of results to return

    Returns:
        List of search results with title, url, and snippet
    """
    # Placeholder: replace with a call to your search backend
    search_results: List[Dict[str, str]] = []
    return search_results[:max_results]

Schema automatically extracted:

{
  "name": "search_web",
  "description": "Search the web for information.",
  "input_schema": {
    "type": "object",
    "properties": {
      "query": {"type": "string", "description": "The search query string"},
      "max_results": {"type": "integer", "default": 10}
    },
    "required": ["query"]
  }
}
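The extraction step can be approximated with the standard library alone. The sketch below is a hypothetical `extract_schema` helper, not AGNT5's implementation: it reads types with `typing.get_type_hints`, defaults and required parameters with `inspect.signature`, and per-argument descriptions from a Google-style `Args:` section. Unlike the schema shown above, it also attaches descriptions to parameters that have defaults.

```python
import inspect
import re
import typing
from typing import Any, Dict, List

_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", dict: "object", list: "array"}


def _json_type(annotation: Any) -> str:
    # List[...] -> list, Dict[...] -> dict; plain classes map directly
    origin = typing.get_origin(annotation) or annotation
    return _JSON_TYPES.get(origin, "object")


def extract_schema(fn) -> Dict[str, Any]:
    """Build a tool schema from a function's signature and Google-style docstring."""
    doc = inspect.getdoc(fn) or ""
    summary = doc.split("\n\n", 1)[0].strip()
    # Collect "name: description" lines from the Args: section
    arg_docs: Dict[str, str] = {}
    if "Args:" in doc:
        args_block = doc.split("Args:", 1)[1].split("\n\n", 1)[0]
        for match in re.finditer(r"^\s*(\w+):\s*(.+)$", args_block, re.MULTILINE):
            arg_docs[match.group(1)] = match.group(2).strip()

    hints = typing.get_type_hints(fn)
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        prop: Dict[str, Any] = {"type": _json_type(hints.get(name, Any))}
        if name in arg_docs:
            prop["description"] = arg_docs[name]
        if param.default is inspect.Parameter.empty:
            required.append(name)
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {
        "name": fn.__name__,
        "description": summary,
        "input_schema": {"type": "object", "properties": properties, "required": required},
    }
```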

Using Tools with Agents

from agnt5 import Agent, tool, LanguageModel

@tool(auto_schema=True)
def calculate_area(length: float, width: float) -> float:
    """Calculate the area of a rectangle.

    Args:
        length: Length in meters
        width: Width in meters

    Returns:
        Area in square meters
    """
    return length * width

lm = LanguageModel()
agent = Agent(
    name="math_assistant",
    model=lm,
    tools=[calculate_area],
    instructions="Help users with geometry calculations."
)

result = await agent.run("What's the area of a 5m by 3m room?")
# The agent can answer by calling calculate_area(length=5.0, width=3.0)

Tool Types

Function Tools

Direct Python function execution:

Hosted Tools

Tools deployed as durable AGNT5 workers:

from agnt5 import worker
from agnt5.tools import HostedTool

# Define worker function
@worker.handler
def analyze_data(data: Dict) -> Dict:
    """Worker function for complex data analysis."""
    # Heavy computation here
    return analysis_results

# Create hosted tool
analysis_tool = HostedTool(
    name="analyze_data",
    description="Perform complex data analysis",
    endpoint="agnt5://data-analysis-service/analyze_data"
)

# Use with agent
agent = Agent(name="analyst", tools=[analysis_tool])

MCP Tools

Integrate with Model Context Protocol servers:

from agnt5.tools import MCPTool

# Connect to MCP server
filesystem_tool = MCPTool(
    name="filesystem",
    mcp_server_url="http://localhost:3000/mcp",
    capabilities=["read_file", "write_file", "list_directory"]
)

agent = Agent(name="file_assistant", tools=[filesystem_tool])

OpenAPI Tools

Generate tools from OpenAPI specifications:

from agnt5.tools import OpenAPITool

# Create tools from OpenAPI spec
github_tools = OpenAPITool.from_spec(
    spec_url="https://api.github.com/openapi.json",
    operations=["get_repo", "list_issues", "create_issue"]
)

agent = Agent(name="github_bot", tools=github_tools)

Tool Configuration

Manual Schema Definition

For more control, define schemas explicitly:

from agnt5 import Tool

search_tool = Tool(
    name="search",
    description="Search for information",
    input_schema={
        "type": "object",
        "properties": {
            "query": {"type": "string", "minLength": 1},
            "filters": {"type": "object"}
        },
        "required": ["query"]
    },
    handler=search_function
)

Confirmation for Dangerous Operations

Require user approval for destructive actions:

@tool(auto_schema=True, confirmation=True)
def delete_database(database_name: str) -> Dict[str, str]:
    """Delete a database permanently.

    Args:
        database_name: Name of the database to delete

    Returns:
        Status of deletion operation

    Warning:
        This operation is irreversible and will delete all data.
    """
    # Requires human approval before execution
    pass

# Agent proposes deletion but waits for approval
agent = Agent(name="admin", tools=[delete_database])
result = await agent.run("Clean up the test database")
# User receives confirmation prompt before tool executes
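The effect of `confirmation=True` can be modeled as a gate in front of the tool. In the sketch below the user's decision is a synchronous `approve` callback; in a real deployment approval would typically arrive asynchronously from a person. `require_confirmation` and `ConfirmationDenied` are hypothetical names, not part of AGNT5.

```python
import functools
from typing import Any, Callable, Dict


class ConfirmationDenied(Exception):
    """Raised when a tool call that requires approval is rejected."""


def require_confirmation(approve: Callable[[str, Dict[str, Any]], bool]):
    """Decorator: ask `approve(tool_name, kwargs)` before running the wrapped tool."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(**kwargs: Any) -> Any:
            # Tool calls from an LLM arrive as named arguments
            if not approve(fn.__name__, kwargs):
                raise ConfirmationDenied(f"{fn.__name__} was not approved")
            return fn(**kwargs)
        return wrapper
    return decorator
```

Raising on rejection, rather than returning a value, makes it impossible for a denied call to be mistaken for a successful one.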

Common Patterns

Tool Composition

Combine multiple tools for complex capabilities:

@tool(auto_schema=True)
def search_papers(query: str, year_from: int = 2020) -> List[Dict]:
    """Search academic papers."""
    pass

@tool(auto_schema=True)
def download_pdf(url: str) -> bytes:
    """Download PDF document."""
    pass

@tool(auto_schema=True)
def extract_text(pdf_data: bytes) -> str:
    """Extract text from PDF."""
    pass

# Agent orchestrates multiple tools
research_agent = Agent(
    name="researcher",
    tools=[search_papers, download_pdf, extract_text],
    instructions="Search papers, download them, and extract key findings."
)

result = await research_agent.run("Survey recent work on transformers")
# Agent chains: search_papers → download_pdf → extract_text

Tool Error Handling

Tools with robust error handling:

@tool(auto_schema=True)
def fetch_stock_price(symbol: str) -> Dict[str, Any]:
    """Fetch current stock price.

    Args:
        symbol: Stock ticker symbol (e.g., 'AAPL', 'GOOGL')

    Returns:
        Stock price data

    Raises:
        ValueError: If symbol is invalid
        ConnectionError: If market data service is unavailable
    """
    try:
        price_data = market_api.get_price(symbol)
        return {
            "symbol": symbol,
            "price": price_data.current,
            "change": price_data.change
        }
    except InvalidSymbolError as e:
        raise ValueError(f"Invalid stock symbol: {symbol}") from e
    except MarketAPIError as e:
        raise ConnectionError(f"Market data unavailable: {e}") from e

# Agent handles tool errors gracefully
agent = Agent(name="stock_advisor", tools=[fetch_stock_price])
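Translating failures into `ValueError` (bad input) versus `ConnectionError` (transient outage) matters once calls are retried. This page does not show how AGNT5 retry policies are configured, so the hypothetical `call_with_retries` below only sketches the idea: retry transient errors with exponential backoff and fail fast on everything else.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Retry `retry_on` errors with exponential backoff; re-raise others immediately."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last transient error
            time.sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1s, 2s, ...
    raise RuntimeError("unreachable")
```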

Dynamic Tool Registration

Register tools at runtime based on context:

# Base toolset
base_tools = [search_tool, calculate_tool]

# Add specialized tools based on user role
if user.role == "admin":
    admin_tools = [delete_user_tool, modify_permissions_tool]
    all_tools = base_tools + admin_tools
else:
    all_tools = base_tools

agent = Agent(
    name="assistant",
    tools=all_tools,
    instructions=f"You are assisting a {user.role}."
)

Tool with Context Access

Tools can access execution context for advanced operations:

from agnt5 import tool, Context

@tool(auto_schema=True)
async def store_memory(ctx: Context, key: str, value: str) -> Dict[str, str]:
    """Store information in long-term memory.

    Args:
        ctx: Execution context (automatically provided)
        key: Memory key
        value: Content to store

    Returns:
        Confirmation of storage
    """
    # Access context for durable storage
    await ctx.memory.set(key, value)

    return {
        "status": "stored",
        "key": key,
        "timestamp": ctx.now()
    }

# Context is automatically injected when tool is called
agent = Agent(name="memory_agent", tools=[store_memory])

Best Practices

1. Write Clear Tool Descriptions

Good descriptions help agents use tools correctly:

2. Use Type Hints and Docstrings

Enable automatic schema extraction:

from typing import Any, Dict

from agnt5 import tool

@tool(auto_schema=True)
def analyze_sentiment(
    text: str,
    language: str = "en",
    return_scores: bool = False
) -> Dict[str, Any]:
    """Analyze sentiment of text.

    Args:
        text: Text to analyze (minimum 10 characters)
        language: ISO language code (en, es, fr, de)
        return_scores: Include detailed confidence scores

    Returns:
        Sentiment analysis with label (positive/negative/neutral)
        and optional confidence scores
    """
    # Type hints + docstring = complete schema
    pass

3. Implement Confirmation for Dangerous Operations

Protect users from destructive actions:

# Dangerous operations should require confirmation
@tool(auto_schema=True, confirmation=True)
def execute_code(code: str, language: str = "python") -> Dict[str, str]:
    """Execute arbitrary code in a sandboxed environment.

    Warning:
        Code execution can be dangerous. Requires explicit user approval.
    """
    pass

@tool(auto_schema=True, confirmation=True)
def send_email_blast(recipients: List[str], subject: str, body: str) -> Dict:
    """Send email to multiple recipients.

    Warning:
        Bulk email requires confirmation to prevent spam.
    """
    pass

Function vs Tool

Aspect     Function             Tool
Purpose    General computation  Agent capability
Schema     Optional             Required (auto-generated)
Discovery  Manual invocation    Agent-driven selection
Metadata   Basic                Rich (description, examples)
Use Case   Backend logic        Agent actions

When to use Function:

  • Backend processing
  • Internal system operations
  • Not exposed to agents

When to use Tool:

  • Agent capabilities
  • External system integration
  • User-facing operations


Memory

Memory is a long-term knowledge storage system that enables agents to remember facts, preferences, and context across conversations. Unlike Session state (short-term), Memory provides persistent, searchable knowledge that agents build upon over time.

Key Characteristics

  • Long-Term Persistence - Knowledge survives across sessions and conversations
  • Semantic Search - Find relevant memories using natural language queries
  • Smart Ingestion - Automatically extract important facts using LLMs
  • Multiple Backends - InMemory (dev), Vector (semantic), Database (persistent)
  • Built on Entity - Inherits durability and consistency
  • Cross-Session - Shared knowledge accessible to all agents

Basic Usage

Creating Memory

Storing and Retrieving Memories

# Store individual memories
await memory.store(
    key="user_role",
    content="Senior Software Engineer specializing in distributed systems",
    type="user_info",
    confidence=0.95
)

await memory.store(
    key="project_context",
    content="Building a real-time analytics platform for financial data",
    type="project_info"
)

# Retrieve specific memories
memories = await memory.recall(["user_role", "project_context"])
for mem in memories:
    print(f"{mem.key}: {mem.content}")

# Search using natural language
results = await memory.search(
    query="What does the user know about databases?",
    limit=5
)

for result in results:
    print(f"Score: {result.score:.2f}")
    print(f"Content: {result.content}")
    print(f"Source: {result.metadata.get('source_session')}")
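Semantic search ranks stored memories by similarity to the query instead of matching keys exactly. A vector backend compares learned embeddings; the self-contained sketch below swaps in bag-of-words vectors and cosine similarity purely to show the ranking step, so it only matches shared words, not synonyms. `search` and `cosine` here are hypothetical helpers, not the Memory API.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Tuple


def _vectorize(text: str) -> Counter:
    # Word counts stand in for an embedding vector
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def search(memories: Dict[str, str], query: str, limit: int = 5) -> List[Tuple[str, float]]:
    """Rank stored memories by similarity to `query`, highest score first."""
    q = _vectorize(query)
    scored = [(key, cosine(q, _vectorize(content))) for key, content in memories.items()]
    scored.sort(key=lambda kv: kv[1], reverse=True)
    return scored[:limit]
```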

Integration with Agents

from agnt5 import Agent, Session, Memory, LanguageModel

# Create memory
memory = Memory(service=VectorMemoryService())

# Store long-term knowledge
await memory.store("user_expertise", "PhD in Machine Learning, specializes in NLP")
await memory.store("preferred_tools", "Prefers PyTorch over TensorFlow")

# First conversation
session1 = Session(id="conv-001", user_id="researcher-123")
lm = LanguageModel()
agent = Agent(name="assistant", model=lm, memory=memory, session=session1)
await agent.run("Help me implement attention mechanisms")
# Agent recalls user's ML expertise and PyTorch preference

# Later conversation (different session)
session2 = Session(id="conv-042", user_id="researcher-123")
agent2 = Agent(name="assistant", model=lm, memory=memory, session=session2)
await agent2.run("Review my transformer code")
# Agent still remembers user's background and preferences

Smart Ingestion

Automatically extract and store important information from conversations:

# Agent conversation
session = Session(id="consultation-123", user_id="user-456")
agent = Agent(name="advisor", memory=memory, session=session)

await agent.run("I'm building a recommendation system for e-commerce")
await agent.run("We have 10 million users and need sub-100ms latency")
await agent.run("Our team is experienced with Python and Go")

# Extract and store important facts
memory_keys = await memory.ingest_from_session(
    session,
    strategy="smart"  # Uses LLM to identify important facts
)

# Memory now contains:
# - "User building recommendation system for e-commerce"
# - "System requirements: 10M users, <100ms latency"
# - "Team expertise: Python, Go"

# Future conversations automatically recall these facts

Ingestion Strategies

Choose the right strategy for your use case:

Common Patterns

User Profile Memory

Build comprehensive user profiles over time:

class UserProfileMemory:
    def __init__(self, user_id: str, memory: Memory):
        self.user_id = user_id
        self.memory = memory
        self.prefix = f"user_{user_id}_"

    async def store_preference(self, category: str, value: str):
        """Store user preference."""
        await self.memory.store(
            key=f"{self.prefix}pref_{category}",
            content=value,
            type="preference",
            user_id=self.user_id
        )

    async def store_expertise(self, domain: str, level: str, details: str):
        """Store user expertise."""
        await self.memory.store(
            key=f"{self.prefix}expertise_{domain}",
            content=f"{level} expertise in {domain}: {details}",
            type="expertise",
            user_id=self.user_id
        )

    async def get_profile(self) -> Dict[str, Any]:
        """Retrieve complete user profile."""
        results = await self.memory.search(
            query=f"user {self.user_id} profile preferences expertise",
            limit=50
        )

        profile = {
            "preferences": {},
            "expertise": {},
            "context": []
        }

        for result in results:
            if result.metadata.get("type") == "preference":
                category = result.key.replace(f"{self.prefix}pref_", "")
                profile["preferences"][category] = result.content
            elif result.metadata.get("type") == "expertise":
                domain = result.key.replace(f"{self.prefix}expertise_", "")
                profile["expertise"][domain] = result.content

        return profile

# Usage
user_memory = UserProfileMemory("user-123", memory)
await user_memory.store_preference("language", "Python")
await user_memory.store_expertise("ml", "advanced", "10+ years in NLP")

profile = await user_memory.get_profile()

Conversation Summarization

Archive long sessions as summaries:

async def archive_session_to_memory(
    session: Session,
    memory: Memory,
    summary_threshold: int = 50
):
    """Archive long sessions as summaries in memory."""

    history = await session.history()

    if len(history) > summary_threshold:
        # Generate comprehensive summary
        from agnt5 import LanguageModel

        lm = LanguageModel()
        conversation = "\n".join([
            f"{msg.role}: {msg.content}" for msg in history
        ])

        summary = await lm.generate(
            prompt=f"""Summarize this conversation in 2-3 paragraphs.
            Focus on key decisions, important facts, and outcomes.

            Conversation:
            {conversation}

            Summary:""",
            max_tokens=300
        )

        # Store summary in memory
        await memory.store(
            key=f"session_summary_{session.id}",
            content=summary.text,
            type="session_summary",
            session_id=session.id,
            user_id=session.user_id,
            message_count=len(history)
        )

        # Optionally prune session to save space
        await session.prune(strategy="keep_summary")

# Usage
await archive_session_to_memory(session, memory, summary_threshold=100)

Learning Agent

Agent that learns from every interaction:

class LearningAgent:
    def __init__(self, name: str, model, tools, memory: Memory):
        self.agent = Agent(name=name, model=model, tools=tools, memory=memory)
        self.memory = memory

    async def run_and_learn(self, prompt: str, session: Session):
        """Execute task and learn from interaction."""

        # Run agent
        result = await self.agent.run(prompt, session=session)

        # Extract and store learnings
        memory_keys = await self.memory.ingest_from_session(
            session,
            strategy="smart"
        )

        # Store outcome for future reference
        await self.memory.store(
            key=f"interaction_{session.id}",
            content=f"Task: {prompt}\nOutcome: {result.output}",
            type="interaction_history",
            success=result.status == "completed"
        )

        return result

    async def recall_similar_tasks(self, prompt: str) -> List[Dict]:
        """Find similar past interactions."""
        return await self.memory.search(
            query=f"Similar to: {prompt}",
            limit=5
        )

# Usage
learning_agent = LearningAgent("assistant", lm, tools, memory)

# Agent learns from each interaction
result = await learning_agent.run_and_learn(
    "Debug this performance issue",
    session
)

# Later: Recall similar past work
similar = await learning_agent.recall_similar_tasks(
    "Another performance problem"
)

Best Practices

1. Distinguish Memory from Session State

Use Memory for long-term, Session for short-term:

2. Add Metadata for Better Retrieval

Enrich memories with metadata:

from datetime import datetime

await memory.store(
    key="technical_decision_001",
    content="Chose PostgreSQL for transactional data, Redis for caching",
    type="decision",
    category="architecture",
    confidence=0.9,
    source_session="planning-session-789",
    timestamp=datetime.now(),
    decision_maker="tech-lead-123",
    rationale="Need ACID guarantees and high read performance"
)

# Later: search with a metadata filter
architecture_decisions = await memory.search(
    query="database decisions",
    filter={"type": "decision", "category": "architecture"}
)

3. Implement Memory Maintenance

Manage memory lifecycle:

async def maintain_memory(memory: Memory, user_id: str):
    """Prune old or low-confidence memories."""

    # Remove outdated information
    await memory.forget([
        key for key in await memory.list_keys(user_id=user_id)
        if is_outdated(key)
    ])

    # Update confidence scores based on usage
    for key in await memory.list_keys(user_id=user_id):
        mem = await memory.recall([key])
        if mem[0].metadata.get("last_accessed"):
            days_since_access = calculate_days_since(
                mem[0].metadata["last_accessed"]
            )
            new_confidence = calculate_confidence_decay(
                mem[0].metadata.get("confidence", 1.0),
                days_since_access
            )
            await memory.update(key, confidence=new_confidence)
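The maintenance routine above relies on helpers this page does not define. One plausible, purely hypothetical implementation of `calculate_days_since` and `calculate_confidence_decay` uses exponential decay with a configurable half-life; the 90-day half-life and 0.1 floor are arbitrary assumptions to tune for your data.

```python
from datetime import datetime, timezone
from typing import Optional


def calculate_days_since(last_accessed: datetime, now: Optional[datetime] = None) -> float:
    """Days elapsed since `last_accessed`; expects timezone-aware datetimes."""
    now = now or datetime.now(timezone.utc)
    return (now - last_accessed).total_seconds() / 86400


def calculate_confidence_decay(
    confidence: float,
    days_since_access: float,
    half_life_days: float = 90.0,
    floor: float = 0.1,
) -> float:
    """Halve confidence every `half_life_days` without access, bottoming out at `floor`."""
    decayed = confidence * 0.5 ** (days_since_access / half_life_days)
    # Never raise a memory above its current confidence just to meet the floor
    return max(min(floor, confidence), round(decayed, 4))
```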

Session State vs Memory

Aspect    Session State        Memory
Lifetime  Single conversation  Indefinite
Scope     Session-specific     Cross-session
Search    Direct key access    Semantic search
Purpose   Current context      Long-term knowledge
Storage   Entity state         Entity + Vector DB
Example   Shopping cart        User preferences

When to use Session State:

  • Current conversation context
  • Temporary workflow state
  • UI state and navigation

When to use Memory:

  • User profile and preferences
  • Historical interactions
  • Domain knowledge
  • Learned facts and insights

Next Steps

  • Session - Short-term conversation state
  • Agent - Agents use Memory for context
  • Entity - Underlying primitive for Memory
  • Context API - Memory context operations

Worker Runtime

The Worker class is the high-level runtime that integrates with the AGNT5 platform, automatically registers decorated components, and handles execution coordination.

Worker Configuration

Basic Worker

import asyncio
from agnt5 import Worker, function

@function()
def hello(name: str) -> str:
    return f"Hello, {name}!"

async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Configuration Parameters

worker = Worker(
    service_name="my-service",           # Required: Service identifier
    service_version="1.2.0",             # Version string (default: "1.0.0")
    coordinator_endpoint="http://localhost:9091",  # Worker coordinator URL
    runtime="standalone"                  # Runtime mode: "standalone" or "asgi"
)

Parameter             Type  Description                              Default
service_name          str   Service identifier for registration      Required
service_version       str   Version string for this service          "1.0.0"
coordinator_endpoint  str   Worker coordinator URL                   "http://localhost:9091"
runtime               str   Runtime adapter: "standalone" or "asgi"  "standalone"

Runtime Modes

Standalone Runtime

For background workers, batch processing, and daemon processes:

import asyncio
from agnt5 import Worker, function

@function()
def background_task(data: dict) -> dict:
    # Process data in background
    return {"processed": True, "result": data}

async def main():
    # Standalone worker blocks until stopped
    worker = Worker(
        service_name="background-processor",
        runtime="standalone"
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Characteristics:

  • Blocks until manually stopped (Ctrl+C)
  • Connects to worker coordinator
  • Ideal for background processing
  • Built-in signal handling
  • OpenTelemetry integration

ASGI Runtime

For web applications and HTTP endpoints:

from agnt5 import Worker, function

@function()
def web_handler(request: dict) -> dict:
    return {"message": "Hello from AGNT5!", "data": request}

# Create ASGI application
app = Worker(
    service_name="web-service",
    runtime="asgi"
)

# Enable CORS for browser access
app.enable_cors()

Run with any ASGI server:

# Install ASGI server
pip install uvicorn

# Run the application
uvicorn main:app --reload --port 8000

ASGI Endpoints:

Endpoint           Method  Description
/health            GET     Health check endpoint
/functions         GET     List registered functions
/invoke/{handler}  POST    Invoke specific function
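The endpoint table can be read as a small routing layer on top of the ASGI protocol. The sketch below is a self-contained ASGI app with the same three routes, plus an in-process `call_app` client for exercising it without a server. It is an illustration under assumptions, not AGNT5's runtime: `make_app`, `call_app`, and the response bodies are hypothetical, with the 500 error body mirroring the format shown under Error Handling.

```python
import asyncio
import json
from typing import Any, Callable, Dict, Tuple

Handler = Callable[[Dict[str, Any]], Any]


def make_app(handlers: Dict[str, Handler]):
    """Build an ASGI app exposing /health, /functions and /invoke/{handler}."""

    async def app(scope, receive, send):
        if scope["type"] != "http":
            return  # lifespan/websocket scopes are ignored in this sketch
        method, path = scope["method"], scope["path"]
        status, payload = 404, {"error": f"No route for {method} {path}"}
        if method == "GET" and path == "/health":
            status, payload = 200, {"status": "healthy"}
        elif method == "GET" and path == "/functions":
            status, payload = 200, {"functions": sorted(handlers)}
        elif method == "POST" and path.startswith("/invoke/"):
            name = path[len("/invoke/"):]
            if name in handlers:
                # Read the full request body (it may arrive in several chunks)
                body, more = b"", True
                while more:
                    message = await receive()
                    body += message.get("body", b"")
                    more = message.get("more_body", False)
                try:
                    status, payload = 200, {"result": handlers[name](json.loads(body or b"{}"))}
                except Exception as exc:
                    status, payload = 500, {"error": f"Function {name} failed: {exc}", "status": 500}
        await send({"type": "http.response.start", "status": status,
                    "headers": [(b"content-type", b"application/json")]})
        await send({"type": "http.response.body", "body": json.dumps(payload).encode()})

    return app


async def call_app(app, method: str, path: str, body: bytes = b"") -> Tuple[int, Any]:
    """Tiny in-process client: drive the ASGI app without an HTTP server."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": body, "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": method, "path": path}, receive, send)
    return sent[0]["status"], json.loads(sent[1]["body"])
```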

Environment Configuration

Environment Variables

Configure workers using environment variables:

# Service configuration
export AGNT5_SERVICE_NAME=my-service
export AGNT5_SERVICE_VERSION=2.0.0
export AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com

# Logging configuration
export AGNT5_LOG_LEVEL=INFO
export AGNT5_LOG_FORMAT=json

# Runtime configuration
export AGNT5_RUNTIME=standalone
export AGNT5_DISABLE_TELEMETRY=false

Configuration Priority

Configuration sources in order of precedence:

  1. Constructor parameters - Highest priority
  2. Environment variables - Medium priority
  3. Default values - Lowest priority

# This worker uses constructor values over environment
worker = Worker(
    service_name="explicit-service",  # Overrides AGNT5_SERVICE_NAME
    coordinator_endpoint="http://localhost:9091"
)
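The precedence rules fit in a few lines. The hypothetical `resolve_setting` helper below checks an explicit constructor value first, then an `AGNT5_<NAME>` environment variable (matching the variable names listed above), then the defaults from the parameter table. It sketches the documented order, not the SDK's own code.

```python
import os
from typing import Any, Mapping, Optional

# Defaults taken from the parameter table above
DEFAULTS = {
    "service_version": "1.0.0",
    "coordinator_endpoint": "http://localhost:9091",
    "runtime": "standalone",
}


def resolve_setting(name: str, explicit: Optional[Any] = None,
                    env: Mapping[str, str] = os.environ) -> Any:
    """Constructor argument > AGNT5_<NAME> environment variable > built-in default."""
    if explicit is not None:
        return explicit
    env_value = env.get(f"AGNT5_{name.upper()}")
    if env_value is not None:
        return env_value
    return DEFAULTS.get(name)
```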

Worker Lifecycle

Initialization

async def main():
    worker = Worker("my-service")

    # Worker validates Rust extension availability
    # Creates runtime adapter (standalone or ASGI)
    # Installs OpenTelemetry logging
    # Registers all decorated functions and workflows

    await worker.run()  # Starts the worker loop

Registration Process

  1. Function Discovery: Scans for @function decorated callables
  2. Workflow Discovery: Scans for @workflow decorated factories
  3. Component Registration: Sends metadata to coordinator
  4. Service Announcement: Service becomes available for invocations
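Steps 1 to 3 amount to decorators that record components in a registry, plus a payload built from that registry. The sketch below illustrates the idea with hypothetical `register_function`, `register_workflow`, and `build_registration` names; AGNT5's real decorators and the metadata it sends to the coordinator are not shown on this page.

```python
from typing import Any, Callable, Dict, List

# Hypothetical in-process registry, filled in at import time by the decorators
_REGISTRY: Dict[str, Dict[str, Callable[..., Any]]] = {"function": {}, "workflow": {}}


def register_function(fn: Callable[..., Any]) -> Callable[..., Any]:
    _REGISTRY["function"][fn.__name__] = fn
    return fn  # the decorated callable is returned unchanged


def register_workflow(fn: Callable[..., Any]) -> Callable[..., Any]:
    _REGISTRY["workflow"][fn.__name__] = fn
    return fn


def build_registration(service_name: str, service_version: str = "1.0.0") -> Dict[str, Any]:
    """Collect discovered components into metadata a worker could send to a coordinator."""
    components: List[Dict[str, str]] = [
        {"name": name, "type": kind}
        for kind, entries in _REGISTRY.items()
        for name in sorted(entries)
    ]
    return {"service_name": service_name, "service_version": service_version,
            "components": components}
```

Because registration happens when the decorator runs, a component is only discovered if its module has been imported before the payload is built.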

Graceful Shutdown

import asyncio
import signal
from agnt5 import Worker

class GracefulWorker:
    def __init__(self):
        self.worker = Worker("graceful-service")
        self.shutdown_requested = False

    async def run(self):
        loop = asyncio.get_running_loop()
        worker_task = asyncio.create_task(self.worker.run())

        # Cancel the worker task on SIGTERM/SIGINT
        # (loop.add_signal_handler is only available on Unix event loops)
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._request_shutdown, sig, worker_task)

        try:
            await worker_task
        except asyncio.CancelledError:
            if not self.shutdown_requested:
                raise  # cancelled from outside, not by our signal handler
            print("Worker stopped")
        finally:
            await self._cleanup()

    def _request_shutdown(self, sig, worker_task):
        print(f"Received {sig.name}, initiating graceful shutdown...")
        self.shutdown_requested = True
        worker_task.cancel()

    async def _cleanup(self):
        print("Cleaning up resources...")
        # Perform cleanup tasks:
        # - close database connections
        # - finish in-flight requests
        # - remove OpenTelemetry handlers
        print("Cleanup complete")

async def main():
    graceful_worker = GracefulWorker()
    await graceful_worker.run()

if __name__ == "__main__":
    asyncio.run(main())
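The "finish in-flight requests" step in `_cleanup` usually means: stop accepting new work, wait a bounded time for running tasks, then cancel the stragglers. A standard-library sketch of that drain logic; `InFlightTracker` and `demo_drain` are hypothetical helpers, not part of the SDK.

```python
import asyncio
from typing import Any, Coroutine, Set

class InFlightTracker:
    """Track running tasks so shutdown can wait for them (hypothetical helper)."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()
        self.accepting = True

    def submit(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Start `coro` as a task and remember it until it finishes."""
        if not self.accepting:
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise RuntimeError("Shutting down; not accepting new work")
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def drain(self, timeout: float) -> int:
        """Stop intake, wait up to `timeout` seconds, then cancel what is left.

        Returns the number of tasks that had to be cancelled.
        """
        self.accepting = False
        if not self._tasks:
            return 0
        _, pending = await asyncio.wait(set(self._tasks), timeout=timeout)
        for task in pending:
            task.cancel()
        # Let cancelled tasks run their own cleanup before returning
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)

async def demo_drain() -> int:
    """Usage: the short task finishes in time, the long one is cancelled."""
    tracker = InFlightTracker()
    tracker.submit(asyncio.sleep(0.01))
    tracker.submit(asyncio.sleep(10))
    return await tracker.drain(timeout=0.5)
```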

ASGI Integration

Basic ASGI App

from agnt5 import Worker, function

@function()
def api_endpoint(data: dict) -> dict:
    return {"status": "success", "received": data}

# Create ASGI app
app = Worker("api-service", runtime="asgi")
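Since the ASGI-mode worker is an ASGI callable, it can be smoke-tested in-process, without starting a server, by driving the ASGI protocol directly. A minimal sketch; `call_asgi` and `stub_app` are hypothetical helpers, and the commented usage assumes the `/health` route from the endpoint table behaves like any other ASGI route.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

ASGIApp = Callable[..., Awaitable[None]]

async def call_asgi(app: ASGIApp, method: str, path: str,
                    body: bytes = b"") -> Tuple[int, bytes]:
    """Send a single HTTP request to an ASGI app in-process; return (status, body)."""
    scope = {
        "type": "http", "asgi": {"version": "3.0"}, "http_version": "1.1",
        "method": method, "scheme": "http", "path": path,
        "raw_path": path.encode(), "root_path": "", "query_string": b"",
        "headers": [(b"host", b"testserver"), (b"content-type", b"application/json")],
        "client": ("127.0.0.1", 50000), "server": ("testserver", 80),
    }
    request_sent = False
    messages: List[Dict[str, Any]] = []

    async def receive() -> Dict[str, Any]:
        nonlocal request_sent
        if not request_sent:
            request_sent = True
            return {"type": "http.request", "body": body, "more_body": False}
        return {"type": "http.disconnect"}  # the client has nothing more to send

    async def send(message: Dict[str, Any]) -> None:
        messages.append(message)

    await app(scope, receive, send)
    status = next(m["status"] for m in messages if m["type"] == "http.response.start")
    payload = b"".join(m.get("body", b"") for m in messages
                       if m["type"] == "http.response.body")
    return status, payload

async def stub_app(scope, receive, send):
    """Stand-in ASGI app used only to exercise call_asgi: echoes the request body."""
    message = await receive()
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": message["body"] or b"ok"})

# With the ASGI worker from above:
#   status, body = asyncio.run(call_asgi(app, "GET", "/health"))
```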

CORS Configuration

Enable CORS for browser access:

# Enable CORS with defaults (allows all origins)
app.enable_cors()

# Enable CORS with specific origins
app.enable_cors(origins=["https://myapp.com", "http://localhost:3000"])

# Disable CORS
app.disable_cors()
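An origin allow-list works by comparing the browser's `Origin` header with the configured list and echoing it in `Access-Control-Allow-Origin` only on an exact match, so scheme and port count (`http://localhost:3000` and `https://localhost:3000` are different origins). A sketch of that rule; `cors_headers` is hypothetical, leaves out preflight (`OPTIONS`) handling, and does not describe how `enable_cors()` is implemented.

```python
from typing import Dict, Optional, Sequence

def cors_headers(origin: Optional[str],
                 allowed: Optional[Sequence[str]]) -> Dict[str, str]:
    """Return CORS response headers for a request's Origin header.

    `allowed=None` means "allow all origins".
    """
    if origin is None:
        return {}  # not a cross-origin browser request
    if allowed is None:
        return {"Access-Control-Allow-Origin": "*"}
    if origin in allowed:
        # Echo the exact origin and tell caches the response varies by Origin
        return {"Access-Control-Allow-Origin": origin, "Vary": "Origin"}
    return {}  # no header, so the browser blocks the response
```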

Custom Middleware

Add ASGI middleware:

from starlette.middleware.gzip import GZipMiddleware
from agnt5 import Worker

worker_app = Worker("api-service", runtime="asgi")

# Starlette middleware classes are plain ASGI wrappers, so they can wrap any ASGI callable
# Note: This is conceptual - actual middleware integration depends on ASGI runtime implementation
app = GZipMiddleware(worker_app, minimum_size=1000)
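An ASGI middleware is itself an ASGI app that wraps another one and can inspect or rewrite the messages passing between the server and the app. A dependency-free sketch that adds one response header; `add_header_middleware`, `demo_headers`, and the `x-service` header are hypothetical, and how well such wrapping composes with the worker depends on its ASGI runtime.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Message = Dict[str, Any]
ASGIApp = Callable[..., Awaitable[None]]

def add_header_middleware(app: ASGIApp, name: bytes, value: bytes) -> ASGIApp:
    """Wrap `app` so every HTTP response carries one extra header."""

    async def wrapped(scope, receive, send):
        if scope["type"] != "http":
            await app(scope, receive, send)  # pass lifespan/websocket through untouched
            return

        async def send_with_header(message: Message) -> None:
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append((name, value))
                message = {**message, "headers": headers}
            await send(message)

        await app(scope, receive, send_with_header)

    return wrapped

async def demo_headers() -> List[tuple]:
    """Usage: wrap a tiny inner app and capture the headers it sends."""

    async def inner(scope, receive, send):
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"ok"})

    async def receive() -> Message:
        return {"type": "http.request", "body": b"", "more_body": False}

    sent: List[Message] = []

    async def collect(message: Message) -> None:
        sent.append(message)

    app = add_header_middleware(inner, b"x-service", b"api-service")
    await app({"type": "http"}, receive, collect)
    return sent[0]["headers"]
```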

Error Handling

The ASGI runtime returns errors in a consistent format:

# Function that raises an exception
@function()
def failing_function(data: dict) -> dict:
    raise ValueError("Something went wrong")

# ASGI runtime catches and formats the error:
# {
#   "error": "Function failing_function failed: Something went wrong",
#   "status": 500
# }
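Conceptually, that formatting step is a wrapper that catches the exception and builds the payload. A sketch that reproduces the documented shape; `to_error_response` is hypothetical, and the real runtime may add fields or handle logging differently.

```python
import functools
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)

def to_error_response(fn: Callable[..., Dict[str, Any]]) -> Callable[..., Dict[str, Any]]:
    """Run `fn`; on failure return {"error": ..., "status": 500} instead of raising."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            # Keep the full traceback in server logs rather than in the response
            logger.exception("Function %s failed", fn.__name__)
            return {"error": f"Function {fn.__name__} failed: {exc}", "status": 500}

    return wrapper

@to_error_response
def failing_function(data: dict) -> dict:
    raise ValueError("Something went wrong")
```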

Worker Methods

Runtime Control

from agnt5 import Worker

worker = Worker("my-service")

# Check if worker is running
if worker.is_running():
    print("Worker is active")

# For ASGI workers only: the worker instance itself is the ASGI callable
if worker.runtime == "asgi":
    asgi_app = worker

Component Registration

# Manual component registration (usually automatic)
worker._register_components()

# Internal message handling (not part of public API)
# worker._handle_message(request)

Observability

OpenTelemetry Integration

Workers install OpenTelemetry logging automatically. To choose the logger, level, or format yourself, call install_opentelemetry_logging directly:

import logging
from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging, remove_opentelemetry_logging

# Custom logging setup
logger = logging.getLogger("my-service")

# Install telemetry with custom formatter
install_opentelemetry_logging(
    logger=logger,
    level=logging.DEBUG,
    format_string="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

worker = Worker("my-service")

# Telemetry is automatically cleaned up on worker shutdown
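Automatic cleanup follows the usual pairing of attaching a logging handler and detaching it on shutdown. A standard-library sketch of that pairing as a context manager; `attached_handler` and `demo_attached` are hypothetical and are not how `install_opentelemetry_logging` or `remove_opentelemetry_logging` work internally.

```python
import contextlib
import logging
from typing import Iterator, Tuple

@contextlib.contextmanager
def attached_handler(logger: logging.Logger,
                     handler: logging.Handler) -> Iterator[logging.Handler]:
    """Attach `handler` for the duration of the block, then detach and close it."""
    logger.addHandler(handler)
    try:
        yield handler
    finally:
        # Runs on normal exit and on exceptions, like cleanup at worker shutdown
        logger.removeHandler(handler)
        handler.close()

def demo_attached() -> Tuple[bool, bool]:
    """Usage: report whether the handler is attached during and after the block."""
    log = logging.getLogger("attached-demo")
    handler = logging.NullHandler()
    with attached_handler(log, handler):
        during = handler in log.handlers
    return during, handler in log.handlers
```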

Structured Logging

import logging
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def logged_function(ctx, data: dict) -> dict:
    # Structured logging with context
    logger.info(
        "Processing function",
        extra={
            "invocation_id": ctx.invocation_id,
            "service_name": ctx.metadata.get("service_name"),
            "data_size": len(str(data))
        }
    )

    result = {"processed": True}

    logger.info(
        "Function completed",
        extra={
            "invocation_id": ctx.invocation_id,
            "success": True
        }
    )

    return result
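With the standard `logging` formatters, fields passed through `extra` are attached to the log record but do not appear in the output unless the format string names them. One way to emit them all is a JSON formatter; this is a standard-library sketch (`JsonFormatter` and `demo_json_log` are hypothetical and do not reproduce the format produced by `AGNT5_LOG_FORMAT=json`).

```python
import io
import json
import logging

# Attribute names every LogRecord has; anything else was supplied via `extra=`
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including `extra` fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)  # str() anything JSON can't encode

def demo_json_log() -> dict:
    """Usage: log one record with `extra` and parse the JSON line back."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    log = logging.getLogger("json-demo")
    log.setLevel(logging.INFO)
    log.propagate = False
    log.addHandler(handler)
    try:
        log.info("Processing function",
                 extra={"invocation_id": "inv-123", "data_size": 42})
    finally:
        log.removeHandler(handler)
    return json.loads(stream.getvalue())
```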

Development Patterns

Hot Reload Development

import asyncio
import logging
import os
from agnt5 import Worker, function

# Development configuration: verbose logging when running locally
if os.getenv("ENVIRONMENT") == "development":
    logging.basicConfig(level=logging.DEBUG)

@function()
def development_handler(data: dict) -> dict:
    return {"env": "development", "data": data}

async def main():
    worker = Worker(
        service_name="dev-service",
        coordinator_endpoint=os.getenv("COORDINATOR_URL", "http://localhost:9091")
    )
    await worker.run()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() raises Ctrl+C here after cancelling main()
        print("\nDevelopment worker stopped")
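The example above turns on debug logging but does not reload code when files change. One common approach is to run the worker as a child process and restart it when a source file is saved. A polling sketch using only the standard library; `snapshot`, `changed`, `run_with_reload`, the `worker.py` entry point, and the one-second interval are assumptions.

```python
import os
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Dict, List

def snapshot(root: str) -> Dict[str, float]:
    """Map every .py file under `root` to its last-modified time."""
    return {str(p): p.stat().st_mtime for p in Path(root).rglob("*.py")}

def changed(before: Dict[str, float], after: Dict[str, float]) -> bool:
    """True if any .py file was added, removed, or modified."""
    return before != after

def run_with_reload(cmd: List[str], root: str = ".", interval: float = 1.0) -> None:
    """Run `cmd` and restart it whenever a .py file under `root` changes."""
    while True:
        state = snapshot(root)
        proc = subprocess.Popen(cmd)
        try:
            while proc.poll() is None:
                time.sleep(interval)
                if changed(state, snapshot(root)):
                    print("Change detected, restarting worker...")
                    proc.terminate()
                    proc.wait()
                    break
            else:
                return  # the worker exited on its own; stop supervising
        except KeyboardInterrupt:
            proc.terminate()
            proc.wait()
            return

def demo_detects_edit() -> bool:
    """Usage: a later modification time on a file counts as a change."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp, "handler.py")
        path.write_text("x = 1\n")
        before = snapshot(tmp)
        mtime = path.stat().st_mtime
        os.utime(path, (mtime + 5, mtime + 5))  # simulate a later save
        return changed(before, snapshot(tmp))

# Usage: run_with_reload([sys.executable, "worker.py"])
```

Polling is simple and portable; a file-system event watcher avoids the periodic rescans at the cost of an extra dependency.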

Testing Workers

import pytest
from unittest.mock import patch
from agnt5 import Worker, function

# Avoid a "test_" prefix on handlers: pytest would try to collect them as tests
@function()
def uppercase_handler(data: str) -> str:
    return data.upper()

@pytest.fixture
def mock_worker():
    # Patch PyWorker so the test never starts a real worker
    with patch('agnt5.worker.PyWorker'):
        yield Worker("test-service")

def test_worker_registration(mock_worker):
    # Test component registration
    mock_worker._register_components()

    # Verify functions are registered
    from agnt5.decorators import get_registered_functions
    functions = get_registered_functions()
    assert "uppercase_handler" in functions

def test_worker_asgi_mode():
    app = Worker("test-service", runtime="asgi")
    assert callable(app)  # ASGI callable interface

Production Deployment

Container Deployment

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

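# Health check (slim images don't include curl, so probe with Python's urllib)
# Assumes the worker serves the ASGI /health endpoint on port 8000
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1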

# Run worker
CMD ["python", "worker.py"]

Environment Configuration

# Production environment variables
AGNT5_SERVICE_NAME=production-service
AGNT5_SERVICE_VERSION=1.2.0
AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
AGNT5_LOG_LEVEL=INFO
AGNT5_LOG_FORMAT=json

Health Monitoring

import asyncio
import logging
import os
import time
from agnt5 import Worker, function

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@function()
def health_check() -> dict:
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "production-service",
        "timestamp": time.time()
    }

async def main():
    worker = Worker(
        service_name="production-service",
        service_version=os.getenv("SERVICE_VERSION", "1.0.0")
    )

    logger.info("Starting production worker")

    try:
        await worker.run()
    except Exception:
        logger.exception("Worker failed")  # logs the full traceback
        raise
    finally:
        logger.info("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
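A fixed "healthy" response only shows that the process is up. A common refinement runs dependency checks and reports a degraded status when any fail, without leaking exception messages. A sketch; `run_health_checks` and the check names in the usage comment are hypothetical.

```python
import time
from typing import Callable, Dict

def run_health_checks(checks: Dict[str, Callable[[], bool]]) -> dict:
    """Run each named check; any failure or exception marks the service degraded."""
    results: Dict[str, str] = {}
    for name, check in checks.items():
        try:
            results[name] = "ok" if check() else "failing"
        except Exception as exc:
            # Report only the exception type, not its (possibly sensitive) message
            results[name] = f"error: {type(exc).__name__}"
    healthy = all(status == "ok" for status in results.values())
    return {
        "status": "healthy" if healthy else "degraded",
        "checks": results,
        "timestamp": time.time(),
    }

# Usage inside health_check() (ping_database and ping_cache are hypothetical):
#   return run_health_checks({"database": ping_database, "cache": ping_cache})
```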

Best Practices

Service Design

  1. Service Naming - Use consistent, descriptive service names
  2. Version Management - Use semantic versioning for service versions
  3. Resource Management - Clean up resources in shutdown handlers
  4. Error Handling - Handle exceptions gracefully in workers
  5. Health Checks - Implement health check functions for monitoring

Performance

  1. Connection Pooling - Reuse database and HTTP connections
  2. Async Operations - Use async functions for I/O operations
  3. Resource Limits - Configure appropriate memory and CPU limits
  4. Scaling - Deploy multiple worker instances for high throughput
  5. Monitoring - Track worker performance and error rates
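Connection reuse usually means creating an expensive client once per worker process and sharing it across invocations instead of connecting per call. A sketch of a lazily created shared async resource; `SharedResource` and `demo_shared` are hypothetical, and in practice the factory would return a real database pool or HTTP session.

```python
import asyncio
from typing import Awaitable, Callable, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")

class SharedResource(Generic[T]):
    """Create a resource on first use and hand the same instance to every caller."""

    def __init__(self, factory: Callable[[], Awaitable[T]]) -> None:
        self._factory = factory
        self._value: Optional[T] = None
        self._lock = asyncio.Lock()

    async def get(self) -> T:
        if self._value is None:
            async with self._lock:
                if self._value is None:  # another task may have created it meanwhile
                    self._value = await self._factory()
        return self._value

async def demo_shared() -> Tuple[int, bool]:
    """Usage: ten concurrent callers trigger exactly one (slow) connect."""
    created = 0

    async def make_client() -> object:
        nonlocal created
        created += 1
        await asyncio.sleep(0.01)  # simulate connection latency
        return object()  # stands in for a real client

    shared: SharedResource[object] = SharedResource(make_client)
    clients = await asyncio.gather(*(shared.get() for _ in range(10)))
    return created, all(client is clients[0] for client in clients)
```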

Security

  1. Input Validation - Validate all function inputs
  2. Error Messages - Don’t expose sensitive information in errors
  3. Authentication - Use proper authentication for coordinator connections
  4. Network Security - Use secure connections (HTTPS/TLS) in production
  5. Secrets Management - Use environment variables for sensitive configuration
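Input validation and safe error messages fit together: reject malformed input early with a message that names the bad field without echoing internal details. A standard-library sketch; `SearchRequest`, `handle_search`, the field names, and the limits are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict

@dataclass(frozen=True)
class SearchRequest:
    query: str
    limit: int = 10

    @classmethod
    def parse(cls, data: Dict[str, Any]) -> "SearchRequest":
        """Validate untrusted input; raise ValueError with a client-safe message."""
        query = data.get("query")
        if not isinstance(query, str) or not query.strip():
            raise ValueError("'query' must be a non-empty string")
        if len(query) > 500:
            raise ValueError("'query' must be at most 500 characters")
        limit = data.get("limit", 10)
        # bool is a subclass of int, so reject it explicitly
        if not isinstance(limit, int) or isinstance(limit, bool) or not 1 <= limit <= 100:
            raise ValueError("'limit' must be an integer between 1 and 100")
        return cls(query=query.strip(), limit=limit)

def handle_search(data: Dict[str, Any]) -> Dict[str, Any]:
    try:
        request = SearchRequest.parse(data)
    except ValueError as exc:
        return {"error": str(exc), "status": 400}  # safe: the message is ours
    return {"query": request.query, "limit": request.limit, "status": 200}
```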

Next Steps

Core Primitives

Agent Development Kit

  • Agents - Autonomous LLM-driven systems
  • Tools - Extend agent capabilities
  • Sessions - Conversation management
  • Memory - Long-term knowledge storage

Resources