---
last_verified: 2026-05-13
title: Entities
description: Stateful components with unique keys and single-writer consistency
category: Python SDK
---

Entities are stateful components identified by unique keys. Use entities to model AI agents with conversation history, workflow orchestrators, or any business object that maintains state across interactions.

## Key Characteristics

- **Unique Key** - Each instance is identified by a unique key (e.g., `agent-conv-123`)
- **Private State** - Built-in key-value storage per instance
- **Single-Writer** - Only one write operation runs per key at a time, so consistency is automatic
- **Durable** - State survives crashes and restarts
- **Scalable** - Different keys execute in parallel

<Callout type="info">
**Implementation Status**

Entities are being implemented in Phase 2 of AGNT5 (Target: Q1 2025). The API shown represents the planned design. Check current SDK status for availability.
</Callout>

## Basic Usage

### Creating an Entity

```python
from agnt5 import entity

# Create entity type
agent = entity("ConversationAgent")

# Write method (exclusive access per key)
@agent.write
async def send_message(ctx, message: str) -> dict:
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})

    # call_llm is a placeholder for your own LLM client call
    response = await call_llm(history)
    history.append({"role": "assistant", "content": response})

    ctx.set("history", history)
    return {"response": response}

# Shared method (read-only, concurrent)
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])
```

### Calling Entities

Call entity methods from functions:

```python
from agnt5 import function

@function
async def chat(ctx, conv_id: str, msg: str):
    # Call entity method with unique key
    return await ctx.entity("ConversationAgent", conv_id).send_message(msg)
```

## Entity API

### Core Methods

| API | Description |
| --- | --- |
| `entity("name")` | Create entity type |
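| `@entity.write` | Write method (exclusive per key), applied on the entity object, e.g. `@agent.write` |
| `@entity.shared` | Shared method (read-only, concurrent), applied on the entity object, e.g. `@agent.shared` |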
| `ctx.get(key, default)` | Get state value |
| `ctx.set(key, value)` | Set state value |
| `ctx.delete(key)` | Delete state key |
| `ctx.entity(type, key).method()` | Call entity from function |

### State Operations

<Tabs defaultValue="get">
  <TabsList>
    <TabsTrigger value="get">Get</TabsTrigger>
    <TabsTrigger value="set">Set</TabsTrigger>
    <TabsTrigger value="delete">Delete</TabsTrigger>
  </TabsList>

  <TabsContent value="get">
    ```python
    @agent.write
    async def process(ctx, data: dict) -> dict:
        # Get with default
        history = await ctx.get("history", [])
        count = await ctx.get("count", 0)

        return {"history": history, "count": count}
    ```
  </TabsContent>

  <TabsContent value="set">
    ```python
    from datetime import datetime

    @agent.write
    async def update_state(ctx, new_data: dict) -> dict:
        # Set values
        ctx.set("last_update", datetime.now().isoformat())
        ctx.set("data", new_data)
        ctx.set("version", 2)

        return {"status": "updated"}
    ```
  </TabsContent>

  <TabsContent value="delete">
    ```python
    @agent.write
    async def clear_cache(ctx) -> dict:
        # Delete keys
        ctx.delete("cached_results")
        ctx.delete("temporary_data")

        return {"status": "cleared"}
    ```
  </TabsContent>
</Tabs>
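
For unit tests, the three state operations can be mimicked with a small in-memory object. The sketch below is a hypothetical test double, not part of the `agnt5` package: `FakeEntityContext` and `demo` are names invented here, and it assumes only the call shapes shown on this page (awaited `get`, plain `set` and `delete`). Treating a delete of a missing key as a no-op is also an assumption.

```python
import asyncio
from typing import Any


class FakeEntityContext:
    """Hypothetical in-memory stand-in for an entity ctx (not an agnt5 class)."""

    def __init__(self) -> None:
        self._state: dict[str, Any] = {}

    async def get(self, key: str, default: Any = None) -> Any:
        # Mirrors the awaited `ctx.get(key, default)` form used on this page.
        return self._state.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._state[key] = value

    def delete(self, key: str) -> None:
        # Assumption: deleting a key that was never set is a no-op.
        self._state.pop(key, None)


async def demo() -> dict:
    ctx = FakeEntityContext()
    ctx.set("count", 1)
    ctx.set("cached_results", [1, 2, 3])

    count = await ctx.get("count", 0)
    ctx.set("count", count + 1)

    ctx.delete("cached_results")
    ctx.delete("never_set")

    return {
        "count": await ctx.get("count", 0),
        "cached_results": await ctx.get("cached_results"),
        "history": await ctx.get("history", []),
    }
```

Calling `asyncio.run(demo())` exercises all three operations without a running AGNT5 runtime.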

## Common Patterns

### Conversational AI Agent

```python
agent = entity("ChatAgent")

@agent.write
async def send_message(ctx, message: str) -> dict:
    """Handle conversational turns with LLM."""
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})

    # Generate response
    response = await ctx.llm.generate(
        prompt=history,
        model="gpt-4"
    )
    history.append({"role": "assistant", "content": response.text})

    # Keep last 20 messages
    if len(history) > 20:
        history = history[-20:]

    ctx.set("history", history)
    return {"response": response.text}

@agent.shared
async def get_history(ctx) -> list:
    """Get conversation history (read-only)."""
    return await ctx.get("history", [])

@agent.shared
async def get_message_count(ctx) -> int:
    """Get total message count."""
    history = await ctx.get("history", [])
    return len(history)
```

Usage:

```python
@function
async def chat_endpoint(ctx, conversation_id: str, message: str):
    # Call entity with unique conversation ID
    return await ctx.entity("ChatAgent", conversation_id).send_message(message)
```

### Research Agent

```python
from datetime import datetime

research_agent = entity("ResearchAgent")

@research_agent.write
async def start_research(ctx, topic: str) -> dict:
    """Initialize research task."""
    ctx.set("topic", topic)
    ctx.set("findings", [])
    ctx.set("status", "in_progress")
    return {"status": "started", "topic": topic}

@research_agent.write
async def add_finding(ctx, finding: str, source: str) -> dict:
    """Add research finding."""
    findings = await ctx.get("findings", [])
    findings.append({
        "content": finding,
        "source": source,
        "timestamp": datetime.now().isoformat()
    })
    ctx.set("findings", findings)
    return {"count": len(findings)}

@research_agent.write
async def synthesize(ctx) -> dict:
    """Generate summary from findings."""
    findings = await ctx.get("findings", [])
    topic = await ctx.get("topic")

    # Use LLM to synthesize
    summary = await ctx.llm.generate(
        prompt=f"Synthesize these findings about {topic}: {findings}",
        model="gpt-4"
    )

    ctx.set("summary", summary.text)
    ctx.set("status", "completed")
    return {"summary": summary.text}

@research_agent.shared
async def get_progress(ctx) -> dict:
    """Check research progress."""
    return {
        "status": await ctx.get("status"),
        "topic": await ctx.get("topic"),
        "findings_count": len(await ctx.get("findings", []))
    }
```

### Workflow Orchestrator

```python
workflow = entity("WorkflowOrchestrator")

@workflow.write
async def start(ctx, steps: list) -> dict:
    """Start workflow execution."""
    ctx.set("steps", steps)
    ctx.set("current_step", 0)
    ctx.set("results", [])
    ctx.set("status", "running")
    return {"status": "started", "total_steps": len(steps)}

@workflow.write
async def complete_step(ctx, result: dict) -> dict:
    """Mark step as complete and store result."""
    results = await ctx.get("results", [])
    results.append(result)
    ctx.set("results", results)

    current = len(results)
    ctx.set("current_step", current)

    # Check if workflow is done
    steps = await ctx.get("steps", [])
    if current >= len(steps):
        ctx.set("status", "completed")

    return {"completed": current, "total": len(steps)}

@workflow.shared
async def get_progress(ctx) -> dict:
    """Get workflow progress."""
    return {
        "current_step": await ctx.get("current_step", 0),
        "total_steps": len(await ctx.get("steps", [])),
        "status": await ctx.get("status", "unknown")
    }
```

## Consistency & Concurrency

### Single-Writer Per Key

Only one write operation per entity key executes at a time:

```python
# Same key = serial execution (consistency guaranteed)
await ctx.entity("agent", "conv-1").send_message("msg1")  # Runs first
await ctx.entity("agent", "conv-1").send_message("msg2")  # Runs second

# No race conditions, no lost updates
```
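
To see why serializing writes matters, the following sketch simulates concurrent read-modify-write updates to one key in plain `asyncio`. It is an illustration only, not how AGNT5 implements the guarantee: `run_writes` is a hypothetical helper, and an `asyncio.Lock` stands in for the per-key single-writer behavior.

```python
import asyncio


async def run_writes(guarded: bool, writes: int = 10) -> list[str]:
    """Apply `writes` concurrent read-modify-write updates to one key."""
    state: dict[str, list[str]] = {"history": []}
    lock = asyncio.Lock()  # stands in for the per-key single-writer guarantee

    async def append(message: str) -> None:
        history = list(state["history"])  # read
        await asyncio.sleep(0)            # suspension point, e.g. an LLM call
        history.append(message)           # modify
        state["history"] = history        # write

    async def guarded_append(message: str) -> None:
        # Only one update at a time may run its read-modify-write cycle.
        async with lock:
            await append(message)

    writer = guarded_append if guarded else append
    await asyncio.gather(*(writer(f"msg{i}") for i in range(writes)))
    return state["history"]
```

With `guarded=True` every message ends up in the history. With `guarded=False` the updates interleave at the suspension point and overwrite each other, which is the lost-update problem that exclusive write methods are meant to rule out.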

### Parallel Execution Across Keys

Different entity keys execute in parallel:

```python
import asyncio

# Different keys = parallel execution; start the calls together with gather
await asyncio.gather(
    ctx.entity("agent", "conv-1").send_message(msg),  # Parallel
    ctx.entity("agent", "conv-2").send_message(msg),  # Parallel
    ctx.entity("agent", "conv-3").send_message(msg),  # Parallel
)
```
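
Parallelism across keys only helps if the caller starts the calls together. In `asyncio`, awaiting calls one after another finishes each call before the next begins. The sketch below measures how many calls are in flight at once for both styles; `measure_peak` and `fake_entity_call` are hypothetical stand-ins, and the short sleep takes the place of a real entity call.

```python
import asyncio


async def measure_peak(concurrent: bool, calls: int = 3) -> int:
    """Return the highest number of calls that were in flight at once."""
    active = 0
    peak = 0

    async def fake_entity_call() -> None:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stands in for the remote entity call
        active -= 1

    if concurrent:
        # All calls are started before any of them is awaited to completion.
        await asyncio.gather(*(fake_entity_call() for _ in range(calls)))
    else:
        # Each call completes before the next one starts.
        for _ in range(calls):
            await fake_entity_call()
    return peak
```

With `concurrent=True` all three calls overlap. With `concurrent=False` only one call is ever active, regardless of how many distinct keys are involved.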

### Shared Methods for Reads

Use `@entity.shared` for read-only operations that can run concurrently:

```python
import asyncio

# Multiple shared calls can run in parallel for the same key
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

# Started together, these reads can execute concurrently
first, second = await asyncio.gather(
    ctx.entity("agent", "conv-1").get_history(),  # Concurrent
    ctx.entity("agent", "conv-1").get_history(),  # Concurrent
)
```

## Best Practices

### 1. Choose Stable, Meaningful Keys

Use unique, stable identifiers for entity keys:

<Tabs defaultValue="good">
  <TabsList>
    <TabsTrigger value="good">✓ Good Keys</TabsTrigger>
    <TabsTrigger value="bad">✗ Avoid</TabsTrigger>
  </TabsList>

  <TabsContent value="good">
    ```python
    # Descriptive and stable
    "agent-conv-{conversation_id}"
    "workflow-{run_id}"
    "user-{user_id}"
    "research-{task_id}"
    ```
  </TabsContent>

  <TabsContent value="bad">
    ```python
    # Not descriptive
    "abc123"

    # Changes every time
    "user-{timestamp}"

    # Too generic
    "agent-1"
    ```
  </TabsContent>
</Tabs>
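
One way to keep keys consistent is to build them in a single helper rather than formatting strings at each call site. The function below is a minimal sketch: `entity_key` is a hypothetical name, and the allowed character set is an assumption made for illustration, not a documented AGNT5 rule.

```python
import re

# Assumption: key parts are limited to letters, digits, '_', '.', and '-'.
_SAFE_PART = re.compile(r"[A-Za-z0-9_.-]+")


def entity_key(prefix: str, identifier: str) -> str:
    """Build a key such as 'agent-conv-123' from a prefix and a stable ID."""
    for part in (prefix, identifier):
        if not _SAFE_PART.fullmatch(part):
            raise ValueError(f"invalid key part: {part!r}")
    return f"{prefix}-{identifier}"
```

For example, `entity_key("agent-conv", "123")` yields `"agent-conv-123"`, and an empty identifier raises `ValueError` instead of silently producing a key like `"user-"`.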

### 2. Design for Concurrency

Choose key granularity so that unrelated work lands on different keys and can run in parallel:

```python
# ✓ Good - One entity per conversation
await ctx.entity("ChatAgent", f"conv-{conv_id}").send_message(msg)

# ✗ Bad - Single global entity (serializes everything)
await ctx.entity("ChatAgent", "global").send_message(msg)
```

### 3. Use Shared for Read Operations

Enable concurrent reads with `@entity.shared`:

```python
# Write methods - exclusive access
@agent.write
async def update_state(ctx, data: dict):
    ctx.set("state", data)

# Read methods - concurrent access
@agent.shared
async def get_state(ctx) -> dict:
    return await ctx.get("state", {})
```

### 4. Keep State Minimal

Store only what you need:

```python
# ✓ Good - Essential state only
ctx.set("history", recent_messages[-20:])
ctx.set("summary", summary_text)

# ✗ Avoid - Excessive state
ctx.set("full_transcript", all_messages)  # Could be huge
ctx.set("raw_responses", all_llm_responses)  # Redundant
```
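
The trimming step can live in a small pure function so that the cap is applied the same way everywhere. This is a sketch with a hypothetical helper name, `trim_history`; the default of 20 messages mirrors the examples on this page.

```python
def trim_history(history: list[dict], max_messages: int = 20) -> list[dict]:
    """Return a new list holding at most the last `max_messages` entries."""
    if max_messages < 1:
        raise ValueError("max_messages must be at least 1")
    # Slicing copies, so the caller's list is left unchanged.
    return history[-max_messages:]
```

Inside a write method this could be used as `ctx.set("history", trim_history(history))`.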

## Entity Use Cases

| Use Case | Entity Key | State Stored |
| --- | --- | --- |
| AI Chat Agent | `agent-conv-{id}` | Conversation history, context |
| Research Task | `research-{task_id}` | Findings, sources, summary |
| Workflow Orchestrator | `workflow-{run_id}` | Step progress, results |
| User Context | `user-{user_id}` | Preferences, personalization |
| Shopping Cart | `cart-{session_id}` | Items, totals, discounts |
| Game Session | `game-{session_id}` | Player state, score, progress |

## Functions vs Entities

| Aspect | Functions | Entities |
| --- | --- | --- |
| State | Stateless | Stateful (KV store) |
| Identity | No identity | Unique key per instance |
| Concurrency | Parallel by default | Serial per key, parallel across keys |
| Consistency | No consistency needed | Single-writer guarantee |
| Use Case | Transformations, API calls | Stateful AI agents, workflows |

**When to use Functions:**
- Stateless operations
- Independent requests
- Data transformations
- API integrations

**When to use Entities:**
- Stateful AI agents with memory
- Workflow orchestration
- User sessions and context
- Any state that needs consistency

## Next Steps

- [Context API](context) - Entity state operations and APIs
- [Functions](functions) - Stateless operations
- [Workflows](workflows) - Multi-step orchestration
- [Agent Component](agent) - AI agents built on entities
