Entities
Stateful components with unique keys and single-writer consistency
Entities are stateful components identified by unique keys. Use entities to model AI agents with conversation history, workflow orchestrators, or any business object that maintains state across interactions.
Key Characteristics
- Unique Key - Each instance identified by a unique key (e.g., agent-conv-123)
- Private State - Built-in key-value storage per instance
- Single-Writer - Automatic consistency - only one write operation per key at a time
- Durable - State survives crashes and restarts
- Scalable - Different keys execute in parallel
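The single-writer and scalability properties above can be pictured with one lock per key. The following is a minimal sketch under stated assumptions, not how AGNT5 implements entities: `_locks`, `_state`, and `increment` are hypothetical names, and a plain `asyncio.Lock` stands in for whatever the runtime uses.

```python
import asyncio
from collections import defaultdict

# Hypothetical stand-in for the runtime: one lock and one counter per key.
_locks = defaultdict(asyncio.Lock)
_state = defaultdict(int)


async def increment(key: str) -> int:
    """Read-modify-write on one key, serialized by that key's lock."""
    async with _locks[key]:
        current = _state[key]
        await asyncio.sleep(0)  # yield control, as an LLM or I/O call would
        _state[key] = current + 1
        return _state[key]


async def main() -> dict:
    # 50 concurrent writes to each of two keys.
    await asyncio.gather(
        *(increment("conv-1") for _ in range(50)),
        *(increment("conv-2") for _ in range(50)),
    )
    return dict(_state)


result = asyncio.run(main())
print(result)
```

Writes to the same key queue behind that key's lock, so no increment is lost, while writes to different keys interleave freely. Removing the `async with` line lets tasks read the same stale value across the `sleep(0)` yield and overwrite each other.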
Implementation Status
Entities are being implemented in Phase 2 of AGNT5 (Target: Q1 2025). The API shown represents the planned design. Check current SDK status for availability.
Basic Usage
Creating an Entity
from agnt5 import entity

# Create entity type
agent = entity("ConversationAgent")

# Write method (exclusive access per key)
@agent.write
async def send_message(ctx, message: str) -> dict:
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})
    response = await call_llm(history)
    history.append({"role": "assistant", "content": response})
    ctx.set("history", history)
    return {"response": response}

# Shared method (read-only, concurrent)
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

Calling Entities
Call entity methods from functions:
from agnt5 import function

@function
async def chat(ctx, conv_id: str, msg: str):
    # Call entity method with unique key
    return await ctx.entity("ConversationAgent", conv_id).send_message(msg)

Entity API
Core Methods
| API | Description |
|---|---|
| entity("name") | Create entity type |
| @entity.write | Write method (exclusive per key) |
| @entity.shared | Shared method (read-only, concurrent) |
| ctx.get(key, default) | Get state value |
| ctx.set(key, value) | Set state value |
| ctx.delete(key) | Delete state key |
| ctx.entity(type, key).method() | Call entity from function |
State Operations
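The state calls listed in the table can be pictured with a small in-memory stand-in. This is a sketch only: `InMemoryContext` is a hypothetical class, not part of the agnt5 SDK. It assumes that `ctx.get` is awaitable while `ctx.set` and `ctx.delete` are plain calls, mirroring how the examples on this page use them, and it assumes that deleting a missing key is a no-op.

```python
import asyncio


class InMemoryContext:
    """Hypothetical stand-in for an entity's ctx; not the real SDK object."""

    def __init__(self):
        self._state = {}

    async def get(self, key, default=None):
        # Modeled as a coroutine because the page's examples await ctx.get.
        return self._state.get(key, default)

    def set(self, key, value):
        self._state[key] = value

    def delete(self, key):
        # Assumption: deleting a key that does not exist is a no-op.
        self._state.pop(key, None)


async def demo():
    ctx = InMemoryContext()
    first = await ctx.get("history", [])   # missing key returns the default
    ctx.set("history", ["hello"])
    second = await ctx.get("history", [])  # stored value is returned
    ctx.delete("history")
    third = await ctx.get("history", [])   # back to the default after delete
    return first, second, third


outcome = asyncio.run(demo())
print(outcome)
```

Passing a default to `get` lets the first call on a fresh key work without a separate initialization step, which is the pattern the entity methods on this page rely on.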
Common Patterns
Conversational AI Agent
agent = entity("ChatAgent")

@agent.write
async def send_message(ctx, message: str) -> dict:
    """Handle conversational turns with LLM."""
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})

    # Generate response
    response = await ctx.llm.generate(
        prompt=history,
        model="gpt-4"
    )
    history.append({"role": "assistant", "content": response.text})

    # Keep last 20 messages
    if len(history) > 20:
        history = history[-20:]
    ctx.set("history", history)
    return {"response": response.text}

@agent.shared
async def get_history(ctx) -> list:
    """Get conversation history (read-only)."""
    return await ctx.get("history", [])

@agent.shared
async def get_message_count(ctx) -> int:
    """Get the number of stored messages (at most 20 after trimming)."""
    history = await ctx.get("history", [])
    return len(history)

Usage:
@function
async def chat_endpoint(ctx, conversation_id: str, message: str):
    # Call entity with unique conversation ID
    return await ctx.entity("ChatAgent", conversation_id).send_message(message)

Research Agent
from datetime import datetime

research_agent = entity("ResearchAgent")

@research_agent.write
async def start_research(ctx, topic: str) -> dict:
    """Initialize research task."""
    ctx.set("topic", topic)
    ctx.set("findings", [])
    ctx.set("status", "in_progress")
    return {"status": "started", "topic": topic}

@research_agent.write
async def add_finding(ctx, finding: str, source: str) -> dict:
    """Add research finding."""
    findings = await ctx.get("findings", [])
    findings.append({
        "content": finding,
        "source": source,
        "timestamp": datetime.now().isoformat()
    })
    ctx.set("findings", findings)
    return {"count": len(findings)}

@research_agent.write
async def synthesize(ctx) -> dict:
    """Generate summary from findings."""
    findings = await ctx.get("findings", [])
    topic = await ctx.get("topic")

    # Use LLM to synthesize
    summary = await ctx.llm.generate(
        prompt=f"Synthesize these findings about {topic}: {findings}",
        model="gpt-4"
    )
    ctx.set("summary", summary.text)
    ctx.set("status", "completed")
    return {"summary": summary.text}

@research_agent.shared
async def get_progress(ctx) -> dict:
    """Check research progress."""
    return {
        "status": await ctx.get("status"),
        "topic": await ctx.get("topic"),
        "findings_count": len(await ctx.get("findings", []))
    }

Workflow Orchestrator
workflow = entity("WorkflowOrchestrator")

@workflow.write
async def start(ctx, steps: list) -> dict:
    """Start workflow execution."""
    ctx.set("steps", steps)
    ctx.set("current_step", 0)
    ctx.set("results", [])
    ctx.set("status", "running")
    return {"status": "started", "total_steps": len(steps)}

@workflow.write
async def complete_step(ctx, result: dict) -> dict:
    """Mark step as complete and store result."""
    results = await ctx.get("results", [])
    results.append(result)
    ctx.set("results", results)
    current = len(results)
    ctx.set("current_step", current)

    # Check if workflow is done
    steps = await ctx.get("steps", [])
    if current >= len(steps):
        ctx.set("status", "completed")
    return {"completed": current, "total": len(steps)}

@workflow.shared
async def get_progress(ctx) -> dict:
    """Get workflow progress."""
    return {
        "current_step": await ctx.get("current_step", 0),
        "total_steps": len(await ctx.get("steps", [])),
        "status": await ctx.get("status", "unknown")
    }

Consistency & Concurrency
Single-Writer Per Key
Only one write operation per entity key executes at a time:
# Same key = serial execution (consistency guaranteed)
await ctx.entity("agent", "conv-1").send_message("msg1")  # Runs first
await ctx.entity("agent", "conv-1").send_message("msg2")  # Runs second
# No race conditions, no lost updates

Parallel Execution Across Keys
Different entity keys execute in parallel:
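import asyncio

# Different keys = parallel execution (scales horizontally)
# Start the calls together; awaiting them one at a time would run them in sequence.
# Assumes entity calls return ordinary awaitables, as the awaits elsewhere on this page suggest.
await asyncio.gather(
    ctx.entity("agent", "conv-1").send_message(msg),  # Parallel
    ctx.entity("agent", "conv-2").send_message(msg),  # Parallel
    ctx.entity("agent", "conv-3").send_message(msg),  # Parallel
)

Shared Methods for Reads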
Use @entity.shared for read-only operations that can run concurrently:
import asyncio

# Multiple shared calls can run in parallel for the same key
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

# Issued together, these reads execute concurrently
# (assumes entity calls return ordinary awaitables)
first, second = await asyncio.gather(
    ctx.entity("agent", "conv-1").get_history(),  # Concurrent
    ctx.entity("agent", "conv-1").get_history(),  # Concurrent
)

Best Practices
1. Choose Stable, Meaningful Keys
Use unique, stable identifiers for entity keys, such as a conversation, task, or user ID that stays the same for the lifetime of the state.
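One way to keep keys stable is to build them in a single place from a type prefix and an identifier that already exists in your system. The helper below is a hypothetical sketch: `entity_key` is not part of the agnt5 SDK, and the allowed character set is an assumption rather than a documented rule.

```python
import re

# Assumption: keys are limited to letters, digits, underscores, and hyphens.
_KEY_PART = re.compile(r"[A-Za-z0-9_-]+")


def entity_key(prefix: str, identifier: str) -> str:
    """Build a key such as 'user-42' from a prefix and a stable ID."""
    if not _KEY_PART.fullmatch(prefix) or not _KEY_PART.fullmatch(identifier):
        raise ValueError(
            f"unsupported characters in key parts: {prefix!r}, {identifier!r}"
        )
    return f"{prefix}-{identifier}"


good = entity_key("user", "42")
also_good = entity_key("agent-conv", "123")
print(good, also_good)
```

Keys derived from timestamps, random values, or request IDs change on every call, so each call would reach a fresh, empty entity instead of the existing state.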
2. Design for Concurrency
Choose key granularity for optimal parallelism:
# ✓ Good - One entity per conversation
await ctx.entity("ChatAgent", f"conv-{conv_id}").send_message(msg)

# ✗ Bad - Single global entity (serializes everything)
await ctx.entity("ChatAgent", "global").send_message(msg)

3. Use Shared for Read Operations
Enable concurrent reads with @entity.shared:
# Write methods - exclusive access
@agent.write
async def update_state(ctx, data: dict):
    ctx.set("state", data)

# Read methods - concurrent access
@agent.shared
async def get_state(ctx) -> dict:
    return await ctx.get("state", {})

4. Keep State Minimal
Store only what you need:
# ✓ Good - Essential state only
ctx.set("history", recent_messages[-20:])
ctx.set("summary", summary_text)

# ✗ Avoid - Excessive state
ctx.set("full_transcript", all_messages)  # Could be huge
ctx.set("raw_responses", all_llm_responses)  # Redundant

Entity Use Cases
| Use Case | Entity Key | State Stored |
|---|---|---|
| AI Chat Agent | agent-conv-{id} | Conversation history, context |
| Research Task | research-{task_id} | Findings, sources, summary |
| Workflow Orchestrator | workflow-{run_id} | Step progress, results |
| User Context | user-{user_id} | Preferences, personalization |
| Shopping Cart | cart-{session_id} | Items, totals, discounts |
| Game Session | game-{session_id} | Player state, score, progress |
Functions vs Entities
| Aspect | Functions | Entities |
|---|---|---|
| State | Stateless | Stateful (KV store) |
| Identity | No identity | Unique key per instance |
| Concurrency | Parallel by default | Serial per key, parallel across keys |
| Consistency | No consistency needed | Single-writer guarantee |
| Use Case | Transformations, API calls | Stateful AI agents, workflows |
When to use Functions:
- Stateless operations
- Independent requests
- Data transformations
- API integrations
When to use Entities:
- Stateful AI agents with memory
- Workflow orchestration
- User sessions and context
- Any state that needs consistency
Next Steps
- Context API - Entity state operations and APIs
- Functions - Stateless operations
- Workflows - Multi-step orchestration
- Agent Component - AI agents built on entities