> For the complete documentation index, see [llms.txt](/llms.txt).
> A full single-fetch corpus is available at [llms-full.txt](/llms-full.txt).
---
last_verified: 2026-05-13
title: Context API
description: Execution context with APIs for orchestration, state, AI, and observability
category: Python SDK
---

The **Context** (`ctx`) is the execution environment passed to every AGNT5 component. It exposes APIs for orchestration, state management, LLM interactions, coordination, and observability.

## Core Capabilities

- **Orchestration** - Execute tasks, spawn functions, run work in parallel
- **State Management** - Get/set/delete state for entities
- **Coordination** - Signals, timers, human approvals
- **AI Integration** - LLM calls, tool registration
- **Observability** - Logging, metrics, tracing

## Orchestration APIs

### Task Execution

Execute functions and wait for results (workflows only):

```python
@workflow
async def process_workflow(ctx):
    # Execute a task
    result = await ctx.task(
        service_name="analytics",
        handler_name="process_data",
        input={"dataset": "users"}
    )

    return result
```

### Parallel Execution

Run multiple tasks concurrently:

<Tabs defaultValue="parallel">
  <TabsList>
    <TabsTrigger value="parallel">Parallel</TabsTrigger>
    <TabsTrigger value="gather">Gather (Named)</TabsTrigger>
  </TabsList>

  <TabsContent value="parallel">
    ```python
    # Returns list of results in order
    results = await ctx.parallel(
        ctx.task("service1", "handler1"),
        ctx.task("service2", "handler2"),
        ctx.task("service3", "handler3")
    )

    # Access results by index
    result1 = results[0]
    result2 = results[1]
    ```
  </TabsContent>

  <TabsContent value="gather">
    ```python
    # Returns dict with named results
    results = await ctx.gather(
        db=ctx.task("analytics", "analyze_db"),
        api=ctx.task("analytics", "analyze_api"),
        cache=ctx.task("analytics", "analyze_cache")
    )

    # Access results by name
    db_result = results["db"]
    api_result = results["api"]
    ```
  </TabsContent>
</Tabs>
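To make the two result shapes concrete, here is a minimal sketch that models them with plain `asyncio`. It does not use the AGNT5 SDK: `parallel`, `gather_named`, and `fake_task` are hypothetical stand-ins, and the real `ctx.parallel` and `ctx.gather` may differ in scheduling and error behavior.

```python
import asyncio


async def fake_task(name: str, delay: float) -> str:
    """Hypothetical stand-in for ctx.task(): sleeps, then returns a label."""
    await asyncio.sleep(delay)
    return f"{name}-done"


async def parallel(*awaitables):
    """Positional model: results come back as a list in argument order."""
    return list(await asyncio.gather(*awaitables))


async def gather_named(**awaitables):
    """Named model: results come back as a dict keyed by keyword name."""
    values = await asyncio.gather(*awaitables.values())
    return dict(zip(awaitables.keys(), values))


async def demo():
    # The slower task is listed first; the list still follows argument order,
    # not completion order.
    ordered = await parallel(fake_task("a", 0.02), fake_task("b", 0.01))
    named = await gather_named(
        db=fake_task("db", 0.01),
        api=fake_task("api", 0.0),
    )
    return ordered, named
```

Named results tend to be easier to maintain once a fan-out grows beyond two or three tasks, because adding a task does not shift the index of the others.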

### Async Invocation

Spawn child functions without waiting:

```python
@function
async def batch_processor(ctx, items: list):
    # Spawn child invocations
    handles = []
    for item in items:
        handle = ctx.spawn(process_item, item, key=f"item-{item['id']}")
        handles.append(handle)

    # Continue other work...

    # Wait for results later if needed
    results = [await h.result() for h in handles]
    return {"processed": len(results)}
```

### Checkpointing

Checkpoint expensive operations (functions only):

```python
@function
async def process_pipeline(ctx, data_id: str):
    # Each step is checkpointed
    raw = await ctx.step("extract", lambda: extract_data(data_id))
    cleaned = await ctx.step("clean", lambda: clean_data(raw))
    result = await ctx.step("analyze", lambda: analyze(cleaned))

    # If crash occurs, resumes from last completed step
    return result
```
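The resume-from-last-step behavior can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the AGNT5 implementation: `StepJournal` is a hypothetical class, and a real runtime would persist completed steps durably instead of keeping them in a dict.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class StepJournal:
    """Hypothetical in-memory journal of completed steps."""

    def __init__(self) -> None:
        self.completed: Dict[str, Any] = {}
        self.executions = 0  # how many step bodies actually ran

    async def step(self, name: str, fn: Callable[[], Any]) -> Any:
        # Replay path: a step that already finished returns its saved result.
        if name in self.completed:
            return self.completed[name]
        self.executions += 1
        result = fn()
        if inspect.isawaitable(result):
            result = await result
        self.completed[name] = result
        return result


async def pipeline(journal: StepJournal, fail_before_analyze: bool) -> int:
    raw = await journal.step("extract", lambda: [3, 1, 2])
    cleaned = await journal.step("clean", lambda: sorted(raw))
    if fail_before_analyze:
        raise RuntimeError("simulated crash before 'analyze'")
    return await journal.step("analyze", lambda: sum(cleaned))
```

Running `pipeline` once with a simulated crash and then again with the same journal executes `extract` and `clean` only once; the second run replays their saved results and executes only `analyze`.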

## State Management (Entities)

### Get, Set, Delete

Manage entity state:

```python
from datetime import datetime

@entity.write
async def update_profile(ctx, name: str, age: int):
    # Get with default
    profile = await ctx.get("profile", {})

    # Update profile
    profile.update({"name": name, "age": age})

    # Set state
    ctx.set("profile", profile)
    ctx.set("last_updated", datetime.now().isoformat())

    # Delete temporary data
    ctx.delete("temp_cache")

    return {"status": "updated"}
```

### Entity Method Calls

Call entity methods from functions:

```python
@function
async def chat(ctx, conversation_id: str, message: str):
    # Call entity method
    response = await ctx.entity(
        "ChatAgent",
        conversation_id
    ).send_message(message)

    return response
```

## Coordination APIs

### Signals

Wait for external events:

<Tabs defaultValue="wait">
  <TabsList>
    <TabsTrigger value="wait">Wait for Signal</TabsTrigger>
    <TabsTrigger value="emit">Emit Signal</TabsTrigger>
  </TabsList>

  <TabsContent value="wait">
    ```python
    @workflow
    async def approval_workflow(ctx, document_id: str):
        # Submit for review
        await ctx.task("docs", "submit_review", input={"id": document_id})

        # Wait for approval signal (24-hour timeout)
        approval = await ctx.signal(
            "manager_approved",
            timeout_ms=86400000,
            default={"approved": False}
        )

        if approval["approved"]:
            await ctx.task("docs", "publish", input={"id": document_id})
            return {"status": "published"}
        else:
            return {"status": "rejected"}
    ```
  </TabsContent>

  <TabsContent value="emit">
    ```python
    @function
    async def approve_document(ctx, workflow_id: str):
        # Send approval signal to waiting workflow
        await ctx.signal.emit(
            "manager_approved",
            target_workflow_id=workflow_id,
            payload={"approved": True, "approver": "manager@example.com"}
        )

        return {"status": "signal_sent"}
    ```
  </TabsContent>
</Tabs>
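The interaction between `timeout_ms` and `default` can be modeled with an `asyncio.Queue` and `asyncio.wait_for`. This sketch only illustrates the wait-or-fall-back semantics shown above; `wait_for_signal` and the queue-based inbox are hypothetical and say nothing about how AGNT5 delivers signals.

```python
import asyncio
from typing import Any


async def wait_for_signal(inbox: asyncio.Queue, timeout_ms: int, default: Any = None) -> Any:
    """Return the next payload, or `default` if none arrives within timeout_ms."""
    try:
        return await asyncio.wait_for(inbox.get(), timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        return default


async def demo():
    inbox: asyncio.Queue = asyncio.Queue()

    # Nothing has been emitted yet, so the wait falls back to the default.
    timed_out = await wait_for_signal(inbox, timeout_ms=10, default={"approved": False})

    # Hypothetical "emit": place a payload in the inbox, then wait again.
    inbox.put_nowait({"approved": True, "approver": "manager@example.com"})
    delivered = await wait_for_signal(inbox, timeout_ms=10, default={"approved": False})

    return timed_out, delivered
```

Supplying a `default` with the same shape as a real payload lets the calling code read `approval["approved"]` without a separate timeout branch.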

### Timers & Sleep

Add delays and scheduled execution:

```python
@workflow
async def scheduled_job(ctx):
    # Wait 5 seconds
    await ctx.timer(delay_ms=5000)

    # Or use sleep (alternative syntax)
    await ctx.sleep(30)  # 30 seconds

    # Wait until specific time (cron)
    await ctx.timer(cron="0 0 * * *")  # Daily at midnight

    return {"status": "completed"}
```
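Because `ctx.timer` takes milliseconds and `ctx.sleep` takes seconds, it can help to derive both from a single `timedelta`. The helpers below are hypothetical conveniences, not part of the SDK.

```python
from datetime import timedelta


def to_ms(delay: timedelta) -> int:
    """Whole milliseconds in `delay`, suitable for a delay_ms argument."""
    return delay // timedelta(milliseconds=1)


def to_seconds(delay: timedelta) -> float:
    """Seconds in `delay`, suitable for a sleep-style argument."""
    return delay.total_seconds()
```

For example, `to_ms(timedelta(hours=24))` yields the `86400000` used for the 24-hour signal timeout earlier on this page.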

### Human-in-the-Loop

Request human approval:

```python
from datetime import timedelta

@workflow
async def deployment_workflow(ctx, version: str):
    # Run tests
    test_results = await ctx.task("ci", "run_tests", input={"version": version})

    if test_results["passed"]:
        # Request human approval for production
        approval = await ctx.human.approval(
            "deploy_production",
            payload={"version": version, "tests": test_results},
            timeout=timedelta(minutes=30),
            required_roles=["admin", "devops"]
        )

        if approval.decision == "approved":
            await ctx.task("deploy", "to_production", input={"version": version})
            return {"status": "deployed"}

    return {"status": "cancelled"}
```

## AI Integration

### LLM Generation

Generate text or structured responses:

<Tabs defaultValue="simple">
  <TabsList>
    <TabsTrigger value="simple">Simple</TabsTrigger>
    <TabsTrigger value="chat">Chat</TabsTrigger>
    <TabsTrigger value="structured">Structured</TabsTrigger>
  </TabsList>

  <TabsContent value="simple">
    ```python
    @function
    async def summarize(ctx, text: str):
        response = await ctx.llm.generate(
            prompt=f"Summarize this text: {text}",
            model="gpt-4o-mini"
        )

        return {"summary": response.text}
    ```
  </TabsContent>

  <TabsContent value="chat">
    ```python
    @function
    async def chat_response(ctx, messages: list):
        response = await ctx.llm.generate(
            prompt=[
                {"role": "system", "content": "You are a helpful assistant"},
                *messages
            ],
            model="gpt-4"
        )

        return {"response": response.text}
    ```
  </TabsContent>

  <TabsContent value="structured">
    ```python
    @function
    async def extract_info(ctx, text: str):
        # JSON Schema-constrained output
        response = await ctx.llm.generate(
            prompt=f"Extract information: {text}",
            schema={
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"type": "integer"},
                    "email": {"type": "string"}
                },
                "required": ["name"]
            },
            model="gpt-4o-mini"
        )

        return response.object  # Parsed JSON
    ```
  </TabsContent>
</Tabs>
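When consuming structured output, a defensive check of the parsed object against the schema can catch surprises early. The sketch below uses only the standard library and covers just the `required` and `type` keywords from the example schema; `check_object` is a hypothetical helper, and a full JSON Schema validator would be the better choice in production.

```python
from typing import Any, Dict, List

# Maps the JSON Schema type keywords handled here to Python types.
_JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def _matches(value: Any, type_name: str) -> bool:
    expected = _JSON_TYPES.get(type_name)
    if expected is None:
        return True  # unknown type keyword: do not guess
    # bool is a subclass of int in Python, so reject it for numeric fields.
    if isinstance(value, bool) and type_name in ("integer", "number"):
        return False
    return isinstance(value, expected)


def check_object(obj: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the object passed."""
    problems = []
    for key in schema.get("required", []):
        if key not in obj:
            problems.append(f"missing required field: {key}")
    for key, spec in schema.get("properties", {}).items():
        if key in obj and not _matches(obj[key], spec.get("type", "")):
            problems.append(
                f"{key}: expected {spec['type']}, got {type(obj[key]).__name__}"
            )
    return problems


SCHEMA = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer"},
        "email": {"type": "string"},
    },
    "required": ["name"],
}
```

Calling `check_object(response.object, SCHEMA)` before returning would be one way to use it, assuming `response.object` is a plain dict as the example suggests.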

### Streaming

Stream responses for real-time output:

```python
@function
async def stream_story(ctx, topic: str):
    # Stream text generation
    async for chunk in await ctx.llm.stream(
        prompt=f"Write a story about {topic}",
        model="gpt-4o"
    ):
        if chunk.text:
            yield chunk.text  # Stream to client
```

### Tool Registration

Register tools for LLM use:

```python
@function
async def agent_with_tools(ctx, query: str):
    # Register search tool
    search_tool = ctx.tools.register(
        "web_search",
        handler=perform_search,
        description="Search the web for information",
        schema={
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "max_results": {"type": "integer"}
            }
        }
    )

    # Generate with tool
    response = await ctx.llm.generate(
        prompt=query,
        tools=[search_tool],
        model="gpt-4o"
    )

    return response
```

## Observability

### Logging

Structured logging with context:

```python
@function
async def tracked_operation(ctx, data: dict):
    logger = ctx.log()

    logger.info("Processing started", extra={"data_size": len(data)})

    try:
        result = process(data)
        logger.info("Processing completed", extra={"result_size": len(result)})
        return result
    except Exception:
        logger.error("Processing failed", exc_info=True)
        raise
```

### Metrics

Record custom metrics:

```python
import time

@function
async def monitored_function(ctx, request: dict):
    metrics = ctx.metrics()

    # Increment counter
    metrics.increment("requests.count", service="api")

    # Record timing in milliseconds using a monotonic clock
    start = time.perf_counter()
    result = await process_request(request)
    duration = (time.perf_counter() - start) * 1000

    metrics.observe("latency.ms", duration, endpoint="/api/process")

    return result
```
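Timing code like this is easy to get subtly wrong, for example by skipping the observation when the wrapped call raises. One way to package it is a context manager. The sketch below assumes only that the recorder has an `observe(name, value, **tags)` method as in the example above; `RecordingMetrics` is a hypothetical in-memory stand-in used for illustration.

```python
import time
from contextlib import contextmanager


class RecordingMetrics:
    """Hypothetical stand-in for a metrics recorder; stores observations in a list."""

    def __init__(self):
        self.observations = []

    def observe(self, name, value, **tags):
        self.observations.append((name, value, tags))


@contextmanager
def timed(metrics, name, **tags):
    """Observe the elapsed time of the with-block in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs on success and on failure, so slow errors are still measured.
        elapsed_ms = (time.perf_counter() - start) * 1000
        metrics.observe(name, elapsed_ms, **tags)
```

Usage would look like `with timed(ctx.metrics(), "latency.ms", endpoint="/api/process"): ...` around the call being measured.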

### Distributed Tracing

Create spans for tracing:

```python
@function
async def traced_operation(ctx, data: dict):
    # Create span for external API call
    with ctx.trace_span().start("external_api_call", service="payments"):
        result = await call_payment_api(data)

    # Create span for database operation
    with ctx.trace_span().start("database_query", service="postgres"):
        await save_to_db(result)

    return result
```

## Configuration & Secrets

### Secrets

Access secrets securely:

```python
@function
async def api_call(ctx, endpoint: str):
    # Get API key from secrets
    api_key = ctx.secrets().get("openai_api_key")
    db_password = ctx.secrets().get("database_password")

    # Use secrets in API calls
    response = await make_request(endpoint, api_key=api_key)
    return response
```

### Configuration

Feature flags and config:

```python
@function
async def feature_gated_handler(ctx, data: dict):
    config = ctx.config()

    # A/B testing variant: route the treatment group first
    variant = config.variant("experiment_group", default="control")
    if variant == "treatment":
        return await experimental_flow(data)

    # Check feature flag
    if config.get("new_feature_enabled", default=False):
        return await new_implementation(data)
    return await legacy_implementation(data)

### Request Headers

Access incoming headers:

```python
@function
async def header_aware(ctx, data: dict):
    headers = ctx.headers()

    user_agent = headers.get("user-agent", "unknown")
    correlation_id = headers.get("x-correlation-id")

    logger = ctx.log()
    logger.info(f"Request from {user_agent}", extra={"correlation_id": correlation_id})

    return {"processed": True}
```

## Context Properties

Access execution metadata:

```python
@function
async def introspective(ctx, data: dict):
    return {
        "run_id": ctx.run_id,              # Workflow/run identifier
        "step_id": ctx.step_id,            # Current step identifier
        "attempt": ctx.attempt,            # Retry attempt number
        "component_type": ctx.component_type,  # "function", "entity", "workflow"
        "object_id": ctx.object_id,        # Entity key (for entities)
        "method_name": ctx.method_name,    # Entity method name (for entities)
        "processed": data
    }
```

## API Reference

### Orchestration

| API | Description |
| --- | --- |
| `ctx.task(service, handler, input)` | Execute function (workflows only) |
| `ctx.parallel(*tasks)` | Run tasks in parallel |
| `ctx.gather(**tasks)` | Parallel with named results |
| `ctx.spawn(fn, *args, key)` | Async child invocation |
| `ctx.step(name, fn)` | Checkpoint operation (functions) |

### State (Entities)

| API | Description |
| --- | --- |
| `await ctx.get(key, default)` | Get state value |
| `ctx.set(key, value)` | Set state value |
| `ctx.delete(key)` | Delete state key |
| `await ctx.entity(type, key).method()` | Call entity method |

### Coordination

| API | Description |
| --- | --- |
| `await ctx.signal(name, timeout_ms, default)` | Wait for signal |
| `await ctx.signal.emit(name, target_workflow_id, payload)` | Send signal to a waiting workflow |
| `await ctx.timer(delay_ms=...)` | Wait for a delay in milliseconds |
| `await ctx.timer(cron=...)` | Wait until the next cron match |
| `await ctx.sleep(seconds)` | Durable sleep |
| `await ctx.human.approval(...)` | Request approval |

### AI Integration

| API | Description |
| --- | --- |
| `await ctx.llm.generate(prompt, model)` | Generate text/JSON |
| `await ctx.llm.stream(prompt, model)` | Stream generation |
| `ctx.tools.register(name, handler, schema)` | Register tool |

### Observability

| API | Description |
| --- | --- |
| `ctx.log()` | Get logger |
| `ctx.metrics()` | Get metrics recorder |
| `ctx.trace_span().start(name, service)` | Create trace span |

### Configuration

| API | Description |
| --- | --- |
| `ctx.secrets().get(key)` | Get secret |
| `ctx.config().get(key, default)` | Get config value |
| `ctx.config().variant(key, default)` | Get A/B variant |
| `ctx.headers()` | Get request headers |

## Common Patterns

### Parallel with Error Handling

```python
@workflow
async def robust_workflow(ctx):
    results = await ctx.gather(
        task1=ctx.task("svc", "task1"),
        task2=ctx.task("svc", "task2")
    )

    # A falsy result is treated here as a failed task
    if results["task1"] and results["task2"]:
        return {"status": "success", "results": results}
    else:
        return {"status": "partial_failure"}
```
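If tasks can raise rather than return a falsy value, the results need to be inspected for exceptions instead. The sketch below shows that pattern with plain `asyncio.gather(..., return_exceptions=True)`. Whether `ctx.gather` offers an equivalent option is not stated on this page, so `gather_settled`, `ok_task`, and `failing_task` are hypothetical and the behavior shown is that of `asyncio` only.

```python
import asyncio


async def ok_task():
    return {"rows": 3}


async def failing_task():
    raise RuntimeError("backend unavailable")


async def gather_settled(**awaitables):
    """Run everything; map each name to its result or to the raised exception."""
    outcomes = await asyncio.gather(*awaitables.values(), return_exceptions=True)
    return dict(zip(awaitables.keys(), outcomes))


def summarize(results: dict) -> dict:
    failed = sorted(
        name for name, value in results.items() if isinstance(value, BaseException)
    )
    if failed:
        return {"status": "partial_failure", "failed": failed}
    return {"status": "success", "results": results}


async def demo():
    results = await gather_settled(task1=ok_task(), task2=failing_task())
    return summarize(results)
```

Collecting exceptions as values keeps one failing branch from discarding the results of the branches that succeeded.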

### Conditional Signal Waiting

```python
@workflow
async def conditional_approval(ctx, needs_approval: bool):
    if needs_approval:
        approval = await ctx.signal(
            "approval_signal",
            timeout_ms=60000,
            default={"approved": False}  # Treat a timeout as a rejection
        )
        if not approval.get("approved"):
            return {"status": "rejected"}

    # Proceed with operation
    result = await ctx.task("service", "operation")
    return {"status": "completed", "result": result}
```

### LLM with Tool Execution

```python
@function
async def agent_handler(ctx, query: str):
    # Register tools
    # Description and schema arguments are omitted here for brevity
    search = ctx.tools.register("search", handler=search_web)
    calc = ctx.tools.register("calculator", handler=calculate)

    # Generate with tools
    response = await ctx.llm.generate(
        prompt=query,
        tools=[search, calc],
        model="gpt-4o"
    )

    # Execute tool calls if needed
    if response.tool_calls:
        for tool_call in response.tool_calls:
            handler = ctx.tools.handler(tool_call.name)
            await handler(**tool_call.arguments)

    return response
```
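The loop above awaits each handler but does not keep its output. A dispatch helper that collects outputs and tolerates unknown tool names might look like the following sketch. It is independent of the SDK: `ToolCall`, `run_tool_calls`, and the two handlers are hypothetical, and the real `tool_call` objects may carry additional fields.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class ToolCall:
    """Hypothetical shape of a tool call: a tool name plus keyword arguments."""
    name: str
    arguments: Dict[str, Any]


async def search_web(query: str) -> List[str]:
    return [f"result for {query}"]


async def add(a: float, b: float) -> float:
    return a + b


async def run_tool_calls(
    handlers: Dict[str, Callable[..., Awaitable[Any]]],
    tool_calls: List[ToolCall],
) -> List[Dict[str, Any]]:
    """Execute each call in order and collect one output record per call."""
    outputs = []
    for call in tool_calls:
        handler = handlers.get(call.name)
        if handler is None:
            # Record the problem instead of raising, so other calls still run.
            outputs.append({"tool": call.name, "error": "unknown tool"})
            continue
        outputs.append({"tool": call.name, "output": await handler(**call.arguments)})
    return outputs


async def demo():
    handlers = {"search": search_web, "calculator": add}
    calls = [
        ToolCall("search", {"query": "agnt5"}),
        ToolCall("calculator", {"a": 2, "b": 3}),
        ToolCall("weather", {"city": "Oslo"}),
    ]
    return await run_tool_calls(handlers, calls)
```

The collected records could then be passed back to the model in a follow-up generation call, if the agent loop requires the tool outputs.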

## Next Steps

- [Functions](functions) - Using context in functions
- [Entity](entity) - Using context in entities
- [Workflows](workflows) - Using context in workflows
- [Agent](agent) - AI integration with context
