AGNT5 Python SDK
Build AI agents and durable workflows with the AGNT5 Python SDK
Build AI agents and reliable workflows with automatic recovery. AGNT5 combines agent orchestration and fault-tolerant execution in one lightweight framework.
Primitives comparison
| | Function | Entity | Workflow | Agent | Tool |
|---|---|---|---|---|---|
| What | Stateless operation with retries | Stateful component with unique key | Multi-step orchestrated process | LLM with instructions and tools | Python function LLMs can call |
| State | None | Isolated per entity key | Isolated per workflow instance | Conversation history via Entity | None |
| Durability | Automatic retries, checkpointing | Persistent state across runs | Checkpointed steps, resume on failure | Context preserved in Entity | Runs within agent context |
| Best For | Document analysis, embeddings generation, LLM API calls | AI chat sessions, agent memory, conversation history | RAG pipelines, content generation with review, AI evals | Customer support, research assistants, code review | Vector search, knowledge base queries, API integrations |
Key Features
- Automatic recovery from failures with configurable retry policies
- Checkpointing resumes from exact failure point
- Multi-agent coordination via handoffs and composition
- Python-native - decorators, async/await, type hints
- Multi-provider - OpenAI, Anthropic, Groq, Azure, Bedrock, OpenRouter
- Built-in tracing for debugging and monitoring
Installation
pip install agnt5
Quick example
from agnt5 import Agent, workflow, tool, Context, WorkflowContext
# Define a tool for the agent
@tool(auto_schema=True)
async def search_docs(ctx: Context, query: str) -> str:
    """Search documentation for answers."""
    # Your search logic here
    return f"Found documentation about: {query}"
# Placeholder persistence helper (hypothetical); replace with your own storage logic
async def save_answer(question: str, answer) -> None:
    ...
# Create an AI agent with tools
agent = Agent(
    name="assistant",
    model="openai/gpt-4o-mini",
    instructions="You are a helpful assistant. Search docs when needed.",
    tools=[search_docs]
)
# Create a durable workflow that orchestrates the agent
@workflow
async def process_question(ctx: WorkflowContext, question: str) -> dict:
    """Durable workflow for processing questions."""
    # Step 1: Get answer from agent (checkpointed)
    answer = await ctx.step("get_answer", agent.run(question))
    # Step 2: Store result (checkpointed)
    await ctx.step("store", save_answer(question, answer))
    return {"question": question, "answer": answer}
# If this crashes after step 1, it resumes from step 2 on restart
Note: Set your OPENAI_API_KEY environment variable before running.
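To make the resume behavior concrete without depending on the SDK, here is a toy, in-memory model of checkpointing. ToyCheckpointLog and its step method are hypothetical; they only mimic the idea behind ctx.step, where a step that already completed returns its recorded result instead of running again. A real durable runtime persists results outside the process, and this toy takes a zero-argument callable rather than an awaitable so that a replayed step never creates a coroutine that is left unawaited.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Toy model of checkpoint-and-resume; NOT the AGNT5 implementation.
# A dict stands in for the durable checkpoint store.


class ToyCheckpointLog:
    def __init__(self) -> None:
        self.results: Dict[str, Any] = {}

    async def step(self, name: str, make: Callable[[], Awaitable[Any]]) -> Any:
        # Replay: a step that already finished returns its recorded result
        if name in self.results:
            return self.results[name]
        # First run: execute the step, then record the result before continuing
        result = await make()
        self.results[name] = result
        return result


calls = {"get_answer": 0, "store": 0}


async def get_answer(question: str) -> str:
    calls["get_answer"] += 1
    return f"answer to {question!r}"


async def store(crash: bool) -> None:
    calls["store"] += 1
    if crash:
        raise RuntimeError("simulated crash during step 2")


async def process_question(log: ToyCheckpointLog, question: str, crash: bool) -> str:
    answer = await log.step("get_answer", lambda: get_answer(question))
    await log.step("store", lambda: store(crash))
    return answer


log = ToyCheckpointLog()
try:
    asyncio.run(process_question(log, "What is AGNT5?", crash=True))
except RuntimeError:
    pass  # the "process" died after step 1 had completed

# Restart with the same log: step 1 is replayed, step 2 runs again
answer = asyncio.run(process_question(log, "What is AGNT5?", crash=False))
print(calls)  # {'get_answer': 1, 'store': 2}
```

The second run re-executes only the step that never finished, which is the behavior the comment at the end of the workflow describes.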
Next Steps
Getting Started
- Quickstart - Installation, first worker, and local development setup
- Worker Runtime - Configure and deploy workers
Core Primitives
- Functions - Stateless operations with retries
- Entities - Stateful components with unique keys
- Workflows - Multi-step orchestration patterns
- Context API - Orchestration, state, AI, and observability APIs
Agent Development Kit (ADK)
- Agents - Autonomous LLM-driven systems
- Sessions - Conversation containers and multi-agent coordination
- Tools - Callable capabilities that extend agent abilities
- Memory - Long-term knowledge storage with semantic search
Examples
- Examples - Practical usage examples
Getting Started
Get up and running with the AGNT5 Python SDK in minutes. This guide covers installation, your first worker, and local development setup.
Installation
System Requirements
- Python 3.8 or higher
- pip or uv package manager
- Docker (for local development server)
Install from PyPI
pip install agnt5
Development Installation
For development or contributing to the SDK:
git clone https://github.com/agnt5/agnt5
cd agnt5/sdk/sdk-python
pip install -e .
Verify Installation
import agnt5
print(agnt5.__version__)
First Worker
Save the following as worker.py. It registers two functions, greet and math_add, and starts a worker for the hello-service service:
import asyncio
from agnt5 import Worker, function
@function()
def greet(name: str) -> str:
    """Greet a user by name."""
    return f"Hello, {name}!"
@function("math_add")
def add_numbers(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b
async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()
if __name__ == "__main__":
    asyncio.run(main())
Local Development Setup
Start the AGNT5 Development Server
The development server runs all AGNT5 services locally in Docker:
Run Your Worker
In a new terminal, run your worker:
python worker.py
You should see output like:
INFO:agnt5.worker:Starting worker for service: hello-service
INFO:agnt5.worker:Registered function: greet
INFO:agnt5.worker:Registered function: math_add
INFO:agnt5.worker:Worker running, waiting for tasks...
Test Your Functions
Using HTTP API
Test your functions using the Gateway HTTP API:
# Test the greet function
curl -X POST http://localhost:8080/call \
  -H "Content-Type: application/json" \
  -d '{
    "serviceName": "hello-service",
    "handlerName": "greet",
    "inputData": "IkFsaWNlIg=="
  }'
The inputData field is base64-encoded JSON. The JSON string "Alice" (including its quotes) encodes to "IkFsaWNlIg==", which is what the Python client below sends.
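The Python client below builds inputData by JSON-encoding the value and then base64-encoding the resulting bytes. A small stdlib-only helper pair makes that round trip explicit; encode_input and decode_input are hypothetical names for illustration, not SDK functions.

```python
import base64
import json
from typing import Any


def encode_input(value: Any) -> str:
    """Serialize a value to JSON, then base64-encode it for the inputData field."""
    return base64.b64encode(json.dumps(value).encode("utf-8")).decode("ascii")


def decode_input(encoded: str) -> Any:
    """Reverse encode_input: base64-decode, then parse the JSON."""
    return json.loads(base64.b64decode(encoded))


print(encode_input("Alice"))  # IkFsaWNlIg==
print(encode_input({"a": 5, "b": 3}))
```

Note that base64 of the bare text Alice (no quotes) is "QWxpY2U="; that is not valid JSON once decoded, so it is easy to mix the two up when building requests by hand.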
Using Python Client
import asyncio
import json
import base64
from agnt5 import Client
async def test_functions():
    client = Client("http://localhost:8080")
    # Test greet function
    name = "Alice"
    input_data = base64.b64encode(json.dumps(name).encode()).decode()
    result = await client.call(
        service_name="hello-service",
        handler_name="greet",
        input_data=input_data
    )
    print(f"Greeting result: {result}")
    # Test add function
    numbers = {"a": 5, "b": 3}
    input_data = base64.b64encode(json.dumps(numbers).encode()).decode()
    result = await client.call(
        service_name="hello-service",
        handler_name="math_add",
        input_data=input_data
    )
    print(f"Addition result: {result}")
asyncio.run(test_functions())
ASGI Integration
For web applications, use the ASGI runtime:
from agnt5 import Worker, function
@function()
def api_handler(request: dict) -> dict:
    return {
        "message": "Hello from AGNT5!",
        "received": request
    }
# Create ASGI application
app = Worker("web-service", runtime="asgi")
app.enable_cors()  # Enable CORS for browser access
Save this file as asgi_app.py, then run it with uvicorn:
pip install uvicorn
uvicorn asgi_app:app --reload --port 8000
Test the ASGI endpoints:
# Health check
curl http://localhost:8000/health
# List available functions
curl http://localhost:8000/functions
# Call a function
curl -X POST http://localhost:8000/invoke/api_handler \
  -H "Content-Type: application/json" \
  -d '{"test": "data"}'
Configuration
Environment Variables
Configure your worker using environment variables:
export AGNT5_COORDINATOR_ENDPOINT=http://localhost:9091
export AGNT5_SERVICE_NAME=my-service
export AGNT5_LOG_LEVEL=DEBUG
python worker.py
Configuration in Code
import logging
from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging
# Configure logging
logging.basicConfig(level=logging.INFO)
install_opentelemetry_logging()
# Create worker with custom configuration
worker = Worker(
    service_name="configured-service",
    service_version="1.2.0",
    coordinator_endpoint="http://localhost:9091",
    runtime="standalone"
)
Error Handling
Handle errors gracefully in your functions:
import logging
from agnt5 import function
logger = logging.getLogger(__name__)
@function()
def safe_divide(a: float, b: float) -> dict:
    try:
        if b == 0:
            return {"error": "Division by zero", "result": None}
        result = a / b
        logger.info(f"Division successful: {a} / {b} = {result}")
        return {"result": result, "error": None}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return {"error": str(e), "result": None}
Development Tips
Hot Reload
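During development, stop the worker cleanly with Ctrl+C and restart it after code changes. This entry point handles the shutdown; it does not reload code by itself:
import asyncio
import sys
from agnt5 import Worker
async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Exit cleanly on Ctrl+C, then start the worker again after editing code
        print("\nWorker stopped")
        sys.exit(0)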
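The entry point above stops cleanly on Ctrl+C but does not watch your files. If you want automatic restarts during development, one stdlib-only approach is a small supervisor script that polls modification times and restarts the worker process when a .py file changes. This is a hypothetical helper, not an AGNT5 feature. Point root at your project source rather than a directory that contains a virtualenv, since every poll walks the whole tree.

```python
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List

# Hypothetical dev-only supervisor; not part of the AGNT5 SDK.


def snapshot_mtimes(root: Path, pattern: str = "*.py") -> Dict[Path, float]:
    """Map every matching file under root to its last-modified time."""
    mtimes: Dict[Path, float] = {}
    for path in root.rglob(pattern):
        try:
            mtimes[path] = path.stat().st_mtime
        except FileNotFoundError:
            continue  # file vanished between listing and stat
    return mtimes


def changed_files(old: Dict[Path, float], new: Dict[Path, float]) -> List[Path]:
    """Files added, removed, or modified between two snapshots."""
    return sorted(p for p in set(old) | set(new) if old.get(p) != new.get(p))


def run_with_reload(script: str, root: Path, interval: float = 1.0) -> None:
    """Run `python script`, restarting it whenever a watched file changes."""
    while True:
        proc = subprocess.Popen([sys.executable, script])
        previous = snapshot_mtimes(root)
        try:
            while proc.poll() is None:
                time.sleep(interval)
                if changed_files(previous, snapshot_mtimes(root)):
                    print("Change detected, restarting worker...")
                    proc.terminate()
                    proc.wait()
                    break  # leave the inner loop and start a fresh process
            else:
                return  # the worker exited on its own; stop supervising
        except KeyboardInterrupt:
            proc.terminate()
            proc.wait()
            return


# Example: run_with_reload("worker.py", Path("."))
```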
Debugging
Enable debug logging to see detailed execution information:
import logging
logging.basicConfig(level=logging.DEBUG)
from agnt5.logging import install_opentelemetry_logging
install_opentelemetry_logging(level=logging.DEBUG)
Testing Functions
Test your functions locally without the full platform:
from agnt5.decorators import execute_component
# Test function directly
result = execute_component("greet", b'{"name": "Alice"}')
print(result)
Next Steps
Core Primitives
- Functions - Stateless operations with retries
- Entities - Stateful components with unique keys
- Workflows - Multi-step orchestration
- Context API - Full API reference
Agent Development Kit
- Agents - Autonomous LLM-driven systems
- Tools - Extend agent capabilities
- Sessions - Conversation management
- Memory - Long-term knowledge storage
Configuration
- Worker Runtime - Configure and deploy workers
- Examples - Real-world usage patterns
Functions
Functions are the core building blocks of AGNT5 applications. Use the @function decorator to register Python callables as invokable components that can be discovered and executed by the platform.
Basic Usage
Simple Function
from agnt5 import function
@function()
def greet_user(name: str) -> str:
    """Greet a user by name."""
    return f"Hello, {name}!"
Named Function
Override the registered name:
@function("math.add")
def add_numbers(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b
Function with Context
Access execution metadata through the context parameter:
from agnt5 import function
from agnt5.components import ExecutionContext
@function()
def context_aware(ctx: ExecutionContext, data: dict) -> dict:
    """Process data with execution context."""
    return {
        "invocation_id": ctx.invocation_id,
        "service_name": ctx.metadata.get("service_name"),
        "processed_data": data,
        "component_type": ctx.component_type.value
    }
Decorator Parameters
function(name=None)
| Parameter | Type | Description |
|---|---|---|
| name | Optional[str] | Override the registered function name. Defaults to the original function name. |
# Uses function name "process_data"
@function()
def process_data(data: dict) -> dict:
    return data
# Uses custom name "data_processor"
@function("data_processor")
def process_data(data: dict) -> dict:
    return data
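The naming rule in the table (use name if given, otherwise the function's own __name__) is the usual optional-argument decorator pattern. The toy below shows that pattern in plain Python; toy_function and REGISTRY are hypothetical and are not the SDK's internals. Unlike the example above, it rejects duplicate names, which is a design choice of this sketch rather than documented AGNT5 behavior.

```python
from typing import Callable, Dict, Optional

# Hypothetical in-process registry, for illustration only
REGISTRY: Dict[str, Callable] = {}


def toy_function(name: Optional[str] = None) -> Callable[[Callable], Callable]:
    """Register the decorated callable under `name`, or under its __name__ if omitted."""
    def decorator(fn: Callable) -> Callable:
        handler_name = name or fn.__name__
        if handler_name in REGISTRY:
            raise ValueError(f"duplicate handler name: {handler_name}")
        REGISTRY[handler_name] = fn
        fn._toy_handler_name = handler_name  # similar in spirit to _agnt5_handler_name
        return fn
    return decorator


@toy_function()
def process_data(data: dict) -> dict:
    return data


@toy_function("data_processor")
def process_data_v2(data: dict) -> dict:
    return data


print(sorted(REGISTRY))  # ['data_processor', 'process_data']
```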
Handler Signatures
AGNT5 supports flexible function signatures to accommodate different use cases.
Without Context
For simple stateless functions:
@function()
def calculate_tax(amount: float, rate: float) -> float:
    return amount * rate
@function()
def format_message(template: str, **kwargs) -> str:
    return template.format(**kwargs)
With Context
When you need access to invocation metadata:
import logging
import time
from agnt5 import function
from agnt5.components import ExecutionContext
logger = logging.getLogger(__name__)
@function()
def audit_handler(ctx: ExecutionContext, action: str, data: dict) -> dict:
    """Handler that logs audit information."""
    logger.info(f"Audit: {action} from {ctx.invocation_id}")
    return {
        "action": action,
        "invocation_id": ctx.invocation_id,
        "data": data,
        "timestamp": time.time()
    }
Async Functions
AGNT5 supports both synchronous and asynchronous functions:
Async Handler
import asyncio
from datetime import datetime, timezone
import httpx
from agnt5 import function
@function()
async def async_processor(data: dict) -> dict:
    """Async processing with I/O operations."""
    # Simulate async I/O
    await asyncio.sleep(0.1)
    # Async HTTP request example (uses the httpx package)
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.example.com/process",
            json=data
        )
        external_result = response.json()
    return {
        "original": data,
        "external": external_result,
        "processed_at": datetime.now(timezone.utc).isoformat()
    }
Async with Context
from agnt5 import function
from agnt5.components import ExecutionContext
@function()
async def async_context_handler(ctx: ExecutionContext, query: str) -> dict:
    """Async handler with context access."""
    # Use context for correlation
    correlation_id = ctx.metadata.get("correlation_id", ctx.invocation_id)
    # Async database query (`database` is a placeholder for your async database client)
    result = await database.execute(
        "SELECT * FROM items WHERE name LIKE %s",
        f"%{query}%"
    )
    return {
        "correlation_id": correlation_id,
        "query": query,
        "results": [dict(row) for row in result]
    }
Streaming Functions
For functions that need to return multiple responses over time:
Basic Streaming
import asyncio
import time
from agnt5 import function
@function(streaming=True)
async def stream_data(count: int):
    """Stream multiple data chunks."""
    for i in range(count):
        yield {
            "chunk": i,
            "data": f"Data chunk {i}",
            "timestamp": time.time()
        }
        await asyncio.sleep(0.1)  # Simulate processing time
Streaming with Context
import time
from agnt5 import function
from agnt5.components import ExecutionContext
@function(streaming=True)
async def stream_with_context(ctx: ExecutionContext, query: str):
    """Stream search results progressively."""
    search_id = ctx.invocation_id
    # Stream results as they're found (`search_engine` is a placeholder for your async search client)
    async for result in search_engine.stream_search(query):
        yield {
            "search_id": search_id,
            "result": result,
            "timestamp": time.time()
        }
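On the consuming side, a streaming handler is an async generator, and plain Python drives it with async for. The sketch below reuses the stream_data body without the @function decorator so that it runs with only the standard library; how the AGNT5 platform forwards chunks to remote callers is not covered here.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Dict, List


async def stream_data(count: int) -> AsyncIterator[Dict[str, Any]]:
    """Same generator body as above, minus the @function decorator."""
    for i in range(count):
        yield {"chunk": i, "data": f"Data chunk {i}", "timestamp": time.time()}
        await asyncio.sleep(0)  # hand control back to the event loop between chunks


async def collect(count: int) -> List[Dict[str, Any]]:
    # async for pulls one chunk at a time; each chunk is available as soon as it is yielded
    return [chunk async for chunk in stream_data(count)]


chunks = asyncio.run(collect(3))
print([c["data"] for c in chunks])  # ['Data chunk 0', 'Data chunk 1', 'Data chunk 2']
```

Collecting into a list is only for the demonstration; a real consumer would usually process each chunk inside the async for loop instead of waiting for the stream to finish.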
Function Metadata
The SDK automatically captures and provides metadata about registered functions:
Inspecting Functions
from agnt5 import function
from agnt5.decorators import get_registered_functions, get_function_metadata
# Get all registered functions
functions = get_registered_functions()
print(f"Registered functions: {list(functions.keys())}")
# Get metadata for a specific function
@function()
def sample_function(name: str, age: int = 25) -> dict:
    return {"name": name, "age": age}
metadata = get_function_metadata(sample_function)
print(metadata)
Output (formatted for readability):
{
    "name": "sample_function",
    "type": "function",
    "parameters": [
        {"name": "name", "type": "str", "required": True},
        {"name": "age", "type": "int", "required": False, "default": 25}
    ],
    "return_type": "dict"
}
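Metadata of this shape can be derived from a function signature with the standard library's inspect module. The describe helper below is a hypothetical illustration, not the SDK's get_function_metadata; it reproduces the fields shown in the output above for plain class annotations and falls back to str() for annotations that have no __name__.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional


def _type_name(annotation: Any) -> Optional[str]:
    # Unannotated parameters and returns carry the inspect "empty" sentinel
    if annotation is inspect.Parameter.empty:
        return None
    return getattr(annotation, "__name__", str(annotation))


def describe(fn: Callable) -> Dict[str, Any]:
    """Build a metadata dict shaped like the get_function_metadata output above."""
    sig = inspect.signature(fn)
    params: List[Dict[str, Any]] = []
    for p in sig.parameters.values():
        entry: Dict[str, Any] = {
            "name": p.name,
            "type": _type_name(p.annotation),
            "required": p.default is inspect.Parameter.empty,
        }
        if p.default is not inspect.Parameter.empty:
            entry["default"] = p.default
        params.append(entry)
    return {
        "name": fn.__name__,
        "type": "function",
        "parameters": params,
        "return_type": _type_name(sig.return_annotation),
    }


def sample_function(name: str, age: int = 25) -> dict:
    return {"name": name, "age": age}


print(describe(sample_function))
```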
Runtime Annotations
The decorator adds runtime annotations to functions:
@function()
def annotated_function(data: str) -> str:
    return data.upper()
# Check annotations
print(annotated_function._agnt5_handler_name)  # "annotated_function"
print(annotated_function._agnt5_is_function)  # True
Error Handling
Basic Error Handling
@function()
def safe_divider(a: float, b: float) -> dict:
    """Safely divide two numbers."""
    try:
        if b == 0:
            return {"error": "Division by zero", "result": None}
        result = a / b
        return {"result": result, "error": None}
    except Exception as e:
        return {"error": str(e), "result": None}
Context-Aware Error Handling
import logging
from agnt5 import function
from agnt5.components import ExecutionContext
@function()
def robust_handler(ctx: ExecutionContext, data: dict) -> dict:
    """Handler with comprehensive error handling."""
    logger = logging.getLogger(__name__)
    try:
        # Log the invocation
        logger.info(f"Processing invocation {ctx.invocation_id}")
        # Validate input
        if not isinstance(data, dict):
            raise ValueError("Input must be a dictionary")
        required_fields = ["id", "name"]
        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")
        # Process data
        result = {
            "processed": True,
            "id": data["id"],
            "name": data["name"].upper(),
            "invocation_id": ctx.invocation_id
        }
        logger.info(f"Successfully processed {data['id']}")
        return result
    except ValueError as e:
        logger.warning(f"Validation error in {ctx.invocation_id}: {e}")
        return {"error": f"Validation error: {e}", "result": None}
    except Exception as e:
        logger.error(f"Unexpected error in {ctx.invocation_id}: {e}")
        return {"error": "Internal error", "result": None}
Type Annotations
Use Python type hints for better documentation and validation:
Basic Types
from typing import Any, Dict, List, Optional, Union
from agnt5 import function
@function()
def typed_handler(
    name: str,
    age: int,
    tags: List[str],
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Union[str, int, List[str]]]:
    """Handler with comprehensive type annotations."""
    return {
        "name": name,
        "age": age,
        "tags": tags,
        "has_metadata": metadata is not None
    }
Pydantic Models
For complex data validation:
import uuid
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field
from agnt5 import function
class UserRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    age: Optional[int] = Field(None, ge=0, le=150)
class UserResponse(BaseModel):
    id: str
    name: str
    email: str
    age: Optional[int]
    created_at: str
def generate_user_id() -> str:
    # Placeholder ID generator; replace with your own scheme
    return str(uuid.uuid4())
@function()
def create_user(request: UserRequest) -> UserResponse:
    """Create a user with validation."""
    user_id = generate_user_id()
    return UserResponse(
        id=user_id,
        name=request.name,
        email=request.email,
        age=request.age,
        created_at=datetime.now(timezone.utc).isoformat()
    )
Testing Functions
Direct Testing
Test functions directly without the full platform:
import json
from agnt5.decorators import execute_component
from agnt5.components import ExecutionContext, ComponentType
# Assumes the module defining greet_user and context_aware is imported, so both are registered
def test_greet_function():
    # Test with execute_component
    result = execute_component(
        "greet_user",
        b'{"name": "Alice"}',
        context=None
    )
    # Result is JSON bytes
    parsed = json.loads(result.decode())
    assert parsed == "Hello, Alice!"
def test_context_function():
    # Create a test context
    ctx = ExecutionContext(
        invocation_id="test-123",
        component_type=ComponentType.FUNCTION
    )
    # Test directly
    result = context_aware(ctx, {"test": "data"})
    assert result["invocation_id"] == "test-123"
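Async Testing
Async tests need the pytest-asyncio plugin (pip install pytest-asyncio), which provides the @pytest.mark.asyncio marker:
import pytest
@pytest.mark.asyncio
async def test_async_function():
    result = await async_processor({"test": "data"})
    assert "original" in result
    assert "processed_at" in result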
Mock Context Testing
from unittest.mock import Mock
from agnt5.components import ExecutionContext, ComponentType
def test_with_mock_context():
    # Create mock context
    mock_ctx = Mock(spec=ExecutionContext)
    mock_ctx.invocation_id = "mock-123"
    mock_ctx.component_type = ComponentType.FUNCTION
    mock_ctx.metadata = {"service_name": "test-service"}
    # Test function
    result = context_aware(mock_ctx, {"test": "data"})
    assert result["invocation_id"] == "mock-123"
    assert result["service_name"] == "test-service"
Function Registry
Registry Management
from agnt5 import function
from agnt5.decorators import (
    get_registered_functions,
    clear_registry,
    get_function_metadata
)
# Get all registered functions
functions = get_registered_functions()
# Clear registry (useful for testing)
clear_registry()
# Re-register functions
@function()
def new_function(data: str) -> str:
    return data.upper()
# Inspect metadata
metadata = get_function_metadata(new_function)
Custom Registration
For advanced use cases, register functions manually:
from agnt5.decorators import register_function
def my_handler(data: str) -> str:
    return data.lower()
# Manual registration
register_function("custom_handler", my_handler)
Best Practices
Function Design
- Keep functions focused - Each function should have a single responsibility
- Use type hints - Improve documentation and enable validation
- Handle errors gracefully - Return error information rather than raising exceptions
- Log appropriately - Use structured logging for debugging and monitoring
Performance
- Minimize imports - Import only what you need
- Use async for I/O - Async functions for database queries and API calls
- Cache expensive operations - Use local caching for repeated computations
- Batch operations - Process multiple items together when possible
Testing
- Test functions directly - Unit test without the platform
- Mock external dependencies - Use mocks for databases, APIs, etc.
- Test error conditions - Ensure error handling works correctly
- Use fixtures - Share common test data and setup
Next Steps
- Workflows - Multi-step orchestration patterns
- Worker Runtime - Configure and deploy workers
- API Reference - Complete decorator API reference
- Examples - Real-world function examples
Entities
Entities are stateful components identified by unique keys. Use entities to model AI agents with conversation history, workflow orchestrators, or any business object that maintains state across interactions.
Key Characteristics
- Unique Key - Each instance identified by a unique key (e.g., agent-conv-123)
- Private State - Built-in key-value storage per instance
- Single-Writer - Automatic consistency - only one write operation per key at a time
- Durable - State survives crashes and restarts
- Scalable - Different keys execute in parallel
Implementation Status
Entities are being implemented in Phase 2 of AGNT5 (Target: Q1 2025). The API shown represents the planned design. Check current SDK status for availability.
Basic Usage
Creating an Entity
from agnt5 import entity
# Create entity type
agent = entity("ConversationAgent")
# Write method (exclusive access per key)
@agent.write
async def send_message(ctx, message: str) -> dict:
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})
    # call_llm is a placeholder for your own LLM client call
    response = await call_llm(history)
    history.append({"role": "assistant", "content": response})
    ctx.set("history", history)
    return {"response": response}
# Shared method (read-only, concurrent)
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])
Calling Entities
Call entity methods from functions:
from agnt5 import function
@function
async def chat(ctx, conv_id: str, msg: str):
    # Call entity method with unique key
    return await ctx.entity("ConversationAgent", conv_id).send_message(msg)
Entity API
Core Methods
| API | Description |
|---|---|
| entity("name") | Create entity type |
| @entity.write | Write method (exclusive per key) |
| @entity.shared | Shared method (read-only, concurrent) |
| ctx.get(key, default) | Get state value |
| ctx.set(key, value) | Set state value |
| ctx.delete(key) | Delete state key |
| ctx.entity(type, key).method() | Call entity from function |
State Operations
Common Patterns
Conversational AI Agent
from agnt5 import entity
agent = entity("ChatAgent")
@agent.write
async def send_message(ctx, message: str) -> dict:
    """Handle conversational turns with LLM."""
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})
    # Generate response
    response = await ctx.llm.generate(
        prompt=history,
        model="gpt-4"
    )
    history.append({"role": "assistant", "content": response.text})
    # Keep last 20 messages
    if len(history) > 20:
        history = history[-20:]
    ctx.set("history", history)
    return {"response": response.text}
@agent.shared
async def get_history(ctx) -> list:
    """Get conversation history (read-only)."""
    return await ctx.get("history", [])
@agent.shared
async def get_message_count(ctx) -> int:
    """Get total message count."""
    history = await ctx.get("history", [])
    return len(history)
Usage:
@function
async def chat_endpoint(ctx, conversation_id: str, message: str):
    # Call entity with unique conversation ID
    return await ctx.entity("ChatAgent", conversation_id).send_message(message)
Research Agent
from datetime import datetime
from agnt5 import entity
research_agent = entity("ResearchAgent")
@research_agent.write
async def start_research(ctx, topic: str) -> dict:
    """Initialize research task."""
    ctx.set("topic", topic)
    ctx.set("findings", [])
    ctx.set("status", "in_progress")
    return {"status": "started", "topic": topic}
@research_agent.write
async def add_finding(ctx, finding: str, source: str) -> dict:
    """Add research finding."""
    findings = await ctx.get("findings", [])
    findings.append({
        "content": finding,
        "source": source,
        "timestamp": datetime.now().isoformat()
    })
    ctx.set("findings", findings)
    return {"count": len(findings)}
@research_agent.write
async def synthesize(ctx) -> dict:
    """Generate summary from findings."""
    findings = await ctx.get("findings", [])
    topic = await ctx.get("topic")
    # Use LLM to synthesize
    summary = await ctx.llm.generate(
        prompt=f"Synthesize these findings about {topic}: {findings}",
        model="gpt-4"
    )
    ctx.set("summary", summary.text)
    ctx.set("status", "completed")
    return {"summary": summary.text}
@research_agent.shared
async def get_progress(ctx) -> dict:
    """Check research progress."""
    return {
        "status": await ctx.get("status"),
        "topic": await ctx.get("topic"),
        "findings_count": len(await ctx.get("findings", []))
    }
Workflow Orchestrator
from agnt5 import entity
# Named "orchestrator" so it does not shadow the agnt5 workflow decorator
orchestrator = entity("WorkflowOrchestrator")
@orchestrator.write
async def start(ctx, steps: list) -> dict:
    """Start workflow execution."""
    ctx.set("steps", steps)
    ctx.set("current_step", 0)
    ctx.set("results", [])
    ctx.set("status", "running")
    return {"status": "started", "total_steps": len(steps)}
@orchestrator.write
async def complete_step(ctx, result: dict) -> dict:
    """Mark step as complete and store result."""
    results = await ctx.get("results", [])
    results.append(result)
    ctx.set("results", results)
    current = len(results)
    ctx.set("current_step", current)
    # Check if workflow is done
    steps = await ctx.get("steps", [])
    if current >= len(steps):
        ctx.set("status", "completed")
    return {"completed": current, "total": len(steps)}
@orchestrator.shared
async def get_progress(ctx) -> dict:
    """Get workflow progress."""
    return {
        "current_step": await ctx.get("current_step", 0),
        "total_steps": len(await ctx.get("steps", [])),
        "status": await ctx.get("status", "unknown")
    }
Consistency & Concurrency
Single-Writer Per Key
Only one write operation per entity key executes at a time:
import asyncio
# Same key = one write at a time (consistency guaranteed)
# Even when issued concurrently, these two writes run one after the other
await asyncio.gather(
    ctx.entity("agent", "conv-1").send_message("msg1"),
    ctx.entity("agent", "conv-1").send_message("msg2"),
)
# No race conditions, no lost updates
Parallel Execution Across Keys
Different entity keys execute in parallel:
import asyncio
# Different keys = parallel execution (scales horizontally)
# asyncio.gather issues all three calls at once so they can overlap;
# awaiting them one by one would run them sequentially
await asyncio.gather(
    ctx.entity("agent", "conv-1").send_message(msg),
    ctx.entity("agent", "conv-2").send_message(msg),
    ctx.entity("agent", "conv-3").send_message(msg),
)
Shared Methods for Reads
Use @entity.shared for read-only operations that can run concurrently:
import asyncio
# Multiple shared calls can run in parallel for the same key
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])
# Issued together, these reads execute concurrently
await asyncio.gather(
    ctx.entity("agent", "conv-1").get_history(),
    ctx.entity("agent", "conv-1").get_history(),
)
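The single-writer rule exists to prevent lost updates. The toy below models it in a single process with one asyncio.Lock per key; ToyEntityRuntime is hypothetical and only illustrates the semantics described above. The source does not describe how AGNT5 enforces this, and the real platform must do it across workers with durable state, which a dict and a lock do not provide.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, Tuple

State = Dict[str, Any]
Method = Callable[[State], Awaitable[Any]]


class ToyEntityRuntime:
    """In-process toy: per-key state plus one asyncio.Lock per key for writes."""

    def __init__(self) -> None:
        self.state: Dict[str, State] = defaultdict(dict)
        self.locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def write(self, key: str, method: Method) -> Any:
        # Writes to the same key queue behind this key's lock (single writer)
        async with self.locks[key]:
            return await method(self.state[key])

    async def shared(self, key: str, method: Method) -> Any:
        # Reads skip the lock, so they can overlap with each other
        return await method(self.state[key])


async def increment(state: State) -> int:
    current = state.get("count", 0)
    await asyncio.sleep(0)  # yield mid-update, exactly where a race can happen
    state["count"] = current + 1
    return state["count"]


async def read_count(state: State) -> int:
    return state.get("count", 0)


async def demo() -> Tuple[int, int]:
    rt = ToyEntityRuntime()
    # 100 concurrent writes to one key: the per-key lock serializes them
    await asyncio.gather(*(rt.write("conv-1", increment) for _ in range(100)))
    # The same update without the lock loses writes
    await asyncio.gather(*(increment(rt.state["conv-2"]) for _ in range(100)))
    locked = await rt.shared("conv-1", read_count)
    return locked, rt.state["conv-2"]["count"]


locked, unlocked = asyncio.run(demo())
print(locked, unlocked)  # locked is always 100; unlocked is lower
```

Because the lock is per key, writes to conv-1 never wait on writes to conv-2. A single global key would push every write through one lock, which is why the key-granularity advice below matters.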
Best Practices
1. Choose Stable, Meaningful Keys
Use unique, stable identifiers for entity keys:
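One way to do this is to derive the key from a durable identifier you already have, such as a conversation or user ID, so the same logical object always maps to the same entity. entity_key below is a hypothetical helper, not an SDK function; it follows the prefix-{id} shape used in the use-case table further down.

```python
import re

# Anything outside lowercase letters, digits, "_" and "-" becomes a single "-"
_UNSAFE = re.compile(r"[^a-z0-9_-]+")


def entity_key(prefix: str, identifier: str) -> str:
    """Build a stable key such as 'agent-conv-123' from a prefix and a durable ID."""
    slug = _UNSAFE.sub("-", identifier.strip().lower()).strip("-")
    if not slug:
        raise ValueError(f"identifier {identifier!r} yields an empty key")
    return f"{prefix}-{slug}"


print(entity_key("agent-conv", "123"))  # agent-conv-123
```

Avoid timestamps or random values in keys: a retried call would then address a different entity. Also note that normalization can merge distinct raw IDs (for example "A.B" and "a-b" both become "a-b"), so only normalize when your IDs stay unique afterwards.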
2. Design for Concurrency
Choose key granularity for optimal parallelism:
# ✓ Good - One entity per conversation
await ctx.entity("ChatAgent", f"conv-{conv_id}").send_message(msg)
# ✗ Bad - Single global entity (serializes everything)
await ctx.entity("ChatAgent", "global").send_message(msg)
3. Use Shared for Read Operations
Enable concurrent reads with @entity.shared:
# Write methods - exclusive access
@agent.write
async def update_state(ctx, data: dict):
    ctx.set("state", data)
# Read methods - concurrent access
@agent.shared
async def get_state(ctx) -> dict:
    return await ctx.get("state", {})
4. Keep State Minimal
Store only what you need:
# ✓ Good - Essential state only
ctx.set("history", recent_messages[-20:])
ctx.set("summary", summary_text)
# ✗ Avoid - Excessive state
ctx.set("full_transcript", all_messages) # Could be huge
ctx.set("raw_responses", all_llm_responses) # Redundant
Entity Use Cases
| Use Case | Entity Key | State Stored |
|---|---|---|
| AI Chat Agent | agent-conv-{id} | Conversation history, context |
| Research Task | research-{task_id} | Findings, sources, summary |
| Workflow Orchestrator | workflow-{run_id} | Step progress, results |
| User Context | user-{user_id} | Preferences, personalization |
| Shopping Cart | cart-{session_id} | Items, totals, discounts |
| Game Session | game-{session_id} | Player state, score, progress |
Functions vs Entities
| Aspect | Functions | Entities |
|---|---|---|
| State | Stateless | Stateful (KV store) |
| Identity | No identity | Unique key per instance |
| Concurrency | Parallel by default | Serial per key, parallel across keys |
| Consistency | No consistency needed | Single-writer guarantee |
| Use Case | Transformations, API calls | Stateful AI agents, workflows |
When to use Functions:
- Stateless operations
- Independent requests
- Data transformations
- API integrations
When to use Entities:
- Stateful AI agents with memory
- Workflow orchestration
- User sessions and context
- Any state that needs consistency
Next Steps
- Context API - Entity state operations and APIs
- Functions - Stateless operations
- Workflows - Multi-step orchestration
- Agent Component - AI agents built on entities
Workflows
Workflows provide durable, multi-step orchestration with automatic recovery, state persistence, and dependency management between steps. They run on the AGNT5 orchestration plane, which provides exactly-once execution guarantees.
Basic Workflow
Simple Sequential Workflow
from agnt5 import workflow, task_step
from agnt5.workflows import FlowDefinition
@workflow()
def data_pipeline() -> FlowDefinition:
    return FlowDefinition([
        task_step(
            name="extract",
            service_name="etl-service",
            handler_name="extract_data"
        ),
        task_step(
            name="transform",
            service_name="etl-service",
            handler_name="transform_data",
            dependencies=["extract"]
        ),
        task_step(
            name="load",
            service_name="etl-service",
            handler_name="load_data",
            dependencies=["transform"]
        )
    ])
Parallel Execution
Steps without dependencies execute in parallel:
@workflow()
def parallel_processing() -> FlowDefinition:
    return FlowDefinition([
        # These three steps run in parallel
        task_step(
            name="process_a",
            service_name="service",
            handler_name="process_type_a"
        ),
        task_step(
            name="process_b",
            service_name="service",
            handler_name="process_type_b"
        ),
        task_step(
            name="process_c",
            service_name="service",
            handler_name="process_type_c"
        ),
        # This step waits for all three to complete
        task_step(
            name="merge_results",
            service_name="service",
            handler_name="merge_results",
            dependencies=["process_a", "process_b", "process_c"]
        )
    ])
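The scheduling rule above (a step becomes runnable once every step it depends on has finished) groups a flow into waves of steps that can run side by side. The sketch below computes those waves with the standard library. execution_waves is a hypothetical helper, not part of agnt5, and it models only dependencies, not signals, timers, or retries. It also works as a cheap pre-deploy check, since it rejects misspelled dependency names and cycles.

```python
from typing import Dict, List, Sequence, Set


def execution_waves(steps: Dict[str, Sequence[str]]) -> List[List[str]]:
    """Group step names into waves; each step's dependencies all sit in earlier waves."""
    unknown = {dep for deps in steps.values() for dep in deps} - set(steps)
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")
    remaining: Dict[str, Set[str]] = {name: set(deps) for name, deps in steps.items()}
    done: Set[str] = set()
    waves: List[List[str]] = []
    while remaining:
        # A step is ready once every one of its dependencies has completed
        ready = sorted(name for name, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


pipeline = {
    "process_a": [],
    "process_b": [],
    "process_c": [],
    "merge_results": ["process_a", "process_b", "process_c"],
}
print(execution_waves(pipeline))
# [['process_a', 'process_b', 'process_c'], ['merge_results']]
```

Waves are a simplification: a real scheduler can start a step as soon as its own dependencies finish, without waiting for the rest of its wave.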
Step Types
Task Steps
Execute function handlers on services:
from agnt5.workflows import task_step
# Basic task step
step = task_step(
    name="unique_step_name",
    service_name="my-service",
    handler_name="my_handler"
)
# Task with dependencies and input data
step = task_step(
    name="dependent_step",
    service_name="my-service",
    handler_name="process_data",
    dependencies=["previous_step"],
    input_data={"config": "production", "batch_size": 100}
)
# Task with object keys (Phase 2)
step = task_step(
    name="object_step",
    service_name="object-service",
    handler_name="update_state",
    dependencies=["init_step"],
    object_keys=["user:123", "cart:456"]
)
Wait Signal Steps
Pause execution until an external signal arrives:
from agnt5.workflows import wait_signal_step
# Basic signal wait
step = wait_signal_step(
    name="wait_for_approval",
    signal_name="approval_granted",
    dependencies=["review_step"]
)
# Signal wait with timeout
step = wait_signal_step(
    name="wait_with_timeout",
    signal_name="user_action",
    dependencies=["prompt_user"],
    timeout_ms=300000,  # 5 minutes
    on_timeout="timeout_handler_step"
)
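Arguments such as timeout_ms and delay_ms take raw millisecond counts, which are easy to get wrong by a factor of 1000. Computing them from datetime.timedelta keeps the intent readable; to_ms is a hypothetical helper, not an SDK function.

```python
from datetime import timedelta


def to_ms(duration: timedelta) -> int:
    """Whole milliseconds in a timedelta, for arguments such as timeout_ms or delay_ms."""
    # Floor-dividing two timedeltas returns an int, which avoids float rounding
    return duration // timedelta(milliseconds=1)


print(to_ms(timedelta(minutes=5)))  # 300000
print(to_ms(timedelta(hours=48)))  # 172800000
```

For instance, timeout_ms=to_ms(timedelta(minutes=5)) is the same value as the timeout_ms=300000 used in the example above.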
Wait Timer Steps
Pause for a fixed delay or until a cron schedule fires:
from agnt5.workflows import wait_timer_step
# Fixed delay
step = wait_timer_step(
    name="delay_before_retry",
    timer_key="retry_delay",
    delay_ms=30000,  # 30 seconds
    dependencies=["failed_step"]
)
# Cron schedule
step = wait_timer_step(
    name="nightly_batch",
    timer_key="nightly",
    cron_expr="0 2 * * *",  # 2 AM daily
    dependencies=["prep_step"]
)
# Timer with retries
step = wait_timer_step(
    name="retry_with_backoff",
    timer_key="exponential_backoff",
    delay_ms=5000,
    max_retries=3,
    dependencies=["error_step"]
)
Workflow Examples
ETL Pipeline
@workflow()
def nightly_etl() -> FlowDefinition:
return FlowDefinition([
# Start with data validation
task_step(
name="validate_sources",
service_name="etl-service",
handler_name="validate_data_sources"
),
# Extract from multiple sources in parallel
task_step(
name="extract_database",
service_name="etl-service",
handler_name="extract_from_database",
dependencies=["validate_sources"]
),
task_step(
name="extract_api",
service_name="etl-service",
handler_name="extract_from_api",
dependencies=["validate_sources"]
),
task_step(
name="extract_files",
service_name="etl-service",
handler_name="extract_from_files",
dependencies=["validate_sources"]
),
# Wait for all extractions to complete
task_step(
name="merge_extracted_data",
service_name="etl-service",
handler_name="merge_data",
dependencies=["extract_database", "extract_api", "extract_files"]
),
# Transform data
task_step(
name="clean_data",
service_name="etl-service",
handler_name="clean_and_normalize",
dependencies=["merge_extracted_data"]
),
task_step(
name="enrich_data",
service_name="etl-service",
handler_name="enrich_with_metadata",
dependencies=["clean_data"]
),
# Wait for maintenance window
wait_timer_step(
name="wait_for_maintenance_window",
timer_key="maintenance",
cron_expr="0 3 * * *", # 3 AM
dependencies=["enrich_data"]
),
# Load data
task_step(
name="load_to_warehouse",
service_name="etl-service",
handler_name="load_data_warehouse",
dependencies=["wait_for_maintenance_window"]
),
# Generate reports
task_step(
name="generate_reports",
service_name="reporting-service",
handler_name="generate_daily_reports",
dependencies=["load_to_warehouse"]
)
])
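Because the three extract steps depend only on `validate_sources`, they can run at the same time. One way to see which steps can run together is to group the dependency graph into topological "waves". The sketch below uses Python's standard-library `graphlib` (3.9+) on a hand-copied dependency map. It illustrates the scheduling idea only and is not a description of how the AGNT5 runtime actually schedules steps.

```python
from graphlib import TopologicalSorter

# Step name -> the steps it depends on (copied from nightly_etl above)
deps = {
    "validate_sources": [],
    "extract_database": ["validate_sources"],
    "extract_api": ["validate_sources"],
    "extract_files": ["validate_sources"],
    "merge_extracted_data": ["extract_database", "extract_api", "extract_files"],
    "clean_data": ["merge_extracted_data"],
    "enrich_data": ["clean_data"],
    "wait_for_maintenance_window": ["enrich_data"],
    "load_to_warehouse": ["wait_for_maintenance_window"],
    "generate_reports": ["load_to_warehouse"],
}


def execution_waves(graph: dict[str, list[str]]) -> list[list[str]]:
    """Group steps into waves; every step in a wave has all its dependencies done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # also raises CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for wave in execution_waves(deps):
    print(wave)
```

The second wave contains all three extract steps, which is the parallelism the comment "Extract from multiple sources in parallel" refers to. Every later step forms a wave of one, because each depends on the step before it.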
Approval Workflow
from agnt5 import workflow
from agnt5.workflows import FlowDefinition, task_step, wait_signal_step
@workflow()
def document_approval() -> FlowDefinition:
return FlowDefinition([
# Submit document for review
task_step(
name="submit_document",
service_name="doc-service",
handler_name="submit_for_review",
input_data={"priority": "normal"}
),
# Notify reviewers
task_step(
name="notify_reviewers",
service_name="notification-service",
handler_name="send_review_notifications",
dependencies=["submit_document"]
),
# Wait for approval (with timeout)
wait_signal_step(
name="wait_for_approval",
signal_name="document_approved",
dependencies=["notify_reviewers"],
timeout_ms=172800000, # 48 hours
on_timeout="escalate_approval"
),
# Escalation path
task_step(
name="escalate_approval",
service_name="doc-service",
handler_name="escalate_to_manager",
# No dependencies - triggered by timeout
),
# Wait for escalated approval
wait_signal_step(
name="wait_escalated_approval",
signal_name="escalated_approval",
dependencies=["escalate_approval"],
timeout_ms=86400000 # 24 hours
),
# Publish approved document
task_step(
name="publish_document",
service_name="doc-service",
handler_name="publish_approved_document",
dependencies=["wait_for_approval", "wait_escalated_approval"]
)
])
Workflow Registration
Using the Decorator
from agnt5 import workflow
from agnt5.workflows import FlowDefinition, get_registered_workflows
@workflow()
def my_workflow() -> FlowDefinition:
return FlowDefinition([...])
@workflow("custom_name")
def workflow_with_custom_name() -> FlowDefinition:
return FlowDefinition([...])
# Inspect registered workflows
workflows = get_registered_workflows()
print(f"Registered workflows: {list(workflows.keys())}")
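To make the two decorator forms concrete, here is a stripped-down registry that supports both `@workflow()` and `@workflow("custom_name")`. It is a hypothetical illustration of the pattern. `_REGISTRY`, `workflow`, and `get_registered_workflows` here are local stand-ins, not AGNT5's internal implementation, and the duplicate-name check is this sketch's own choice.

```python
from typing import Callable, Dict, Optional

# Hypothetical in-process registry: workflow name -> definition factory
_REGISTRY: Dict[str, Callable] = {}


def workflow(name: Optional[str] = None):
    """Register the decorated function under `name`, or under its own __name__."""
    def decorator(fn: Callable) -> Callable:
        key = name or fn.__name__
        if key in _REGISTRY:
            # This sketch rejects duplicate names instead of silently overwriting
            raise ValueError(f"Workflow {key!r} is already registered")
        _REGISTRY[key] = fn
        return fn  # the function itself is returned unchanged
    return decorator


def get_registered_workflows() -> Dict[str, Callable]:
    # Return a copy so callers cannot mutate the registry by accident
    return dict(_REGISTRY)


@workflow()
def my_workflow():
    return ["step1"]


@workflow("custom_name")
def workflow_with_custom_name():
    return ["step1"]


print(sorted(get_registered_workflows()))  # ['custom_name', 'my_workflow']
```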
Manual Registration
from agnt5.workflows import FlowDefinition, register_workflow, task_step
def create_workflow_definition() -> FlowDefinition:
return FlowDefinition([
task_step("step1", service_name="service", handler_name="handler1"),
task_step("step2", service_name="service", handler_name="handler2",
dependencies=["step1"])
])
# Manual registration
register_workflow("manual_workflow", create_workflow_definition())
Workflow Data Classes
FlowDefinition
An ordered container of workflow steps that can be serialized to a dictionary or a JSON string:
from agnt5.workflows import FlowDefinition, task_step
# Create definition
flow = FlowDefinition([
task_step("step1", service_name="svc", handler_name="h1"),
task_step("step2", service_name="svc", handler_name="h2", dependencies=["step1"])
])
# Serialize to dictionary
flow_dict = flow.to_dict()
# Serialize to JSON string
flow_json = flow.to_json()
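A rough model of what serialization involves: the step list becomes nested dictionaries, which are then encoded as JSON. `Step` and `Flow` below are hypothetical stand-ins built on the standard `dataclasses` module. The exact keys AGNT5 emits may differ; the only ones this page relies on are `steps` and each step's `dependencies`.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Step:
    name: str
    service_name: str
    handler_name: str
    dependencies: List[str] = field(default_factory=list)
    input_data: Optional[Dict[str, Any]] = None


@dataclass
class Flow:
    steps: List[Step]

    def to_dict(self) -> Dict[str, Any]:
        # asdict recurses into nested dataclasses and lists
        return asdict(self)

    def to_json(self) -> str:
        # sort_keys makes the output stable, which helps when diffing definitions
        return json.dumps(self.to_dict(), sort_keys=True)


flow = Flow([
    Step("step1", service_name="svc", handler_name="h1"),
    Step("step2", service_name="svc", handler_name="h2", dependencies=["step1"]),
])
print(flow.to_dict()["steps"][1]["dependencies"])  # ['step1']
```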
WorkflowStep
Individual step configuration:
from agnt5.workflows import WorkflowStep, StepType
# Manual step creation (usually use helper functions instead)
step = WorkflowStep(
name="custom_step",
step_type=StepType.TASK,
service_name="my-service",
handler_name="my_handler",
dependencies=["previous_step"],
input_data={"key": "value"}
)
Configuration Classes
from agnt5.workflows import SignalConfig, TimerConfig
# Signal configuration
signal_config = SignalConfig(
name="approval_signal",
timeout_ms=3600000, # 1 hour
on_timeout="timeout_step"
)
# Timer configuration
timer_config = TimerConfig(
key="batch_timer",
delay_ms=60000, # 1 minute
max_retries=5
)
# Cron timer configuration
cron_timer = TimerConfig(
key="daily_job",
cron_expr="0 0 * * *", # Daily at midnight
max_retries=3
)
Validation and Error Handling
Workflow Validation
AGNT5 validates workflows during registration:
from agnt5.workflows import register_workflow, FlowDefinition, task_step
# Raises ValueError: step2 depends on step1, which is not defined
try:
invalid_flow = FlowDefinition([
task_step("step2", service_name="svc", handler_name="h2",
dependencies=["step1"]), # step1 doesn't exist
task_step("step3", service_name="svc", handler_name="h3",
dependencies=["step2"])
])
register_workflow("invalid", invalid_flow)
except ValueError as e:
print(f"Validation error: {e}")
Validation Rules
- At least one step must be defined
- Step names must be unique within the workflow
- Dependencies must reference existing steps
- Dependencies must appear earlier in the definition (causal order)
- Required fields must be populated based on step type
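The first four rules can be checked in a single pass over the step list. Here is a hypothetical sketch that operates on plain dictionaries. `validate_flow` is not an AGNT5 function, and it omits the per-step-type required-field check.

```python
from typing import Dict, List


def validate_flow(steps: List[Dict]) -> None:
    """Raise ValueError if `steps` breaks the structural rules listed above."""
    if not steps:
        raise ValueError("A workflow must define at least one step")
    seen = set()
    for step in steps:
        name = step["name"]
        if name in seen:
            raise ValueError(f"Duplicate step name: {name!r}")
        for dep in step.get("dependencies", []):
            # Requiring the dependency to be defined *earlier* covers both
            # "must exist" and "causal order" (and rejects self-dependencies)
            if dep not in seen:
                raise ValueError(
                    f"Step {name!r} depends on {dep!r}, which is not defined before it"
                )
        seen.add(name)


validate_flow([{"name": "a"}, {"name": "b", "dependencies": ["a"]}])  # passes
```

Checking dependencies against only the steps seen so far means a single linear scan is enough, and no separate cycle detection is needed: a cycle would require some step to depend on a later one.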
Testing Workflows
Workflow Definition Testing
import pytest
from agnt5 import workflow
from agnt5.workflows import FlowDefinition, task_step, get_registered_workflows
def test_workflow_definition():
# Test workflow structure
flow = FlowDefinition([
task_step("extract", service_name="etl", handler_name="extract"),
task_step("transform", service_name="etl", handler_name="transform",
dependencies=["extract"])
])
# Verify serialization
flow_dict = flow.to_dict()
assert len(flow_dict["steps"]) == 2
assert flow_dict["steps"][1]["dependencies"] == ["extract"]
def test_workflow_registration():
@workflow()
def test_workflow() -> FlowDefinition:
return FlowDefinition([
task_step("test_step", service_name="test", handler_name="test")
])
# Verify registration
workflows = get_registered_workflows()
assert "test_workflow" in workflows
Integration Testing
Test workflows with a local development environment:
import asyncio
from agnt5 import Client

# Run with an async-aware test runner (for example, pytest with the pytest-asyncio plugin)
async def test_workflow_execution():
    client = Client("http://localhost:8080")
    # Trigger workflow
    result = await client.start_workflow(
        workflow_name="data_pipeline",
        input_data={"source": "test_data"}
    )
    workflow_id = result["workflow_id"]
    # Poll for completion, giving up after about 60 seconds
    for _ in range(60):
        status = await client.get_workflow_status(workflow_id)
        if status["state"] in ("completed", "failed"):
            break
        await asyncio.sleep(1)
    assert status["state"] == "completed"
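If several tests need the same polling, a small helper with a deadline keeps a stuck workflow from hanging the suite. `poll_until_terminal` is a hypothetical helper. Pass it any coroutine function that returns a status dictionary with a `state` key, such as a wrapper around the `get_workflow_status` call above.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple


async def poll_until_terminal(
    get_status: Callable[[], Awaitable[Dict]],
    terminal_states: Tuple[str, ...] = ("completed", "failed"),
    interval_s: float = 1.0,
    timeout_s: float = 60.0,
) -> Dict:
    """Call get_status() until it reports a terminal state or timeout_s elapses."""
    async def _poll() -> Dict:
        while True:
            status = await get_status()
            if status["state"] in terminal_states:
                return status
            await asyncio.sleep(interval_s)

    # wait_for cancels the polling loop and raises asyncio.TimeoutError on timeout
    return await asyncio.wait_for(_poll(), timeout=timeout_s)


async def demo() -> Dict:
    states = iter(["running", "running", "completed"])

    async def fake_status() -> Dict:
        return {"state": next(states)}

    return await poll_until_terminal(fake_status, interval_s=0.01)


print(asyncio.run(demo()))  # {'state': 'completed'}
```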
Best Practices
Design Patterns
- Idempotent Steps - Design steps to be safely retryable
- Small Steps - Break complex operations into smaller, focused steps
- Clear Dependencies - Make step relationships explicit
- Meaningful Names - Use descriptive names for steps and workflows
- Error Handling - Plan for failure and recovery scenarios
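One common way to make a step safe to retry is an idempotency key. The step records its result under a key derived from its input and returns the stored result on a retry, instead of repeating the side effect. This sketch keeps results in memory for illustration only; a real step would store them durably (for example, in a database row keyed by order ID). `IdempotentRunner` and `charge_card` are hypothetical and not part of AGNT5.

```python
from typing import Any, Callable, Dict


class IdempotentRunner:
    """Run each operation at most once per idempotency key (in-memory sketch)."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def run(self, key: str, operation: Callable[[], Any]) -> Any:
        # A retry with the same key returns the stored result instead of
        # performing the side effect again
        if key not in self._results:
            self._results[key] = operation()
        return self._results[key]


charges = []


def charge_card():
    # Hypothetical side effect: record a 100-unit charge
    charges.append(100)
    return {"charged": 100}


runner = IdempotentRunner()
for _ in range(3):  # the first attempt plus two retries of the same step
    runner.run("charge:order-42", charge_card)
print(charges)  # [100]
```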
Performance
- Parallel Execution - Remove unnecessary dependencies to enable parallelism
- Batch Operations - Group related operations into single steps
- Resource Management - Consider resource usage when designing workflows
- State Minimization - Keep workflow state as small as possible
Monitoring
- Structured Logging - Add logging to workflow steps
- Progress Tracking - Use meaningful step names and descriptions
- Metrics Collection - Track workflow success rates and durations
- Error Alerting - Set up alerts for workflow failures
Next Steps
- Worker Runtime - Configure and deploy workers
- API Reference - Complete workflows API reference
- Examples - Real-world workflow patterns
Context API
The Context (ctx) is the execution environment provided to all AGNT5 components. It provides APIs for orchestration, state management, LLM interactions, coordination, and observability.
Core Capabilities
- Orchestration - Execute tasks, spawn functions, parallel execution
- State Management - Get/set/delete state for entities
- Coordination - Signals, timers, human approvals
- AI Integration - LLM calls, tool registration
- Observability - Logging, metrics, tracing
Orchestration APIs
Task Execution
Execute functions and wait for results (workflows only):
@workflow
async def process_workflow(ctx):
# Execute a task
result = await ctx.task(
service_name="analytics",
handler_name="process_data",
input={"dataset": "users"}
)
return result
Parallel Execution
Run multiple tasks concurrently:
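The concurrency behind `ctx.parallel(*tasks)` and `ctx.gather(**tasks)` (listed in the API reference below) can be illustrated with plain asyncio. This is only an analogy. `fetch` is a hypothetical stand-in for a task, and unlike workflow steps, plain asyncio coroutines are not checkpointed.

```python
import asyncio


async def fetch(name: str, delay_s: float) -> str:
    # Hypothetical task: sleeps to simulate I/O, then returns a result
    await asyncio.sleep(delay_s)
    return f"{name}-done"


async def run_parallel() -> dict:
    names = ["users", "orders", "events"]
    # asyncio.gather runs the coroutines concurrently and returns results in input order
    results = await asyncio.gather(*(fetch(n, 0.2) for n in names))
    # Pair each result with its name, similar to named results from ctx.gather(**tasks)
    return dict(zip(names, results))


print(asyncio.run(run_parallel()))
```

The three 0.2-second sleeps overlap, so the whole call takes roughly 0.2 seconds rather than 0.6.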
Async Invocation
Spawn child functions without waiting:
@function
async def batch_processor(ctx, items: list):
# Spawn child invocations
handles = []
for item in items:
handle = ctx.spawn(process_item, item, key=f"item-{item['id']}")
handles.append(handle)
# Continue other work...
# Wait for results later if needed
results = [await h.result() for h in handles]
return {"processed": len(results)}
Checkpointing
Checkpoint expensive operations (functions only):
@function
async def process_pipeline(ctx, data_id: str):
# Each step is checkpointed
raw = await ctx.step("extract", lambda: extract_data(data_id))
cleaned = await ctx.step("clean", lambda: clean_data(raw))
result = await ctx.step("analyze", lambda: analyze(cleaned))
# If crash occurs, resumes from last completed step
return result
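Checkpointing works by recording each named step's result the first time it succeeds and replaying the recorded result on a later attempt. The sketch below models that with an in-memory dictionary. `CheckpointLog` is hypothetical; its `step` method takes a coroutine function, and a real runtime would persist `completed` durably instead of keeping it in memory.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class CheckpointLog:
    """Records each named step's result so a re-run can skip finished steps."""

    def __init__(self) -> None:
        self.completed: Dict[str, Any] = {}  # durable storage in a real system

    async def step(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        if name in self.completed:
            return self.completed[name]  # replay: reuse the recorded result
        result = await fn()
        self.completed[name] = result  # checkpoint only after the step succeeds
        return result


calls = []


async def extract():
    calls.append("extract")
    return [3, 1, 2]


async def pipeline(log: CheckpointLog, fail_on_clean: bool) -> list:
    raw = await log.step("extract", extract)

    async def clean():
        calls.append("clean")
        if fail_on_clean:
            raise RuntimeError("worker crashed")
        return sorted(raw)

    return await log.step("clean", clean)


log = CheckpointLog()
try:
    asyncio.run(pipeline(log, fail_on_clean=True))  # crashes during "clean"
except RuntimeError:
    pass
# Retry with the same log: "extract" is replayed, only "clean" runs again
print(asyncio.run(pipeline(log, fail_on_clean=False)))  # [1, 2, 3]
print(calls)  # ['extract', 'clean', 'clean']
```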
State Management (Entities)
Get, Set, Delete
Manage entity state:
from datetime import datetime
@entity.write
async def update_profile(ctx, name: str, age: int):
# Get with default
profile = await ctx.get("profile", {})
# Update profile
profile.update({"name": name, "age": age})
# Set state
ctx.set("profile", profile)
ctx.set("last_updated", datetime.now().isoformat())
# Delete temporary data
ctx.delete("temp_cache")
return {"status": "updated"}
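Entity state is isolated per entity key (see the primitives comparison at the top of this page), so two instances of the same entity type never see each other's values. The class below is a hypothetical in-memory model of those get/set/delete semantics, not the AGNT5 storage layer. In AGNT5 the entity type and key are presumably implied by the entity instance handling the call, rather than passed explicitly as they are here.

```python
from typing import Any, Dict, Tuple


class EntityStateStore:
    """In-memory model of entity state: each (entity type, key) pair is isolated."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Dict[str, Any]] = {}

    def _state(self, entity_type: str, key: str) -> Dict[str, Any]:
        return self._data.setdefault((entity_type, key), {})

    def get(self, entity_type: str, key: str, name: str, default: Any = None) -> Any:
        return self._state(entity_type, key).get(name, default)

    def set(self, entity_type: str, key: str, name: str, value: Any) -> None:
        self._state(entity_type, key)[name] = value

    def delete(self, entity_type: str, key: str, name: str) -> None:
        # Deleting a missing name is a no-op in this sketch
        self._state(entity_type, key).pop(name, None)


store = EntityStateStore()
store.set("UserProfile", "alice", "profile", {"name": "Alice"})
print(store.get("UserProfile", "alice", "profile"))            # {'name': 'Alice'}
print(store.get("UserProfile", "bob", "profile", default={}))  # {}
```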
Entity Method Calls
Call entity methods from functions:
@function
async def chat(ctx, conversation_id: str, message: str):
# Call entity method
response = await ctx.entity(
"ChatAgent",
conversation_id
).send_message(message)
return response
Coordination APIs
Signals
Wait for external events:
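The wait-with-timeout behavior listed for `ctx.signal(name, timeout_ms, default)` and `ctx.signal.emit(name, payload)` in the API reference below can be modeled with asyncio futures. `SignalBus` is a hypothetical in-process illustration. Unlike a workflow signal, it lives only in memory and does not survive a process restart.

```python
import asyncio
from typing import Any, Dict


class SignalBus:
    """wait() blocks until emit() delivers a payload or the timeout expires."""

    def __init__(self) -> None:
        self._futures: Dict[str, asyncio.Future] = {}

    def _future(self, name: str) -> asyncio.Future:
        if name not in self._futures:
            self._futures[name] = asyncio.get_running_loop().create_future()
        return self._futures[name]

    def emit(self, name: str, payload: Any) -> None:
        fut = self._future(name)
        if not fut.done():  # only the first emit for a name is delivered
            fut.set_result(payload)

    async def wait(self, name: str, timeout_ms: int, default: Any = None) -> Any:
        try:
            # shield keeps the shared future usable if this particular wait times out
            return await asyncio.wait_for(
                asyncio.shield(self._future(name)), timeout_ms / 1000
            )
        except asyncio.TimeoutError:
            return default


async def demo():
    bus = SignalBus()
    # Simulate an external approval arriving after 50 ms
    asyncio.get_running_loop().call_later(0.05, bus.emit, "approval", {"approved": True})
    approved = await bus.wait("approval", timeout_ms=1000)
    missing = await bus.wait("never_sent", timeout_ms=50, default={"approved": False})
    return approved, missing


print(asyncio.run(demo()))  # ({'approved': True}, {'approved': False})
```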
Timers & Sleep
Add delays and scheduled execution:
@workflow
async def scheduled_job(ctx):
# Wait 5 seconds
await ctx.timer(delay_ms=5000)
# Or use sleep (alternative syntax)
await ctx.sleep(30) # 30 seconds
# Wait until the next time matching a cron expression
await ctx.timer(cron="0 0 * * *") # Daily at midnight
return {"status": "completed"}
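For a daily expression such as "0 0 * * *" (minute 0, hour 0, every day), the wait lasts until the next matching wall-clock time. The helper below computes that for the daily "minute hour * * *" case only. It is a hypothetical sketch that ignores time zones and DST; anything more general needs a real cron parser, such as the third-party croniter package.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next time matching cron 'minute hour * * *' strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next match is tomorrow
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 5, 1, 14, 30)
print(next_daily_run(now, hour=0))              # 2024-05-02 00:00:00
print(next_daily_run(now, hour=23, minute=15))  # 2024-05-01 23:15:00
```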
Human-in-the-Loop
Request human approval:
from datetime import timedelta
@workflow
async def deployment_workflow(ctx, version: str):
# Run tests
test_results = await ctx.task("ci", "run_tests", input={"version": version})
if test_results["passed"]:
# Request human approval for production
approval = await ctx.human.approval(
"deploy_production",
payload={"version": version, "tests": test_results},
timeout=timedelta(minutes=30),
required_roles=["admin", "devops"]
)
if approval.decision == "approved":
await ctx.task("deploy", "to_production", input={"version": version})
return {"status": "deployed"}
return {"status": "cancelled"}
AI Integration
LLM Generation
Generate text or structured responses:
Streaming
Stream responses for real-time output:
@function
async def stream_story(ctx, topic: str):
# Stream text generation
async for chunk in await ctx.llm.stream(
prompt=f"Write a story about {topic}",
model="gpt-4o"
):
if chunk.text:
yield chunk.text # Stream to client
Tool Registration
Register tools for LLM use:
@function
async def agent_with_tools(ctx, query: str):
# Register search tool
search_tool = ctx.tools.register(
"web_search",
handler=perform_search,
description="Search the web for information",
schema={
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer"}
}
}
)
# Generate with tool
response = await ctx.llm.generate(
prompt=query,
tools=[search_tool],
model="gpt-4o"
)
return response
Observability
Logging
Structured logging with context:
@function
async def tracked_operation(ctx, data: dict):
logger = ctx.log()
logger.info("Processing started", extra={"data_size": len(data)})
try:
result = process(data)
logger.info("Processing completed", extra={"result_size": len(result)})
return result
except Exception as e:
logger.error("Processing failed", exc_info=True)
raise
Metrics
Record custom metrics:
import time

@function
async def monitored_function(ctx, request: dict):
    metrics = ctx.metrics()
    # Increment counter
    metrics.increment("requests.count", service="api")
    # Record timing; perf_counter is monotonic, so it is suited to measuring durations
    start = time.perf_counter()
    result = await process_request(request)
    duration_ms = (time.perf_counter() - start) * 1000
    metrics.observe("latency.ms", duration_ms, endpoint="/api/process")
    return result
Distributed Tracing
Create spans for tracing:
@function
async def traced_operation(ctx, data: dict):
# Create span for external API call
with ctx.trace_span().start("external_api_call", service="payments"):
result = await call_payment_api(data)
# Create span for database operation
with ctx.trace_span().start("database_query", service="postgres"):
await save_to_db(result)
return result
Configuration & Secrets
Secrets
Access secrets securely:
@function
async def api_call(ctx, endpoint: str):
# Get API key from secrets
api_key = ctx.secrets().get("openai_api_key")
db_password = ctx.secrets().get("database_password")
# Use secrets in API calls
response = await make_request(endpoint, api_key=api_key)
return response
Configuration
Feature flags and config:
@function
async def feature_gated_handler(ctx, data: dict):
    config = ctx.config()
    # A/B testing: route the treatment group before the feature-flag check
    variant = config.variant("experiment_group", default="control")
    if variant == "treatment":
        return await experimental_flow(data)
    # Feature flag: choose between the new and legacy implementations
    if config.get("new_feature_enabled", default=False):
        return await new_implementation(data)
    return await legacy_implementation(data)
Request Headers
Access incoming headers:
@function
async def header_aware(ctx, data: dict):
headers = ctx.headers()
user_agent = headers.get("user-agent", "unknown")
correlation_id = headers.get("x-correlation-id")
logger = ctx.log()
logger.info(f"Request from {user_agent}", extra={"correlation_id": correlation_id})
return {"processed": True}
Context Properties
Access execution metadata:
@function
async def introspective(ctx, data: dict):
return {
"run_id": ctx.run_id, # Workflow/run identifier
"step_id": ctx.step_id, # Current step identifier
"attempt": ctx.attempt, # Retry attempt number
"component_type": ctx.component_type, # "function", "entity", "workflow"
"object_id": ctx.object_id, # Entity key (for entities)
"method_name": ctx.method_name, # Entity method name (for entities)
"processed": data
}
API Reference
Orchestration
| API | Description |
|---|---|
| ctx.task(service, handler, input) | Execute a function (workflows only) |
| ctx.parallel(*tasks) | Run tasks in parallel |
| ctx.gather(**tasks) | Run tasks in parallel with named results |
| ctx.spawn(fn, *args, key) | Async child invocation |
| ctx.step(name, fn) | Checkpoint an operation (functions only) |
State (Entities)
| API | Description |
|---|---|
| await ctx.get(key, default) | Get a state value |
| ctx.set(key, value) | Set a state value |
| ctx.delete(key) | Delete a state key |
| await ctx.entity(type, key).method() | Call an entity method |
Coordination
| API | Description |
|---|---|
| await ctx.signal(name, timeout_ms, default) | Wait for a signal |
| await ctx.signal.emit(name, payload) | Send a signal |
| await ctx.timer(delay_ms) | Wait for a fixed delay |
| await ctx.timer(cron) | Wait until the next cron match |
| await ctx.sleep(seconds) | Durable sleep |
| await ctx.human.approval(...) | Request human approval |
AI Integration
| API | Description |
|---|---|
| await ctx.llm.generate(prompt, model) | Generate text or JSON |
| await ctx.llm.stream(prompt, model) | Stream generation |
| ctx.tools.register(name, handler, schema) | Register a tool |
Observability
| API | Description |
|---|---|
| ctx.log() | Get a logger |
| ctx.metrics() | Get a metrics recorder |
| ctx.trace_span().start(name, service) | Create a trace span |
Configuration
| API | Description |
|---|---|
| ctx.secrets().get(key) | Get a secret |
| ctx.config().get(key, default) | Get a config value |
| ctx.config().variant(key, default) | Get an A/B variant |
| ctx.headers() | Get request headers |
Common Patterns
Parallel with Error Handling
@workflow
async def robust_workflow(ctx):
results = await ctx.gather(
task1=ctx.task("svc", "task1"),
task2=ctx.task("svc", "task2")
)
if results["task1"] and results["task2"]:
return {"status": "success", "results": results}
else:
return {"status": "partial_failure"}
Conditional Signal Waiting
@workflow
async def conditional_approval(ctx, needs_approval: bool):
    if needs_approval:
        # Treat a timeout (no signal within 60 s) as a rejection
        approval = await ctx.signal(
            "approval_signal", timeout_ms=60000, default={"approved": False}
        )
        if not approval.get("approved"):
            return {"status": "rejected"}
    # Proceed with operation
    result = await ctx.task("service", "operation")
    return {"status": "completed", "result": result}
LLM with Tool Execution
@function
async def agent_handler(ctx, query: str):
# Register tools
search = ctx.tools.register("search", handler=search_web, ...)
calc = ctx.tools.register("calculator", handler=calculate, ...)
# Generate with tools
response = await ctx.llm.generate(
prompt=query,
tools=[search, calc],
model="gpt-4o"
)
# Execute tool calls if needed
if response.tool_calls:
for tool_call in response.tool_calls:
handler = ctx.tools.handler(tool_call.name)
await handler(**tool_call.arguments)
return response
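The loop above runs each requested tool but discards its output. Usually you keep each result, keyed by the call that requested it, so it can be passed back to the model in a follow-up call. The sketch below shows that collection step in plain Python. `ToolCall`, `execute_tool_calls`, and `calculator` are hypothetical, and this page does not show how results are fed back into `ctx.llm.generate`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class ToolCall:
    id: str
    name: str
    arguments: Dict[str, Any] = field(default_factory=dict)


ToolHandler = Callable[..., Awaitable[Any]]


async def execute_tool_calls(
    calls: List[ToolCall], handlers: Dict[str, ToolHandler]
) -> Dict[str, Any]:
    """Run each requested tool and return its result keyed by call id."""
    results: Dict[str, Any] = {}
    for call in calls:
        handler = handlers.get(call.name)
        if handler is None:
            # Report unknown tools back to the model instead of crashing the turn
            results[call.id] = {"error": f"unknown tool {call.name!r}"}
            continue
        results[call.id] = await handler(**call.arguments)
    return results


async def calculator(expression: str) -> float:
    # Hypothetical tool: only handles "a + b" for this demo
    left, right = expression.split("+")
    return float(left) + float(right)


calls = [ToolCall("c1", "calculator", {"expression": "2 + 3"}), ToolCall("c2", "weather")]
print(asyncio.run(execute_tool_calls(calls, {"calculator": calculator})))
# {'c1': 5.0, 'c2': {'error': "unknown tool 'weather'"}}
```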
Next Steps
Agents
Agents are autonomous LLM-driven systems that reason, plan, and execute tasks using tools. They orchestrate complex multi-step workflows by breaking down problems, selecting appropriate tools, and iterating until complete.
Key Characteristics
- LLM-Powered - Driven by language models for reasoning and decision-making
- Tool Orchestration - Automatically selects and executes appropriate tools
- Memory Integration - Maintains long-term knowledge across conversations
- Session Aware - Uses sessions for conversation context
- Streaming Support - Real-time event streaming for responsive UX
- Durable - Built on AGNT5 primitives for automatic fault tolerance
Basic Usage
Simple Agent
from agnt5 import Agent, LanguageModel
lm = LanguageModel()
agent = Agent(
name="assistant",
model=lm,
instructions="You are a helpful coding assistant."
)
# Run agent
result = await agent.run("Explain recursion")
print(result.output)
Agent with Tools
from typing import Dict, List
from agnt5 import Agent, tool, LanguageModel
@tool.function(auto_schema=True)
def search_docs(query: str, language: str = "python") -> List[Dict]:
"""Search programming language documentation."""
# Implementation
return search_results
@tool.function(auto_schema=True)
def run_code(code: str, language: str = "python") -> Dict[str, str]:
"""Execute code and return output."""
# Implementation
return {"output": result}
lm = LanguageModel()
agent = Agent(
name="coding_assistant",
model=lm,
instructions="""You are a coding assistant.
Use search_docs to find API references.
Use run_code to test code examples.""",
tools=[search_docs, run_code]
)
result = await agent.run("How do I read a file in Python? Show me an example.")
Agent with Session and Memory
from agnt5 import Agent, Session, Memory, LanguageModel
# Create session
session = Session(
id="tutoring-session-789",
user_id="student-123",
metadata={"subject": "mathematics"}
)
# Create memory
memory = Memory(service=VectorMemoryService())
await memory.store("student_level", "Advanced calculus")
# Create agent
lm = LanguageModel()
agent = Agent(
name="math_tutor",
model=lm,
instructions="You are a patient math tutor. Adapt to student's level.",
tools=[solve_equation_tool, plot_function_tool],
session=session,
memory=memory
)
result = await agent.run("Help me understand limits")
Agent Configuration
Parameters
| Parameter | Type | Description |
|---|---|---|
| name | str | Unique agent name |
| model | LanguageModel | LLM used for reasoning |
| instructions | str | System prompt and guidelines |
| tools | List[Tool] | Tools available to the agent |
| session | Session \| None | Session for conversation context |
| memory | Memory \| None | Long-term knowledge storage |
| max_iterations | int | Maximum number of reasoning loops (default: 10) |
Instructions
Write clear, actionable instructions:
Streaming Agents
Stream events in real-time:
async for event in agent.stream("Analyze this dataset", session=session):
match event.type:
case "thinking":
print(f"🤔 {event.content}")
case "tool_call":
print(f"🔧 Calling {event.tool_name}({event.arguments})")
case "tool_result":
print(f"✓ Result: {event.result}")
case "response":
print(f"💬 {event.content}")
case "error":
print(f"❌ Error: {event.error}")
Agent Planning
Preview execution plan before running:
# Get plan without executing
plan = agent.plan("Analyze competitor pricing strategies")
print(f"Estimated steps: {len(plan.steps)}")
for step in plan.steps:
print(f"- {step.type}: {step.description}")
if step.tool:
print(f" Tool: {step.tool.name}")
# Review and execute if approved
if user_approves(plan):
result = await agent.run("Analyze competitor pricing strategies")
Common Patterns
Research Agent
from typing import Dict, List
from agnt5 import Agent, LanguageModel, Memory, Session, tool
@tool.function(auto_schema=True)
def search_academic(query: str, year_from: int = 2020) -> List[Dict]:
"""Search academic papers."""
pass
@tool.function(auto_schema=True)
def extract_insights(paper_text: str) -> Dict[str, List[str]]:
"""Extract key insights from paper."""
pass
# Create research agent
session = Session(id="research-ai-safety-001", user_id="researcher-123")
memory = Memory(service=VectorMemoryService())
lm = LanguageModel()
research_agent = Agent(
name="research_agent",
model=lm,
instructions="""You are a research assistant specializing in AI safety.
Research process:
1. Search for relevant recent papers
2. Extract key insights from each paper
3. Identify common themes and gaps
4. Synthesize findings into comprehensive summary""",
tools=[search_academic, extract_insights],
session=session,
memory=memory
)
result = await research_agent.run(
"Survey the current state of AI alignment research"
)
# Store findings in memory
await memory.ingest_from_session(session, strategy="smart")
Multi-Agent Workflow
# Shared session for coordination
session = Session(id="product-launch-001", user_id="pm-456")
# Specialized agents
market_researcher = Agent(
name="market_analyst",
model=lm,
tools=[market_data_tool, competitor_analysis_tool],
session=session,
instructions="Analyze market opportunities and competitive landscape."
)
product_designer = Agent(
name="designer",
model=lm,
tools=[design_tool, user_research_tool],
session=session,
instructions="Design products based on market research and user needs."
)
technical_lead = Agent(
name="tech_lead",
model=lm,
tools=[architecture_tool, feasibility_tool],
session=session,
instructions="Assess technical feasibility and propose architecture."
)
# Sequential execution with shared context
market_analysis = await market_researcher.run(
"Analyze market for AI-powered code review tools"
)
product_specs = await product_designer.run(
"Design product based on market analysis"
)
tech_assessment = await technical_lead.run(
"Evaluate technical feasibility of proposed product"
)
Agent Handoff
from agnt5.tools import AgentTool
# Create specialized agents
billing_agent = Agent(
name="billing_specialist",
model=lm,
tools=[payment_tool, invoice_tool, refund_tool],
instructions="Handle billing, payments, and refunds."
)
technical_agent = Agent(
name="tech_support",
model=lm,
tools=[diagnostic_tool, fix_tool],
instructions="Diagnose and fix technical issues."
)
# Coordinator with handoff capability
coordinator = Agent(
name="coordinator",
model=lm,
tools=[
classify_request_tool,
AgentTool(target_agent=billing_agent),
AgentTool(target_agent=technical_agent)
],
instructions="""You are a support coordinator.
Classify requests and hand off to appropriate specialist.
Hand off to:
- billing_specialist: payment, invoice, refund questions
- tech_support: technical issues, bugs, troubleshooting"""
)
session = Session(id="support-ticket-789", user_id="customer-123")
result = await coordinator.run(
"I was charged twice for my subscription",
session=session
)
Human-in-the-Loop Agent
@tool.function(auto_schema=True, confirmation=True)
def deploy_to_production(version: str) -> Dict[str, str]:
"""Deploy application to production.
Warning: Requires human approval.
"""
pass
deployment_agent = Agent(
name="deployer",
model=lm,
tools=[run_tests_tool, deploy_to_production],
instructions="""Run all tests before deploying.
Always request human approval for production deployments."""
)
result = await deployment_agent.run("Deploy version 2.0 to production")
# Agent runs tests, then waits for human approval before deploying
Iterative Problem Solving
debugging_agent = Agent(
name="debugger",
model=lm,
tools=[
analyze_logs_tool,
run_diagnostic_tool,
apply_fix_tool,
verify_fix_tool
],
instructions="""You are a debugging assistant.
Process:
1. Analyze error logs to identify root cause
2. Run diagnostics to confirm hypothesis
3. Apply potential fix
4. Verify fix works
5. If not fixed, iterate (max 3 attempts)
Always verify fixes before considering issue resolved."""
)
result = await debugging_agent.run(
"Users are experiencing 500 errors on the checkout page"
)
Best Practices
1. Write Clear Instructions
Provide specific, actionable guidance:
# ✓ Good - Specific process
agent = Agent(
name="analyst",
instructions="""Analyze data systematically:
1. Identify data patterns and anomalies
2. Calculate key statistics
3. Generate visualizations
4. Provide actionable insights"""
)
# ✗ Bad - Too vague
agent = Agent(
name="helper",
instructions="Help with analysis"
)
2. Use Sessions for Coordination
Share context across agents:
# Create shared session
session = Session(id="project-workflow-123", user_id="user-456")
# Set shared context
session.set_state("project_name", "ai-safety-research")
session.set_state("deadline", "2024-12-31")
# All agents access shared context
agent1 = Agent(name="agent1", session=session, ...)
agent2 = Agent(name="agent2", session=session, ...)
3. Leverage Memory
Use memory for persistent knowledge:
# Store user preferences
await memory.store("user_expertise", "Expert in React and TypeScript")
await memory.store("coding_style", "Prefers functional programming")
# Agent recalls automatically
agent = Agent(
name="assistant",
model=lm,
tools=[code_gen_tool],
memory=memory
)
result = await agent.run("Help me build a component")
# Agent uses stored preferences
4. Limit Iterations
Prevent infinite loops:
agent = Agent(
name="bounded_agent",
model=lm,
max_iterations=5, # Stop after 5 reasoning loops
instructions="Solve problems efficiently."
)
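Conceptually, max_iterations caps a reason-and-act loop. On each iteration the model either produces a final answer or asks for more work, and the loop stops once the budget is spent. The function below is a hypothetical sketch of that control flow, with the LLM replaced by a plain callable. It does not model what AGNT5's Agent returns when it hits the cap.

```python
from typing import Callable, Optional, Tuple


def run_bounded(
    step: Callable[[int], Optional[str]], max_iterations: int = 10
) -> Tuple[Optional[str], int]:
    """Call step(i) until it returns a final answer or the iteration budget runs out."""
    for i in range(1, max_iterations + 1):
        answer = step(i)
        if answer is not None:
            return answer, i  # final answer and the iteration that produced it
    return None, max_iterations  # budget exhausted without a final answer


print(run_bounded(lambda i: "done" if i == 3 else None, max_iterations=5))  # ('done', 3)
print(run_bounded(lambda i: None, max_iterations=5))  # (None, 5)
```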
Agent Architecture
Agents orchestrate AGNT5 primitives:
- LLM Core - Language model for reasoning
- Tool Execution - Tools built on Function primitive
- State Management - Sessions use Entity for state
- Long-Term Storage - Memory uses Entity for persistence
- Orchestration - Workflow patterns for multi-step tasks
- Streaming - Real-time event emission
Agent
├── LanguageModel (reasoning)
├── Tools (actions via Function)
├── Session (context via Entity)
├── Memory (knowledge via Entity)
└── Planner (orchestration)
Comparison with Primitives
| Aspect | Function | Workflow | Agent |
|---|---|---|---|
| Autonomy | None | Scripted | Autonomous |
| Decision Making | Pre-programmed | Control flow | LLM-driven |
| Tool Use | N/A | Explicit calls | Dynamic selection |
| Adaptability | Fixed | Fixed steps | Adaptive reasoning |
| Use Case | Single operation | Multi-step process | Complex tasks |
When to use Function:
- Single, deterministic operation
- No decision-making needed
When to use Workflow:
- Pre-defined multi-step process
- Explicit control flow
When to use Agent:
- Complex, open-ended tasks
- Requires reasoning and adaptation
- Dynamic tool selection needed
Next Steps
- Session - Agent conversation context
- Tool - Agent capabilities
- Memory - Agent long-term knowledge
- Workflows - Orchestration patterns
- Context API - Agent execution context
Sessions
Sessions are conversation containers built on the Entity primitive. They manage multi-turn interactions between users and AI agents, providing structured state management, message history, and audit trails.
Key Characteristics
- Built on Entity - Inherits durability and consistency from Entity
- Scoped State - Organize state with session/user/app/temp scopes
- Message History - Automatic conversation tracking with metadata
- Multi-Agent Ready - Share context across multiple agents
- Audit Trail - Complete history of interactions for compliance
- Flexible Retention - Configurable data retention policies
Basic Usage
Creating a Session
from agnt5 import Session
# Create session with user context
session = Session(
id="conv-2024-001",
app_name="research_assistant",
user_id="user-123",
metadata={"project": "ai-safety-research"}
)
Managing Messages
Scoped State Management
Session state uses four scopes for different persistence levels:
# Session scope - conversation-specific (default)
session.set_state("shopping_cart", ["item1", "item2"])
session.set_state("current_step", "checkout")
# User scope - persists across all user sessions
session.set_state("language", "English", scope="user")
session.set_state("timezone", "America/Los_Angeles", scope="user")
# App scope - application-wide global state
session.set_state("api_version", "v2", scope="app")
session.set_state("feature_flags", {"new_ui": True}, scope="app")
# Temp scope - temporary invocation-specific
session.set_state("processing_step", "validation", scope="temp")
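One way to picture the four scopes is as a single store where each value is keyed by (scope, scope ID, key). Session-scoped values use the session ID, user-scoped values the user ID, app-scoped values one global ID, and temp-scoped values the current invocation. `ScopedState` is a hypothetical illustration of those persistence rules, not the Session implementation. Its `invocation_id` parameter in particular is an assumption.

```python
from typing import Any, Dict, Tuple


class ScopedState:
    """Hypothetical model of session/user/app/temp scopes backed by one shared dict."""

    def __init__(
        self,
        store: Dict[Tuple[str, str, str], Any],
        session_id: str,
        user_id: str,
        invocation_id: str,
    ) -> None:
        self._store = store  # shared across sessions, like a backing database
        self._scope_ids = {
            "session": session_id,
            "user": user_id,
            "app": "global",
            "temp": invocation_id,
        }

    def set_state(self, key: str, value: Any, scope: str = "session") -> None:
        self._store[(scope, self._scope_ids[scope], key)] = value

    def get_state(self, key: str, scope: str = "session", default: Any = None) -> Any:
        return self._store.get((scope, self._scope_ids[scope], key), default)


store: Dict[Tuple[str, str, str], Any] = {}
s1 = ScopedState(store, session_id="conv-1", user_id="user-123", invocation_id="inv-1")
s1.set_state("current_step", "checkout")
s1.set_state("language", "English", scope="user")

# A new conversation for the same user sees user-scope state but not session-scope state
s2 = ScopedState(store, session_id="conv-2", user_id="user-123", invocation_id="inv-2")
print(s2.get_state("language", scope="user"))  # English
print(s2.get_state("current_step"))            # None
```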
Integration with Agents
from agnt5 import Agent, Session, LanguageModel
# Create session
session = Session(
id="support-ticket-456",
user_id="customer-789",
metadata={"ticket_type": "billing"}
)
# Create agent with session
lm = LanguageModel()
agent = Agent(
name="support_agent",
model=lm,
instructions="You are a helpful customer support agent.",
tools=[search_kb_tool, create_ticket_tool],
session=session
)
# Agent automatically uses session for context
result = await agent.run("I need help with my recent charge")
# Session maintains full conversation history
history = await session.history()
Common Patterns
Multi-Agent Coordination
Share context across multiple specialized agents:
# Create shared session
session = Session(
id="research-workflow-001",
user_id="researcher-123",
metadata={"project": "quantum-computing-review"}
)
# Set shared context
session.set_state("research_topic", "quantum error correction")
session.set_state("target_depth", "comprehensive")
# Multiple specialized agents work together
literature_agent = Agent(
name="literature_reviewer",
session=session,
tools=[paper_search]
)
code_agent = Agent(
name="code_analyzer",
session=session,
tools=[github_search]
)
synthesis_agent = Agent(
name="synthesizer",
session=session,
tools=[document_tool]
)
# Execute research pipeline
papers = await literature_agent.run("Find recent papers")
implementations = await code_agent.run("Find implementations")
report = await synthesis_agent.run("Synthesize findings")
# All agents see shared context and each other's work
full_history = await session.history()
Agent Handoff Pattern
Seamlessly transfer conversations between specialized agents:
session = Session(id="customer-inquiry-789", user_id="customer-456")
# Coordinator routes to appropriate specialist
coordinator = Agent(
name="router",
session=session,
tools=[classification_tool]
)
routing = await coordinator.run("How do I upgrade my subscription?")
if routing.category == "billing":
# Billing agent gets full conversation context
billing_agent = Agent(
name="billing_specialist",
session=session,
tools=billing_tools
)
result = await billing_agent.run("Continue from coordinator's analysis")
# billing_agent sees all previous messages
elif routing.category == "technical":
tech_agent = Agent(
name="tech_support",
session=session,
tools=tech_tools
)
result = await tech_agent.run("Handle technical inquiry")
Session State vs Memory
Session Export and Audit
# Create session with audit metadata
session = Session(
id="compliance-audit-001",
user_id="analyst-789",
metadata={
"regulation": "SOC2",
"audit_period": "Q4-2024",
"auditor": "external-firm"
},
retention={"ttl_days": 730} # 2 years
)
# Conduct conversation with full tracking
agent = Agent(name="data_analyst", session=session)
await agent.run("Analyze user access patterns")
# Export session for compliance review
jsonl_export = await session.export(format="jsonl")
# Each line contains: timestamp, role, message, metadata, tool_calls
# Query specific events
recent_events = await session.events(since="2024-01-01", limit=100)
# Prune old messages while keeping metadata
await session.prune(strategy="keep_last_50")
Long-Running Sessions with Pruning
# Create a long-running session
session = Session(
id="long-conversation-456",
user_id="user-123",
metadata={"type": "ongoing_project"}
)
# After many interactions, prune with one of these strategies
await session.prune(strategy="keep_important") # Uses LLM
await session.prune(strategy="sliding_window", window_size=100)
await session.prune(strategy="summarize_old", threshold=50)
# Session remains performant even with thousands of messages
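The guide does not spell out how each pruning strategy chooses what to keep. As a rough mental model only, the deterministic strategies can be sketched in plain Python over a list of message dicts. `keep_last` and `summarize_old` are hypothetical helpers, not SDK functions. The summarizer is passed in so the LLM call stays out of the sketch, and the LLM-driven `keep_important` strategy is not modeled.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]  # e.g. {"role": "user", "content": "..."}


def keep_last(messages: List[Message], n: int) -> List[Message]:
    """Keep the n most recent messages (a model of keep_last_50 / sliding_window)."""
    if n <= 0:
        return []
    return messages[-n:]


def summarize_old(
    messages: List[Message],
    threshold: int,
    summarize: Callable[[List[Message]], str],
) -> List[Message]:
    """Replace all but the `threshold` newest messages with one summary message."""
    if threshold < 0:
        raise ValueError("threshold must be non-negative")
    if len(messages) <= threshold:
        return list(messages)
    split = len(messages) - threshold
    old, recent = messages[:split], messages[split:]
    summary = {"role": "system", "content": "Summary of earlier messages: " + summarize(old)}
    return [summary] + recent
```

Treat this as intuition for choosing `window_size` and `threshold`; the SDK's actual behavior (for example, whether system messages are exempt from pruning) may differ.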
Configuration
Session Parameters
| Parameter | Type | Description |
|---|---|---|
| id | str | Unique session identifier |
| app_name | str \| None | Application name |
| user_id | str \| None | User identifier |
| metadata | dict \| None | Session metadata |
| retention | dict \| None | Retention policy configuration |
Retention Policies
Configure data retention:
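Retention is passed to `Session` as a plain dict. The keys used elsewhere in this guide are `ttl_days`, `immutable`, and `auto_prune`; the SDK's full set of options and its validation rules are not documented here. The sketch below is a hypothetical helper that builds such a dict and computes when a session would expire. The rule rejecting `immutable` together with `auto_prune` is an assumption made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def make_retention(ttl_days: int, immutable: bool = False,
                   auto_prune: bool = False) -> Dict[str, Any]:
    """Build a retention dict with the keys used in this guide (hypothetical helper)."""
    if ttl_days <= 0:
        raise ValueError("ttl_days must be a positive number of days")
    if immutable and auto_prune:
        # Assumption: an immutable audit log should never be pruned automatically
        raise ValueError("immutable sessions should not be auto-pruned")
    return {"ttl_days": ttl_days, "immutable": immutable, "auto_prune": auto_prune}


def expires_at(created_at: datetime, retention: Dict[str, Any]) -> datetime:
    """When a session created at `created_at` leaves its retention window."""
    return created_at + timedelta(days=retention["ttl_days"])


def is_expired(created_at: datetime, retention: Dict[str, Any],
               now: Optional[datetime] = None) -> bool:
    now = now or datetime.now(timezone.utc)
    return now >= expires_at(created_at, retention)
```

For example, the audit session earlier on this page could pass `retention=make_retention(730)`, and a throwaway chat `retention=make_retention(1, auto_prune=True)`.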
Best Practices
1. Use Appropriate State Scopes
Match state scope to persistence requirements:
# ✓ Session scope - conversation-specific
session.set_state("current_page", 3)
session.set_state("draft_document", content)
# ✓ User scope - user preferences
session.set_state("theme", "dark", scope="user")
session.set_state("notification_preference", "email", scope="user")
# ✓ App scope - global configuration
session.set_state("rate_limit", 1000, scope="app")
session.set_state("feature_flags", flags, scope="app")
# ✓ Temp scope - transient data
session.set_state("validation_step", "in_progress", scope="temp")
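To make the four scopes concrete, here is an in-memory model of who can see a value. `ScopedStateStore` is a hypothetical class, not the SDK's storage. In particular, the idea that `temp` values are discarded at the end of a turn (`end_turn`) is an assumption about what "transient data" means.

```python
from typing import Any, Dict, Tuple


class ScopedStateStore:
    """In-memory model of scoped state (hypothetical, not the SDK's storage).

    session: visible only inside one session
    user:    shared by every session of the same user
    app:     shared by every session of the application
    temp:    session-local and discarded by end_turn() (assumed lifecycle)
    """

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str, str], Any] = {}

    @staticmethod
    def _owner(scope: str, session_id: str, user_id: str) -> str:
        if scope in ("session", "temp"):
            return session_id
        if scope == "user":
            return user_id
        if scope == "app":
            return "*"
        raise ValueError(f"unknown scope: {scope}")

    def set(self, session_id: str, user_id: str, key: str, value: Any,
            scope: str = "session") -> None:
        self._data[(scope, self._owner(scope, session_id, user_id), key)] = value

    def get(self, session_id: str, user_id: str, key: str,
            scope: str = "session", default: Any = None) -> Any:
        owner = self._owner(scope, session_id, user_id)
        return self._data.get((scope, owner, key), default)

    def end_turn(self, session_id: str) -> None:
        # Drop this session's temp values; other scopes are untouched
        for k in [k for k in self._data if k[0] == "temp" and k[1] == session_id]:
            del self._data[k]
```

The key point is the owner: a `user`-scoped `theme` set in one conversation is visible in the same user's next conversation, while a `session`-scoped `current_page` is not.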
2. Design for Multi-Agent Coordination
Structure session state for agent collaboration:
# Good - Clear coordination structure
session.set_state("workflow_stage", "research")
session.set_state("agent_outputs", {
"researcher": {"status": "completed", "findings": [...]},
"analyzer": {"status": "in_progress"},
"writer": {"status": "pending"}
})
# Agents can check dependencies
current_stage = session.get_state("workflow_stage")
researcher_output = session.get_state("agent_outputs")["researcher"]
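The `agent_outputs` structure above is enough to decide which agent may run next. The sketch below assumes a hypothetical dependency map (`DEPENDENCIES`: the analyzer waits for the researcher, the writer waits for both) and reads statuses from a plain dict; in an agent you would obtain that dict with `session.get_state("agent_outputs")`.

```python
from typing import Dict, List

# Hypothetical dependency map: which agents must finish before each one runs
DEPENDENCIES: Dict[str, List[str]] = {
    "researcher": [],
    "analyzer": ["researcher"],
    "writer": ["researcher", "analyzer"],
}


def ready_agents(agent_outputs: Dict[str, Dict], deps: Dict[str, List[str]]) -> List[str]:
    """Agents still pending whose dependencies have all completed."""
    def status(name: str) -> str:
        # Agents missing from agent_outputs are treated as pending
        return agent_outputs.get(name, {}).get("status", "pending")

    return [
        name
        for name, needs in deps.items()
        if status(name) == "pending" and all(status(dep) == "completed" for dep in needs)
    ]
```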
3. Implement Retention Strategies
Manage session lifecycle appropriately:
# For regulated industries
session = Session(
id="medical-consultation",
retention={
"ttl_days": 2555, # Legal requirement
"immutable": True
}
)
# For ephemeral conversations
session = Session(
id="temp-chat",
retention={
"ttl_days": 1, # Delete after 1 day
"auto_prune": True
}
)
Entity vs Session
| Aspect | Entity | Session |
|---|---|---|
| Purpose | General stateful primitive | Conversation-specific |
| State Structure | Flexible key-value | Opinionated message + state |
| API | Low-level (get/set/delete) | High-level (send_message/history) |
| Scoping | Manual | Built-in (session/user/app/temp) |
| Audit | Manual event tracking | Automatic conversation log |
| Use Case | Custom stateful components | AI agent conversations |
When to use Entity:
- Building custom stateful patterns
- Need complete control over state structure
- Non-conversation workloads
When to use Session:
- AI agent conversations
- Multi-agent coordination needed
- Audit trails required
- Standard conversation patterns
Next Steps
- Entity - Underlying primitive for Session
- Agent - Agents use Sessions for context
- Memory - Long-term storage vs Session state
- Context API - Session context operations
Tools
Tools are callable capabilities that extend what agents can do. Tools provide structured interfaces to functions, APIs, services, and other agents, with automatic schema extraction from Python code.
Key Characteristics
- Automatic Schema - Extract input/output schemas from docstrings and type hints
- Multiple Types - Function, Hosted, MCP, OpenAPI, and Agent tools
- Built on Function - Inherits durability and retry logic
- Confirmation Policies - Optional user approval for dangerous operations
- Rich Metadata - Descriptions, examples, and parameter constraints
Basic Usage
Function Tools with Auto-Schema
The simplest way to create tools is with the @tool() decorator:
from typing import Dict, List
from agnt5 import tool
@tool(auto_schema=True)
def search_web(query: str, max_results: int = 10) -> List[Dict[str, str]]:
    """Search the web for information.
    Args:
        query: The search query string
        max_results: Maximum number of results to return
    Returns:
        List of search results with title, url, and snippet
    """
    # Implementation: call your search backend and collect results here
    search_results: List[Dict[str, str]] = []
    return search_results
Schema automatically extracted:
{
"name": "search_web",
"description": "Search the web for information.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query string"},
"max_results": {"type": "integer", "default": 10}
},
"required": ["query"]
}
}
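Roughly, schema extraction combines three sources: parameter names and defaults from the function signature, JSON types from the type hints, and per-parameter descriptions from the docstring's `Args:` section. The sketch below reproduces that idea with the standard library's `inspect` and `typing` modules. It is not the SDK's implementation: `extract_schema` is a hypothetical name, only Google-style docstrings are parsed, and skipping a leading `ctx` parameter is an assumption based on the context-injection example later on this page.

```python
import inspect
import typing
from typing import Any, Callable, Dict, List, Optional

_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", list: "array", dict: "object"}


def _json_type(annotation: Any) -> Optional[str]:
    # List[str] -> list -> "array"; unknown annotations get no "type"
    origin = typing.get_origin(annotation) or annotation
    return _JSON_TYPES.get(origin)


def _arg_descriptions(doc: str) -> Dict[str, str]:
    """Collect `name: description` lines from a Google-style Args: section."""
    descriptions: Dict[str, str] = {}
    in_args = False
    for line in doc.splitlines():
        if line and not line[0].isspace():
            # Unindented line: summary text or a header such as Args:/Returns:
            in_args = line.strip() == "Args:"
        elif in_args and ":" in line:
            name, _, desc = line.strip().partition(":")
            descriptions[name] = desc.strip()
    return descriptions


def extract_schema(func: Callable[..., Any]) -> Dict[str, Any]:
    hints = typing.get_type_hints(func)
    doc = inspect.getdoc(func) or ""
    descriptions = _arg_descriptions(doc)
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        if name == "ctx":  # assumption: context is injected, not supplied by the model
            continue
        prop: Dict[str, Any] = {}
        json_type = _json_type(hints.get(name))
        if json_type:
            prop["type"] = json_type
        if name in descriptions:
            prop["description"] = descriptions[name]
        if param.default is inspect.Parameter.empty:
            required.append(name)
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {
        "name": func.__name__,
        "description": doc.splitlines()[0] if doc else "",
        "input_schema": {"type": "object", "properties": properties, "required": required},
    }
```

Unlike the schema shown above, this sketch also copies the `max_results` description from the docstring; which fields the SDK includes is up to its implementation.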
Using Tools with Agents
from agnt5 import Agent, tool, LanguageModel
@tool(auto_schema=True)
def calculate_area(length: float, width: float) -> float:
"""Calculate the area of a rectangle.
Args:
length: Length in meters
width: Width in meters
Returns:
Area in square meters
"""
return length * width
lm = LanguageModel()
agent = Agent(
name="math_assistant",
model=lm,
tools=[calculate_area],
instructions="Help users with geometry calculations."
)
result = await agent.run("What's the area of a 5m by 3m room?")
# Agent automatically calls calculate_area(5.0, 3.0)
Tool Types
Function Tools
Direct Python function execution:
Hosted Tools
Tools deployed as durable AGNT5 workers:
from agnt5 import worker
from agnt5.tools import HostedTool
# Define worker function
@worker.handler
def analyze_data(data: Dict) -> Dict:
"""Worker function for complex data analysis."""
# Heavy computation here
return analysis_results
# Create hosted tool
analysis_tool = HostedTool(
name="analyze_data",
description="Perform complex data analysis",
endpoint="agnt5://data-analysis-service/analyze_data"
)
# Use with agent
agent = Agent(name="analyst", tools=[analysis_tool])
MCP Tools
Integrate with Model Context Protocol servers:
from agnt5.tools import MCPTool
# Connect to MCP server
filesystem_tool = MCPTool(
name="filesystem",
mcp_server_url="http://localhost:3000/mcp",
capabilities=["read_file", "write_file", "list_directory"]
)
agent = Agent(name="file_assistant", tools=[filesystem_tool])
OpenAPI Tools
Generate tools from OpenAPI specifications:
from agnt5.tools import OpenAPITool
# Create tools from OpenAPI spec
github_tools = OpenAPITool.from_spec(
spec_url="https://api.github.com/openapi.json",
operations=["get_repo", "list_issues", "create_issue"]
)
agent = Agent(name="github_bot", tools=github_tools)
Tool Configuration
Manual Schema Definition
For more control, define schemas explicitly:
from agnt5 import Tool
search_tool = Tool(
name="search",
description="Search for information",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string", "minLength": 1},
"filters": {"type": "object"}
},
"required": ["query"]
},
handler=search_function
)
Confirmation for Dangerous Operations
Require user approval for destructive actions:
@tool(auto_schema=True, confirmation=True)
def delete_database(database_name: str) -> Dict[str, str]:
"""Delete a database permanently.
Args:
database_name: Name of the database to delete
Returns:
Status of deletion operation
Warning:
This operation is irreversible and will delete all data.
"""
# Requires human approval before execution
pass
# Agent proposes deletion but waits for approval
agent = Agent(name="admin", tools=[delete_database])
result = await agent.run("Clean up the test database")
# User receives confirmation prompt before tool executes
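How the SDK pauses for approval is not described here. Conceptually, a confirmation gate sits between the model's proposed call and the function body. The sketch below models that with a hypothetical `requires_confirmation` decorator and a caller-supplied approver; `ConfirmationDenied` and `console_approver` are illustrative names, not SDK APIs.

```python
import functools
from typing import Any, Callable, Dict

# An approver sees the tool name and proposed arguments and returns True to allow
Approver = Callable[[str, Dict[str, Any]], bool]


class ConfirmationDenied(Exception):
    """Raised when a human rejects a proposed tool call."""


def requires_confirmation(approve: Approver) -> Callable:
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(**kwargs: Any) -> Any:
            # Nothing runs until the approver says yes
            if not approve(func.__name__, dict(kwargs)):
                raise ConfirmationDenied(f"{func.__name__} was not approved")
            return func(**kwargs)
        return wrapper
    return decorator


def console_approver(name: str, args: Dict[str, Any]) -> bool:
    """Ask on the terminal; anything other than 'y' counts as a refusal."""
    answer = input(f"Allow {name}({args})? [y/N] ")
    return answer.strip().lower() == "y"
```

The wrapper accepts keyword arguments only, mirroring how model tool calls arrive as named arguments. Defaulting to "no" on any unexpected answer keeps the gate fail-safe.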
Common Patterns
Tool Composition
Combine multiple tools for complex capabilities:
@tool(auto_schema=True)
def search_papers(query: str, year_from: int = 2020) -> List[Dict]:
"""Search academic papers."""
pass
@tool(auto_schema=True)
def download_pdf(url: str) -> bytes:
"""Download PDF document."""
pass
@tool(auto_schema=True)
def extract_text(pdf_data: bytes) -> str:
"""Extract text from PDF."""
pass
# Agent orchestrates multiple tools
research_agent = Agent(
name="researcher",
tools=[search_papers, download_pdf, extract_text],
instructions="Search papers, download them, and extract key findings."
)
result = await research_agent.run("Survey recent work on transformers")
# Agent chains: search_papers → download_pdf → extract_text
Tool Error Handling
Tools with robust error handling:
@tool(auto_schema=True)
def fetch_stock_price(symbol: str) -> Dict[str, Any]:
"""Fetch current stock price.
Args:
symbol: Stock ticker symbol (e.g., 'AAPL', 'GOOGL')
Returns:
Stock price data
Raises:
ValueError: If symbol is invalid
ConnectionError: If market data service is unavailable
"""
try:
price_data = market_api.get_price(symbol)
return {
"symbol": symbol,
"price": price_data.current,
"change": price_data.change
}
except InvalidSymbolError:
raise ValueError(f"Invalid stock symbol: {symbol}")
except MarketAPIError as e:
raise ConnectionError(f"Market data unavailable: {e}")
# Agent handles tool errors gracefully
agent = Agent(name="stock_advisor", tools=[fetch_stock_price])
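This page does not specify exactly what an agent does when a tool raises. One common approach, sketched here under that caveat, is to catch the exception types a tool documents and hand the model a structured error it can reason about (retry with a corrected symbol, or tell the user the service is down). `call_tool_safely` is a hypothetical helper, not an SDK function.

```python
from typing import Any, Callable, Dict, Tuple, Type

# Errors the tool documents in its Raises: section
EXPECTED_ERRORS: Tuple[Type[Exception], ...] = (ValueError, ConnectionError)


def call_tool_safely(tool: Callable[..., Any], **kwargs: Any) -> Dict[str, Any]:
    """Run a tool; turn documented failures into a result the model can read."""
    try:
        return {"ok": True, "result": tool(**kwargs)}
    except EXPECTED_ERRORS as exc:
        # Unexpected exception types still propagate so real bugs are not hidden
        return {"ok": False, "error": type(exc).__name__, "message": str(exc)}
```

This is why clear `Raises:` sections and specific error messages in tools like `fetch_stock_price` pay off: the message is what the model sees.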
Dynamic Tool Registration
Register tools at runtime based on context:
# Base toolset
base_tools = [search_tool, calculate_tool]
# Add specialized tools based on user role
if user.role == "admin":
admin_tools = [delete_user_tool, modify_permissions_tool]
all_tools = base_tools + admin_tools
else:
all_tools = base_tools
agent = Agent(
name="assistant",
tools=all_tools,
instructions=f"You are assisting a {user.role}."
)
Tool with Context Access
Tools can access execution context for advanced operations:
from agnt5 import tool, Context
@tool(auto_schema=True)
async def store_memory(ctx: Context, key: str, value: str) -> Dict[str, str]:
"""Store information in long-term memory.
Args:
ctx: Execution context (automatically provided)
key: Memory key
value: Content to store
Returns:
Confirmation of storage
"""
# Access context for durable storage
await ctx.memory.set(key, value)
return {
"status": "stored",
"key": key,
"timestamp": ctx.now()
}
# Context is automatically injected when tool is called
agent = Agent(name="memory_agent", tools=[store_memory])
Best Practices
1. Write Clear Tool Descriptions
Good descriptions help agents use tools correctly:
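A quick way to enforce this in review or CI is to lint tool docstrings. The sketch below is a hypothetical, heuristic checker (not part of the SDK): it flags a missing docstring, a very short summary line (the three-word threshold is arbitrary), and parameters missing from a Google-style `Args:` section. It assumes an injected `ctx` parameter needs no description.

```python
import inspect
from typing import Callable, List, Set


def check_tool_docstring(func: Callable) -> List[str]:
    """Return problems that would make a tool hard for a model to use (heuristic)."""
    doc = inspect.getdoc(func)
    if not doc:
        return ["missing docstring"]
    problems: List[str] = []
    # A one- or two-word summary rarely tells the model when to call the tool
    if len(doc.splitlines()[0].split()) < 3:
        problems.append("summary line is too short")
    documented: Set[str] = set()
    in_args = False
    for line in doc.splitlines():
        if line and not line[0].isspace():
            in_args = line.strip() == "Args:"
        elif in_args and ":" in line:
            documented.add(line.strip().partition(":")[0])
    for name in inspect.signature(func).parameters:
        if name != "ctx" and name not in documented:
            problems.append(f"parameter '{name}' is not described under Args")
    return problems


# Vague: the model cannot tell what q is or when to use this tool
def lookup(q: str) -> str:
    """Lookup."""
    return q


# Clear: says what it does and what the argument looks like
def lookup_order_status(order_id: str) -> str:
    """Look up the shipping status of a customer order.

    Args:
        order_id: Order number printed on the receipt, e.g. "A-1042"
    """
    return "shipped"
```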
2. Use Type Hints and Docstrings
Enable automatic schema extraction:
from typing import Any, Dict
@tool(auto_schema=True)
def analyze_sentiment(
text: str,
language: str = "en",
return_scores: bool = False
) -> Dict[str, Any]:
"""Analyze sentiment of text.
Args:
text: Text to analyze (minimum 10 characters)
language: ISO language code (en, es, fr, de)
return_scores: Include detailed confidence scores
Returns:
Sentiment analysis with label (positive/negative/neutral)
and optional confidence scores
"""
# Type hints + docstring = complete schema
pass
3. Implement Confirmation for Dangerous Operations
Protect users from destructive actions:
# Dangerous operations should require confirmation
@tool(auto_schema=True, confirmation=True)
def execute_code(code: str, language: str = "python") -> Dict[str, str]:
"""Execute arbitrary code in a sandboxed environment.
Warning:
Code execution can be dangerous. Requires explicit user approval.
"""
pass
@tool(auto_schema=True, confirmation=True)
def send_email_blast(recipients: List[str], subject: str, body: str) -> Dict:
"""Send email to multiple recipients.
Warning:
Bulk email requires confirmation to prevent spam.
"""
pass
Function vs Tool
| Aspect | Function | Tool |
|---|---|---|
| Purpose | General computation | Agent capability |
| Schema | Optional | Required (auto-generated) |
| Discovery | Manual invocation | Agent-driven selection |
| Metadata | Basic | Rich (description, examples) |
| Use Case | Backend logic | Agent actions |
When to use Function:
- Backend processing
- Internal system operations
- Not exposed to agents
When to use Tool:
- Agent capabilities
- External system integration
- User-facing operations
Next Steps
- Functions - Underlying primitive for tools
- Agent - Agents use tools for actions
- Context API - Tool context operations
- Worker - Hosted tool deployment
Memory
Memory is a long-term knowledge storage system that enables agents to remember facts, preferences, and context across conversations. Unlike Session state (short-term), Memory provides persistent, searchable knowledge that agents build upon over time.
Key Characteristics
- Long-Term Persistence - Knowledge survives across sessions and conversations
- Semantic Search - Find relevant memories using natural language queries
- Smart Ingestion - Automatically extract important facts using LLMs
- Multiple Backends - InMemory (dev), Vector (semantic), Database (persistent)
- Built on Entity - Inherits durability and consistency
- Cross-Session - Shared knowledge accessible to all agents
Basic Usage
Creating Memory
Storing and Retrieving Memories
# Store individual memories
await memory.store(
key="user_role",
content="Senior Software Engineer specializing in distributed systems",
type="user_info",
confidence=0.95
)
await memory.store(
key="project_context",
content="Building a real-time analytics platform for financial data",
type="project_info"
)
# Retrieve specific memories
memories = await memory.recall(["user_role", "project_context"])
for mem in memories:
print(f"{mem.key}: {mem.content}")
Semantic Search
# Search using natural language
results = await memory.search(
query="What does the user know about databases?",
limit=5
)
for result in results:
print(f"Score: {result.score:.2f}")
print(f"Content: {result.content}")
print(f"Source: {result.metadata.get('source_session')}")
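Semantic search ranks stored memories by vector similarity to the query. A vector backend would compute embeddings with a model. The sketch below skips that step and uses tiny hand-made vectors so the ranking logic (cosine similarity, then top-k) is visible. `cosine` and `top_k` are illustrative helpers, not part of the SDK, and the `score` a real backend returns may use a different metric.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec: Sequence[float],
          memories: Dict[str, Sequence[float]],
          k: int = 5) -> List[Tuple[str, float]]:
    """Return the k memory keys most similar to the query, best first."""
    scored = [(key, cosine(query_vec, vec)) for key, vec in memories.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]
```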
Integration with Agents
from agnt5 import Agent, Session, Memory, LanguageModel
# Create memory
memory = Memory(service=VectorMemoryService())
# Store long-term knowledge
await memory.store("user_expertise", "PhD in Machine Learning, specializes in NLP")
await memory.store("preferred_tools", "Prefers PyTorch over TensorFlow")
# First conversation
session1 = Session(id="conv-001", user_id="researcher-123")
lm = LanguageModel()
agent = Agent(name="assistant", model=lm, memory=memory, session=session1)
await agent.run("Help me implement attention mechanisms")
# Agent recalls user's ML expertise and PyTorch preference
# Later conversation (different session)
session2 = Session(id="conv-042", user_id="researcher-123")
agent2 = Agent(name="assistant", model=lm, memory=memory, session=session2)
await agent2.run("Review my transformer code")
# Agent still remembers user's background and preferences
Smart Ingestion
Automatically extract and store important information from conversations:
# Agent conversation
session = Session(id="consultation-123", user_id="user-456")
agent = Agent(name="advisor", memory=memory, session=session)
await agent.run("I'm building a recommendation system for e-commerce")
await agent.run("We have 10 million users and need sub-100ms latency")
await agent.run("Our team is experienced with Python and Go")
# Extract and store important facts
memory_keys = await memory.ingest_from_session(
session,
strategy="smart" # Uses LLM to identify important facts
)
# Memory now contains:
# - "User building recommendation system for e-commerce"
# - "System requirements: 10M users, <100ms latency"
# - "Team expertise: Python, Go"
# Future conversations automatically recall these facts
Ingestion Strategies
Choose the right strategy for your use case:
Common Patterns
User Profile Memory
Build comprehensive user profiles over time:
class UserProfileMemory:
def __init__(self, user_id: str, memory: Memory):
self.user_id = user_id
self.memory = memory
self.prefix = f"user_{user_id}_"
async def store_preference(self, category: str, value: str):
"""Store user preference."""
await self.memory.store(
key=f"{self.prefix}pref_{category}",
content=value,
type="preference",
user_id=self.user_id
)
async def store_expertise(self, domain: str, level: str, details: str):
"""Store user expertise."""
await self.memory.store(
key=f"{self.prefix}expertise_{domain}",
content=f"{level} expertise in {domain}: {details}",
type="expertise",
user_id=self.user_id
)
async def get_profile(self) -> Dict[str, Any]:
"""Retrieve complete user profile."""
results = await self.memory.search(
query=f"user {self.user_id} profile preferences expertise",
limit=50
)
profile = {
"preferences": {},
"expertise": {},
"context": []
}
for result in results:
if result.metadata.get("type") == "preference":
category = result.key.replace(f"{self.prefix}pref_", "")
profile["preferences"][category] = result.content
elif result.metadata.get("type") == "expertise":
domain = result.key.replace(f"{self.prefix}expertise_", "")
profile["expertise"][domain] = result.content
return profile
# Usage
user_memory = UserProfileMemory("user-123", memory)
await user_memory.store_preference("language", "Python")
await user_memory.store_expertise("ml", "advanced", "10+ years in NLP")
profile = await user_memory.get_profile()
Conversation Summarization
Archive long sessions as summaries:
async def archive_session_to_memory(
session: Session,
memory: Memory,
summary_threshold: int = 50
):
"""Archive long sessions as summaries in memory."""
history = await session.history()
if len(history) > summary_threshold:
# Generate comprehensive summary
from agnt5 import LanguageModel
lm = LanguageModel()
conversation = "\n".join([
f"{msg.role}: {msg.content}" for msg in history
])
summary = await lm.generate(
prompt=f"""Summarize this conversation in 2-3 paragraphs.
Focus on key decisions, important facts, and outcomes.
Conversation:
{conversation}
Summary:""",
max_tokens=300
)
# Store summary in memory
await memory.store(
key=f"session_summary_{session.id}",
content=summary.text,
type="session_summary",
session_id=session.id,
user_id=session.user_id,
message_count=len(history)
)
# Optionally prune session to save space
await session.prune(strategy="keep_summary")
# Usage
await archive_session_to_memory(session, memory, summary_threshold=100)
Learning Agent
Agent that learns from every interaction:
class LearningAgent:
def __init__(self, name: str, model, tools, memory: Memory):
self.agent = Agent(name=name, model=model, tools=tools, memory=memory)
self.memory = memory
async def run_and_learn(self, prompt: str, session: Session):
"""Execute task and learn from interaction."""
# Run agent
result = await self.agent.run(prompt, session=session)
# Extract and store learnings
memory_keys = await self.memory.ingest_from_session(
session,
strategy="smart"
)
# Store outcome for future reference
await self.memory.store(
key=f"interaction_{session.id}",
content=f"Task: {prompt}\nOutcome: {result.output}",
type="interaction_history",
success=result.status == "completed"
)
return result
async def recall_similar_tasks(self, prompt: str) -> List[Dict]:
"""Find similar past interactions."""
return await self.memory.search(
query=f"Similar to: {prompt}",
limit=5
)
# Usage
learning_agent = LearningAgent("assistant", lm, tools, memory)
# Agent learns from each interaction
result = await learning_agent.run_and_learn(
"Debug this performance issue",
session
)
# Later: Recall similar past work
similar = await learning_agent.recall_similar_tasks(
"Another performance problem"
)
Best Practices
1. Distinguish Memory from Session State
Use Memory for long-term, Session for short-term:
2. Add Metadata for Better Retrieval
Enrich memories with metadata:
from datetime import datetime
await memory.store(
key="technical_decision_001",
content="Chose PostgreSQL for transactional data, Redis for caching",
type="decision",
category="architecture",
confidence=0.9,
source_session="planning-session-789",
timestamp=datetime.now(),
decision_maker="tech-lead-123",
rationale="Need ACID guarantees and high read performance"
)
# Later: Search by metadata
postgres_decisions = await memory.search(
query="database decisions",
filter={"type": "decision", "category": "architecture"}
)
3. Implement Memory Maintenance
Manage memory lifecycle:
async def maintain_memory(memory: Memory, user_id: str):
    """Remove outdated memories and decay confidence of stale ones."""
    # Remove outdated information (is_outdated is a predicate you define)
    keys = await memory.list_keys(user_id=user_id)
    await memory.forget([key for key in keys if is_outdated(key)])
    # Decay confidence scores based on time since last access
    for key in await memory.list_keys(user_id=user_id):
        mem = (await memory.recall([key]))[0]
        last_accessed = mem.metadata.get("last_accessed")
        if last_accessed:
            # calculate_days_since / calculate_confidence_decay are helpers you define
            days_since_access = calculate_days_since(last_accessed)
            new_confidence = calculate_confidence_decay(
                mem.metadata.get("confidence", 1.0),
                days_since_access
            )
            await memory.update(key, confidence=new_confidence)
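The maintenance example leaves `calculate_days_since` and `calculate_confidence_decay` undefined. One possible definition, assuming exponential decay with a configurable half-life (the 90-day default is an arbitrary choice), is sketched below; pick a curve that matches how quickly facts go stale in your domain.

```python
from datetime import datetime, timezone
from typing import Optional


def calculate_days_since(last_accessed: datetime,
                         now: Optional[datetime] = None) -> float:
    """Fractional days between last access and now (timezone-aware datetimes)."""
    now = now or datetime.now(timezone.utc)
    return (now - last_accessed).total_seconds() / 86400


def calculate_confidence_decay(confidence: float, days_since_access: float,
                               half_life_days: float = 90.0) -> float:
    """Halve confidence for every `half_life_days` without access."""
    # Clamp negative ages (clock skew) to zero so confidence never grows
    days = max(days_since_access, 0.0)
    return confidence * 0.5 ** (days / half_life_days)
```

A maintenance job could then `forget` memories whose confidence falls below a threshold you choose.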
Session State vs Memory
| Aspect | Session State | Memory |
|---|---|---|
| Lifetime | Single conversation | Indefinite |
| Scope | Session-specific | Cross-session |
| Search | Direct key access | Semantic search |
| Purpose | Current context | Long-term knowledge |
| Storage | Entity state | Entity + Vector DB |
| Example | Shopping cart | User preferences |
When to use Session State:
- Current conversation context
- Temporary workflow state
- UI state and navigation
When to use Memory:
- User profile and preferences
- Historical interactions
- Domain knowledge
- Learned facts and insights
Next Steps
- Session - Short-term conversation state
- Agent - Agents use Memory for context
- Entity - Underlying primitive for Memory
- Context API - Memory context operations
Worker Runtime
The Worker class is the high-level runtime that integrates with the AGNT5 platform, automatically registers decorated components, and handles execution coordination.
Worker Configuration
Basic Worker
import asyncio
from agnt5 import Worker, function
@function()
def hello(name: str) -> str:
return f"Hello, {name}!"
async def main():
worker = Worker(service_name="hello-service")
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
Configuration Parameters
worker = Worker(
service_name="my-service", # Required: Service identifier
service_version="1.2.0", # Version string (default: "1.0.0")
coordinator_endpoint="http://localhost:9091", # Worker coordinator URL
runtime="standalone" # Runtime mode: "standalone" or "asgi"
)
| Parameter | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service identifier for registration | Required |
| service_version | str | Version string for this service | "1.0.0" |
| coordinator_endpoint | str | Worker coordinator URL | "http://localhost:9091" |
| runtime | str | Runtime adapter: "standalone" or "asgi" | "standalone" |
Runtime Modes
Standalone Runtime
For background workers, batch processing, and daemon processes:
import asyncio
from agnt5 import Worker, function
@function()
def background_task(data: dict) -> dict:
# Process data in background
return {"processed": True, "result": data}
async def main():
# Standalone worker blocks until stopped
worker = Worker(
service_name="background-processor",
runtime="standalone"
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
Characteristics:
- Blocks until manually stopped (Ctrl+C)
- Connects to worker coordinator
- Ideal for background processing
- Built-in signal handling
- OpenTelemetry integration
ASGI Runtime
For web applications and HTTP endpoints:
from agnt5 import Worker, function
@function()
def web_handler(request: dict) -> dict:
return {"message": "Hello from AGNT5!", "data": request}
# Create ASGI application
app = Worker(
service_name="web-service",
runtime="asgi"
)
# Enable CORS for browser access
app.enable_cors()
Run with any ASGI server:
# Install ASGI server
pip install uvicorn
# Run the application
uvicorn main:app --reload --port 8000
ASGI Endpoints:
| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Health check endpoint |
| /functions | GET | List registered functions |
| /invoke/{handler} | POST | Invoke specific function |
Environment Configuration
Environment Variables
Configure workers using environment variables:
# Service configuration
export AGNT5_SERVICE_NAME=my-service
export AGNT5_SERVICE_VERSION=2.0.0
export AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
# Logging configuration
export AGNT5_LOG_LEVEL=INFO
export AGNT5_LOG_FORMAT=json
# Runtime configuration
export AGNT5_RUNTIME=standalone
export AGNT5_DISABLE_TELEMETRY=false
Configuration Priority
Configuration sources in order of precedence:
- Constructor parameters - Highest priority
- Environment variables - Medium priority
- Default values - Lowest priority
# This worker uses constructor values over environment
worker = Worker(
service_name="explicit-service", # Overrides AGNT5_SERVICE_NAME
coordinator_endpoint="http://localhost:9091"
)
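The precedence rule can be expressed as a small resolver. This is a sketch of the documented order, not the SDK's code: `resolve_setting` is hypothetical, it assumes every parameter maps to an `AGNT5_`-prefixed variable the way `service_name` maps to `AGNT5_SERVICE_NAME`, and its defaults are the ones in the parameter table above.

```python
import os
from typing import Mapping, Optional

# Defaults from the parameter table (service_name has none: it is required)
DEFAULTS = {
    "service_version": "1.0.0",
    "coordinator_endpoint": "http://localhost:9091",
    "runtime": "standalone",
}


def resolve_setting(name: str, explicit: Optional[str] = None,
                    env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Constructor argument, then AGNT5_<NAME> env var, then the default."""
    if explicit is not None:
        return explicit
    env = os.environ if env is None else env
    value = env.get(f"AGNT5_{name.upper()}")
    if value:  # assumption: an empty variable counts as unset
        return value
    return DEFAULTS.get(name)
```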
Worker Lifecycle
Initialization
async def main():
worker = Worker("my-service")
# Worker validates Rust extension availability
# Creates runtime adapter (standalone or ASGI)
# Installs OpenTelemetry logging
# Registers all decorated functions and workflows
await worker.run() # Starts the worker loop
Registration Process
- Function Discovery: Scans for @function-decorated callables
- Workflow Discovery: Scans for @workflow-decorated factories
- Component Registration: Sends metadata to the coordinator
- Service Announcement: The service becomes available for invocations
Graceful Shutdown
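import asyncio
import signal
from agnt5 import Worker
class GracefulWorker:
    def __init__(self):
        self.worker = Worker("graceful-service")
    async def run(self):
        loop = asyncio.get_running_loop()
        shutdown_requested = asyncio.Event()
        # signal.signal() handlers cannot safely interact with the event loop;
        # loop.add_signal_handler (Unix only) runs the callback on the loop
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._signal_handler, sig, shutdown_requested)
        worker_task = asyncio.create_task(self.worker.run())
        stop_task = asyncio.create_task(shutdown_requested.wait())
        try:
            # Return as soon as the worker exits or a shutdown signal arrives
            done, _ = await asyncio.wait(
                {worker_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
            )
            if worker_task in done:
                worker_task.result()  # re-raise if the worker crashed
        finally:
            # Cancel whatever is still running (assumes worker.run() is cancellable)
            for task in (worker_task, stop_task):
                task.cancel()
            await asyncio.gather(worker_task, stop_task, return_exceptions=True)
            await self._cleanup()
    def _signal_handler(self, sig, shutdown_requested):
        print(f"Received {sig.name}, initiating graceful shutdown...")
        shutdown_requested.set()
    async def _cleanup(self):
        print("Cleaning up resources...")
        # Perform cleanup tasks:
        # close database connections, finish in-flight requests,
        # remove OpenTelemetry handlers
        print("Cleanup complete")
async def main():
    graceful_worker = GracefulWorker()
    await graceful_worker.run()
if __name__ == "__main__":
    asyncio.run(main())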
ASGI Integration
Basic ASGI App
from agnt5 import Worker, function
@function()
def api_endpoint(data: dict) -> dict:
return {"status": "success", "received": data}
# Create ASGI app
app = Worker("api-service", runtime="asgi")
CORS Configuration
Enable CORS for browser access:
# Enable CORS with defaults (allows all origins)
app.enable_cors()
# Enable CORS with specific origins
app.enable_cors(origins=["https://myapp.com", "https://localhost:3000"])
# Disable CORS
app.disable_cors()
Custom Middleware
Add ASGI middleware:
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
app = Worker("api-service", runtime="asgi")
# Add middleware (if using Starlette/FastAPI patterns)
# Note: This is conceptual - actual middleware integration depends on ASGI runtime implementation
Error Handling
ASGI runtime provides consistent error responses:
# Function that raises an exception
@function()
def failing_function(data: dict) -> dict:
raise ValueError("Something went wrong")
# ASGI runtime catches and formats the error:
# {
# "error": "Function failing_function failed: Something went wrong",
# "status": 500
# }
Worker Methods
Runtime Control
worker = Worker("my-service")
# Check if worker is running
if worker.is_running():
    print("Worker is active")
# For ASGI workers only: the Worker instance itself is the ASGI callable
if worker.runtime == "asgi":
    asgi_app = worker  # hand this object to your ASGI server
Component Registration
# Manual component registration (usually automatic)
worker._register_components()
# Internal message handling (not part of public API)
# worker._handle_message(request)
Observability
OpenTelemetry Integration
Workers automatically install OpenTelemetry logging:
import logging
from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging, remove_opentelemetry_logging
# Custom logging setup
logger = logging.getLogger("my-service")
# Install telemetry with custom formatter
install_opentelemetry_logging(
logger=logger,
level=logging.DEBUG,
format_string="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
worker = Worker("my-service")
# Telemetry is automatically cleaned up on worker shutdown
Structured Logging
import logging
from agnt5 import Worker, function
logger = logging.getLogger(__name__)
@function()
def logged_function(ctx, data: dict) -> dict:
# Structured logging with context
logger.info(
"Processing function",
extra={
"invocation_id": ctx.invocation_id,
"service_name": ctx.metadata.get("service_name"),
"data_size": len(str(data))
}
)
result = {"processed": True}
logger.info(
"Function completed",
extra={
"invocation_id": ctx.invocation_id,
"success": True
}
)
return result
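Fields passed through `extra=` become attributes on the `LogRecord`, but the default text formatter ignores them unless its format string names them. To include them in your own log output, a small `logging.Formatter` subclass can serialize them as JSON. `JsonFormatter` is a standard-library sketch, not an SDK class; whether `AGNT5_LOG_FORMAT=json` already does something similar for SDK logs is not covered here.

```python
import json
import logging

# Attribute names every LogRecord has, plus ones other formatters may add later
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including `extra=` fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Anything not standard on a LogRecord came from extra=
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS and key not in payload:
                payload[key] = value
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        # default=str keeps non-JSON values (datetimes, UUIDs) from breaking logging
        return json.dumps(payload, default=str)
```

Attach it with `handler.setFormatter(JsonFormatter())` on whichever handler writes your service logs.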
Development Patterns
Hot Reload Development
import os
import sys
from agnt5 import Worker, function
# Development configuration
if os.getenv("ENVIRONMENT") == "development":
import logging
logging.basicConfig(level=logging.DEBUG)
@function()
def development_handler(data: dict) -> dict:
return {"env": "development", "data": data}
async def main():
worker = Worker(
service_name="dev-service",
coordinator_endpoint=os.getenv("COORDINATOR_URL", "http://localhost:9091")
)
try:
await worker.run()
except KeyboardInterrupt:
print("\nDevelopment worker stopped")
sys.exit(0)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Testing Workers
import pytest
from unittest.mock import patch
from agnt5 import Worker, function
# Don't prefix components with "test_": pytest would try to collect them as tests
@function()
def shout(data: str) -> str:
    return data.upper()
@pytest.fixture
def mock_worker():
    # Patch the Rust-backed PyWorker so no coordinator connection is attempted
    with patch('agnt5.worker.PyWorker'):
        yield Worker("test-service")
def test_worker_registration(mock_worker):
    # Test component registration
    mock_worker._register_components()
    # Verify functions are registered
    from agnt5.decorators import get_registered_functions
    functions = get_registered_functions()
    assert "shout" in functions
def test_worker_asgi_mode():
    app = Worker("test-service", runtime="asgi")
    assert callable(app)  # ASGI callable interface
Production Deployment
Container Deployment
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
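# Health check: /health is documented for the ASGI runtime on port 8000.
# python:3.11-slim does not include curl, so probe with Python's urllib instead.
# If worker.py uses the standalone runtime, adapt or remove this check.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1
# Run worker
CMD ["python", "worker.py"]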
Environment Configuration
# Production environment variables
AGNT5_SERVICE_NAME=production-service
AGNT5_SERVICE_VERSION=1.2.0
AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
AGNT5_LOG_LEVEL=INFO
AGNT5_LOG_FORMAT=json
Health Monitoring
import asyncio
import logging
import os
import time
from agnt5 import Worker, function
logger = logging.getLogger(__name__)
@function()
def health_check() -> dict:
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "production-service",
        "timestamp": time.time()
    }
async def main():
    worker = Worker(
        service_name="production-service",
        # Read the same variable used in the environment configuration above
        service_version=os.getenv("AGNT5_SERVICE_VERSION", "1.0.0")
    )
    logger.info("Starting production worker")
    try:
        await worker.run()
    except Exception:
        # logger.exception records the full traceback
        logger.exception("Worker failed")
        raise
    finally:
        logger.info("Worker shutdown complete")
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
Best Practices
Service Design
- Service Naming - Use consistent, descriptive service names
- Version Management - Use semantic versioning for service versions
- Resource Management - Clean up resources in shutdown handlers
- Error Handling - Handle exceptions gracefully in workers
- Health Checks - Implement health check functions for monitoring
Performance
- Connection Pooling - Reuse database and HTTP connections
- Async Operations - Use async functions for I/O operations
- Resource Limits - Configure appropriate memory and CPU limits
- Scaling - Deploy multiple worker instances for high throughput
- Monitoring - Track worker performance and error rates
Security
- Input Validation - Validate all function inputs
- Error Messages - Don’t expose sensitive information in errors
- Authentication - Use proper authentication for coordinator connections
- Network Security - Use secure connections (HTTPS/TLS) in production
- Secrets Management - Use environment variables for sensitive configuration
Next Steps
Core Primitives
- Functions - Stateless operations with retries
- Entities - Stateful components
- Workflows - Multi-step orchestration
- Context API - Full API reference
Agent Development Kit
- Agents - Autonomous LLM-driven systems
- Tools - Extend agent capabilities
- Sessions - Conversation management
- Memory - Long-term knowledge storage
Resources
- Examples - Worker deployment examples