Context API
Execution context with APIs for orchestration, state, AI, and observability
The Context (ctx) is the execution environment provided to all AGNT5 components. It provides APIs for orchestration, state management, LLM interactions, coordination, and observability.
Core Capabilities
- Orchestration - Execute tasks, spawn functions, parallel execution
- State Management - Get/set/delete state for entities
- Coordination - Signals, timers, human approvals
- AI Integration - LLM calls, tool registration
- Observability - Logging, metrics, tracing
Orchestration APIs
Task Execution
Execute functions and wait for results (workflows only):
@workflow
async def process_workflow(ctx):
# Execute a task
result = await ctx.task(
service_name="analytics",
handler_name="process_data",
input={"dataset": "users"}
)
return result
Parallel Execution
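Run multiple tasks concurrently with `ctx.parallel(*tasks)` or, for named results, `ctx.gather(**tasks)` (see the "Parallel with Error Handling" pattern under Common Patterns for a `ctx.gather` example).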
Async Invocation
Spawn child functions without waiting:
@function
async def batch_processor(ctx, items: list):
# Spawn child invocations
handles = []
for item in items:
handle = ctx.spawn(process_item, item, key=f"item-{item['id']}")
handles.append(handle)
# Continue other work...
# Wait for results later if needed
results = [await h.result() for h in handles]
return {"processed": len(results)}
Checkpointing
Checkpoint expensive operations (functions only):
@function
async def process_pipeline(ctx, data_id: str):
# Each step is checkpointed
raw = await ctx.step("extract", lambda: extract_data(data_id))
cleaned = await ctx.step("clean", lambda: clean_data(raw))
result = await ctx.step("analyze", lambda: analyze(cleaned))
# If crash occurs, resumes from last completed step
return result
State Management (Entities)
Get, Set, Delete
Manage entity state:
@entity.write
async def update_profile(ctx, name: str, age: int):
# Get with default
profile = await ctx.get("profile", {})
# Update profile
profile.update({"name": name, "age": age})
# Set state
ctx.set("profile", profile)
ctx.set("last_updated", datetime.now().isoformat())
# Delete temporary data
ctx.delete("temp_cache")
return {"status": "updated"}
Entity Method Calls
Call entity methods from functions:
@function
async def chat(ctx, conversation_id: str, message: str):
# Call entity method
response = await ctx.entity(
"ChatAgent",
conversation_id
).send_message(message)
return response
Coordination APIs
Signals
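Wait for external events with `await ctx.signal(name, timeout_ms, default)`, and send them with `await ctx.signal.emit(name, payload)` (see the "Conditional Signal Waiting" pattern under Common Patterns for an example).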
Timers & Sleep
Add delays and scheduled execution:
@workflow
async def scheduled_job(ctx):
# Wait 5 seconds
await ctx.timer(delay_ms=5000)
# Or use sleep (alternative syntax)
await ctx.sleep(30) # 30 seconds
# Wait until specific time (cron)
await ctx.timer(cron="0 0 * * *") # Daily at midnight
return {"status": "completed"}
Human-in-the-Loop
Request human approval:
@workflow
async def deployment_workflow(ctx, version: str):
# Run tests
test_results = await ctx.task("ci", "run_tests", input={"version": version})
if test_results["passed"]:
# Request human approval for production
approval = await ctx.human.approval(
"deploy_production",
payload={"version": version, "tests": test_results},
timeout=timedelta(minutes=30),
required_roles=["admin", "devops"]
)
if approval.decision == "approved":
await ctx.task("deploy", "to_production", input={"version": version})
return {"status": "deployed"}
return {"status": "cancelled"}
AI Integration
LLM Generation
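Generate text or structured (JSON) responses with `await ctx.llm.generate(prompt, model)`; the Tool Registration example below shows a complete call.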
Streaming
Stream responses for real-time output:
@function
async def stream_story(ctx, topic: str):
# Stream text generation
async for chunk in await ctx.llm.stream(
prompt=f"Write a story about {topic}",
model="gpt-4o"
):
if chunk.text:
yield chunk.text # Stream to client
Tool Registration
Register tools for LLM use:
@function
async def agent_with_tools(ctx, query: str):
# Register search tool
search_tool = ctx.tools.register(
"web_search",
handler=perform_search,
description="Search the web for information",
schema={
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer"}
}
}
)
# Generate with tool
response = await ctx.llm.generate(
prompt=query,
tools=[search_tool],
model="gpt-4o"
)
return response
Observability
Logging
Structured logging with context:
@function
async def tracked_operation(ctx, data: dict):
logger = ctx.log()
logger.info("Processing started", extra={"data_size": len(data)})
try:
result = process(data)
logger.info("Processing completed", extra={"result_size": len(result)})
return result
except Exception as e:
logger.error("Processing failed", exc_info=True)
raise
Metrics
Record custom metrics:
@function
async def monitored_function(ctx, request: dict):
metrics = ctx.metrics()
# Increment counter
metrics.increment("requests.count", service="api")
# Record timing
start = time.time()
result = await process_request(request)
duration = (time.time() - start) * 1000
metrics.observe("latency.ms", duration, endpoint="/api/process")
return result
Distributed Tracing
Create spans for tracing:
@function
async def traced_operation(ctx, data: dict):
# Create span for external API call
with ctx.trace_span().start("external_api_call", service="payments"):
result = await call_payment_api(data)
# Create span for database operation
with ctx.trace_span().start("database_query", service="postgres"):
await save_to_db(result)
return result
Configuration & Secrets
Secrets
Access secrets securely:
@function
async def api_call(ctx, endpoint: str):
# Get API key from secrets
api_key = ctx.secrets().get("openai_api_key")
db_password = ctx.secrets().get("database_password")
# Use secrets in API calls
response = await make_request(endpoint, api_key=api_key)
return response
Configuration
Feature flags and config:
@function
async def feature_gated_handler(ctx, data: dict):
config = ctx.config()
# Check feature flag
if config.get("new_feature_enabled", default=False):
return await new_implementation(data)
else:
return await legacy_implementation(data)
# A/B testing variant
variant = config.variant("experiment_group", default="control")
if variant == "treatment":
return await experimental_flow(data)
Request Headers
Access incoming headers:
@function
async def header_aware(ctx, data: dict):
headers = ctx.headers()
user_agent = headers.get("user-agent", "unknown")
correlation_id = headers.get("x-correlation-id")
logger = ctx.log()
logger.info(f"Request from {user_agent}", extra={"correlation_id": correlation_id})
return {"processed": True}
Context Properties
Access execution metadata:
@function
async def introspective(ctx, data: dict):
return {
"run_id": ctx.run_id, # Workflow/run identifier
"step_id": ctx.step_id, # Current step identifier
"attempt": ctx.attempt, # Retry attempt number
"component_type": ctx.component_type, # "function", "entity", "workflow"
"object_id": ctx.object_id, # Entity key (for entities)
"method_name": ctx.method_name, # Entity method name (for entities)
"processed": data
}
API Reference
Orchestration
| API | Description |
|---|---|
| `ctx.task(service, handler, input)` | Execute function (workflows only) |
| `ctx.parallel(*tasks)` | Run tasks in parallel |
| `ctx.gather(**tasks)` | Parallel with named results |
| `ctx.spawn(fn, *args, key)` | Async child invocation |
| `ctx.step(name, fn)` | Checkpoint operation (functions) |
State (Entities)
| API | Description |
|---|---|
| `await ctx.get(key, default)` | Get state value |
| `ctx.set(key, value)` | Set state value |
| `ctx.delete(key)` | Delete state key |
| `await ctx.entity(type, key).method()` | Call entity method |
Coordination
| API | Description |
|---|---|
| `await ctx.signal(name, timeout_ms, default)` | Wait for signal |
| `await ctx.signal.emit(name, payload)` | Send signal |
| `await ctx.timer(delay_ms)` | Wait with delay |
| `await ctx.timer(cron)` | Wait until cron time |
| `await ctx.sleep(seconds)` | Durable sleep |
| `await ctx.human.approval(...)` | Request approval |
AI Integration
| API | Description |
|---|---|
| `await ctx.llm.generate(prompt, model)` | Generate text/JSON |
| `await ctx.llm.stream(prompt, model)` | Stream generation |
| `ctx.tools.register(name, handler, schema)` | Register tool |
Observability
| API | Description |
|---|---|
| `ctx.log()` | Get logger |
| `ctx.metrics()` | Get metrics recorder |
| `ctx.trace_span().start(name, service)` | Create trace span |
Configuration
| API | Description |
|---|---|
| `ctx.secrets().get(key)` | Get secret |
| `ctx.config().get(key, default)` | Get config value |
| `ctx.config().variant(key, default)` | Get A/B variant |
| `ctx.headers()` | Get request headers |
Common Patterns
Parallel with Error Handling
@workflow
async def robust_workflow(ctx):
results = await ctx.gather(
task1=ctx.task("svc", "task1"),
task2=ctx.task("svc", "task2")
)
if results["task1"] and results["task2"]:
return {"status": "success", "results": results}
else:
return {"status": "partial_failure"}
Conditional Signal Waiting
@workflow
async def conditional_approval(ctx, needs_approval: bool):
if needs_approval:
approval = await ctx.signal("approval_signal", timeout_ms=60000)
if not approval.get("approved"):
return {"status": "rejected"}
# Proceed with operation
result = await ctx.task("service", "operation")
return {"status": "completed", "result": result}
LLM with Tool Execution
@function
async def agent_handler(ctx, query: str):
# Register tools
search = ctx.tools.register("search", handler=search_web, ...)
calc = ctx.tools.register("calculator", handler=calculate, ...)
# Generate with tools
response = await ctx.llm.generate(
prompt=query,
tools=[search, calc],
model="gpt-4o"
)
# Execute tool calls if needed
if response.tool_calls:
for tool_call in response.tool_calls:
handler = ctx.tools.handler(tool_call.name)
await handler(**tool_call.arguments)
return response