Skip to content
Docs
API Reference Context API — AGNT5 Python SDK

Context API

Execution context with APIs for orchestration, state, AI, and observability

The Context (ctx) is the execution environment provided to all AGNT5 components. It provides APIs for orchestration, state management, LLM interactions, coordination, and observability.

Core Capabilities

  • Orchestration - Execute tasks, spawn functions, parallel execution
  • State Management - Get/set/delete state for entities
  • Coordination - Signals, timers, human approvals
  • AI Integration - LLM calls, tool registration
  • Observability - Logging, metrics, tracing

Orchestration APIs

Task Execution

Execute functions and wait for results (workflows only):

@workflow
async def process_workflow(ctx):
    # Execute a task
    result = await ctx.task(
        service_name="analytics",
        handler_name="process_data",
        input={"dataset": "users"}
    )

    return result

Parallel Execution

Run multiple tasks concurrently:

Async Invocation

Spawn child functions without waiting:

@function
async def batch_processor(ctx, items: list):
    # Spawn child invocations
    handles = []
    for item in items:
        handle = ctx.spawn(process_item, item, key=f"item-{item['id']}")
        handles.append(handle)

    # Continue other work...

    # Wait for results later if needed
    results = [await h.result() for h in handles]
    return {"processed": len(results)}

Checkpointing

Checkpoint expensive operations (functions only):

@function
async def process_pipeline(ctx, data_id: str):
    # Each step is checkpointed
    raw = await ctx.step("extract", lambda: extract_data(data_id))
    cleaned = await ctx.step("clean", lambda: clean_data(raw))
    result = await ctx.step("analyze", lambda: analyze(cleaned))

    # If crash occurs, resumes from last completed step
    return result

State Management (Entities)

Get, Set, Delete

Manage entity state:

@entity.write
async def update_profile(ctx, name: str, age: int):
    # Get with default
    profile = await ctx.get("profile", {})

    # Update profile
    profile.update({"name": name, "age": age})

    # Set state
    ctx.set("profile", profile)
    ctx.set("last_updated", datetime.now().isoformat())

    # Delete temporary data
    ctx.delete("temp_cache")

    return {"status": "updated"}

Entity Method Calls

Call entity methods from functions:

@function
async def chat(ctx, conversation_id: str, message: str):
    # Call entity method
    response = await ctx.entity(
        "ChatAgent",
        conversation_id
    ).send_message(message)

    return response

Coordination APIs

Signals

Wait for external events:

Timers & Sleep

Add delays and scheduled execution:

@workflow
async def scheduled_job(ctx):
    # Wait 5 seconds
    await ctx.timer(delay_ms=5000)

    # Or use sleep (alternative syntax)
    await ctx.sleep(30)  # 30 seconds

    # Wait until specific time (cron)
    await ctx.timer(cron="0 0 * * *")  # Daily at midnight

    return {"status": "completed"}

Human-in-the-Loop

Request human approval:

@workflow
async def deployment_workflow(ctx, version: str):
    # Run tests
    test_results = await ctx.task("ci", "run_tests", input={"version": version})

    if test_results["passed"]:
        # Request human approval for production
        approval = await ctx.human.approval(
            "deploy_production",
            payload={"version": version, "tests": test_results},
            timeout=timedelta(minutes=30),
            required_roles=["admin", "devops"]
        )

        if approval.decision == "approved":
            await ctx.task("deploy", "to_production", input={"version": version})
            return {"status": "deployed"}

    return {"status": "cancelled"}

AI Integration

LLM Generation

Generate text or structured responses:

Streaming

Stream responses for real-time output:

@function
async def stream_story(ctx, topic: str):
    # Stream text generation
    async for chunk in await ctx.llm.stream(
        prompt=f"Write a story about {topic}",
        model="gpt-4o"
    ):
        if chunk.text:
            yield chunk.text  # Stream to client

Tool Registration

Register tools for LLM use:

@function
async def agent_with_tools(ctx, query: str):
    # Register search tool
    search_tool = ctx.tools.register(
        "web_search",
        handler=perform_search,
        description="Search the web for information",
        schema={
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "max_results": {"type": "integer"}
            }
        }
    )

    # Generate with tool
    response = await ctx.llm.generate(
        prompt=query,
        tools=[search_tool],
        model="gpt-4o"
    )

    return response

Observability

Logging

Structured logging with context:

@function
async def tracked_operation(ctx, data: dict):
    logger = ctx.log()

    logger.info("Processing started", extra={"data_size": len(data)})

    try:
        result = process(data)
        logger.info("Processing completed", extra={"result_size": len(result)})
        return result
    except Exception as e:
        logger.error("Processing failed", exc_info=True)
        raise

Metrics

Record custom metrics:

@function
async def monitored_function(ctx, request: dict):
    metrics = ctx.metrics()

    # Increment counter
    metrics.increment("requests.count", service="api")

    # Record timing
    start = time.time()
    result = await process_request(request)
    duration = (time.time() - start) * 1000

    metrics.observe("latency.ms", duration, endpoint="/api/process")

    return result

Distributed Tracing

Create spans for tracing:

@function
async def traced_operation(ctx, data: dict):
    # Create span for external API call
    with ctx.trace_span().start("external_api_call", service="payments"):
        result = await call_payment_api(data)

    # Create span for database operation
    with ctx.trace_span().start("database_query", service="postgres"):
        await save_to_db(result)

    return result

Configuration & Secrets

Secrets

Access secrets securely:

@function
async def api_call(ctx, endpoint: str):
    # Get API key from secrets
    api_key = ctx.secrets().get("openai_api_key")
    db_password = ctx.secrets().get("database_password")

    # Use secrets in API calls
    response = await make_request(endpoint, api_key=api_key)
    return response

Configuration

Feature flags and config:

@function
async def feature_gated_handler(ctx, data: dict):
    config = ctx.config()

    # Check feature flag
    if config.get("new_feature_enabled", default=False):
        return await new_implementation(data)
    else:
        return await legacy_implementation(data)

    # A/B testing variant
    variant = config.variant("experiment_group", default="control")
    if variant == "treatment":
        return await experimental_flow(data)

Request Headers

Access incoming headers:

@function
async def header_aware(ctx, data: dict):
    headers = ctx.headers()

    user_agent = headers.get("user-agent", "unknown")
    correlation_id = headers.get("x-correlation-id")

    logger = ctx.log()
    logger.info(f"Request from {user_agent}", extra={"correlation_id": correlation_id})

    return {"processed": True}

Context Properties

Access execution metadata:

@function
async def introspective(ctx, data: dict):
    return {
        "run_id": ctx.run_id,              # Workflow/run identifier
        "step_id": ctx.step_id,            # Current step identifier
        "attempt": ctx.attempt,            # Retry attempt number
        "component_type": ctx.component_type,  # "function", "entity", "workflow"
        "object_id": ctx.object_id,        # Entity key (for entities)
        "method_name": ctx.method_name,    # Entity method name (for entities)
        "processed": data
    }

API Reference

Orchestration

API Description
ctx.task(service, handler, input) Execute function (workflows only)
ctx.parallel(*tasks) Run tasks in parallel
ctx.gather(**tasks) Parallel with named results
ctx.spawn(fn, *args, key) Async child invocation
ctx.step(name, fn) Checkpoint operation (functions)

State (Entities)

API Description
await ctx.get(key, default) Get state value
ctx.set(key, value) Set state value
ctx.delete(key) Delete state key
await ctx.entity(type, key).method() Call entity method

Coordination

API Description
await ctx.signal(name, timeout_ms, default) Wait for signal
await ctx.signal.emit(name, payload) Send signal
await ctx.timer(delay_ms) Wait with delay
await ctx.timer(cron) Wait until cron time
await ctx.sleep(seconds) Durable sleep
await ctx.human.approval(...) Request approval

AI Integration

API Description
await ctx.llm.generate(prompt, model) Generate text/JSON
await ctx.llm.stream(prompt, model) Stream generation
ctx.tools.register(name, handler, schema) Register tool

Observability

API Description
ctx.log() Get logger
ctx.metrics() Get metrics recorder
ctx.trace_span().start(name, service) Create trace span

Configuration

API Description
ctx.secrets().get(key) Get secret
ctx.config().get(key, default) Get config value
ctx.config().variant(key, default) Get A/B variant
ctx.headers() Get request headers

Common Patterns

Parallel with Error Handling

@workflow
async def robust_workflow(ctx):
    results = await ctx.gather(
        task1=ctx.task("svc", "task1"),
        task2=ctx.task("svc", "task2")
    )

    if results["task1"] and results["task2"]:
        return {"status": "success", "results": results}
    else:
        return {"status": "partial_failure"}

Conditional Signal Waiting

@workflow
async def conditional_approval(ctx, needs_approval: bool):
    if needs_approval:
        approval = await ctx.signal("approval_signal", timeout_ms=60000)
        if not approval.get("approved"):
            return {"status": "rejected"}

    # Proceed with operation
    result = await ctx.task("service", "operation")
    return {"status": "completed", "result": result}

LLM with Tool Execution

@function
async def agent_handler(ctx, query: str):
    # Register tools
    search = ctx.tools.register("search", handler=search_web, ...)
    calc = ctx.tools.register("calculator", handler=calculate, ...)

    # Generate with tools
    response = await ctx.llm.generate(
        prompt=query,
        tools=[search, calc],
        model="gpt-4o"
    )

    # Execute tool calls if needed
    if response.tool_calls:
        for tool_call in response.tool_calls:
            handler = ctx.tools.handler(tool_call.name)
            await handler(**tool_call.arguments)

    return response

Next Steps

  • Functions - Using context in functions
  • Entity - Using context in entities
  • Workflows - Using context in workflows
  • Agent - AI integration with context
© 2026 AGNT5
llms.txt