Skip to content
Docs
API Reference Entities — AGNT5 Python SDK

Entities

Stateful components with unique keys and single-writer consistency

Entities are stateful components identified by unique keys. Use entities to model AI agents with conversation history, workflow orchestrators, or any business object that maintains state across interactions.

Key Characteristics

  • Unique Key - Each instance identified by a unique key (e.g., agent-conv-123)
  • Private State - Built-in key-value storage per instance
  • Single-Writer - Automatic consistency - only one write operation per key at a time
  • Durable - State survives crashes and restarts
  • Scalable - Different keys execute in parallel
Note:

Implementation Status

Entities are being implemented in Phase 2 of AGNT5 (Target: Q1 2025). The API shown represents the planned design. Check current SDK status for availability.

Basic Usage

Creating an Entity

from agnt5 import entity

# Create entity type
agent = entity("ConversationAgent")

# Write method (exclusive access per key)
@agent.write
async def send_message(ctx, message: str) -> dict:
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})

    response = await call_llm(history)
    history.append({"role": "assistant", "content": response})

    ctx.set("history", history)
    return {"response": response}

# Shared method (read-only, concurrent)
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

Calling Entities

Call entity methods from functions:

from agnt5 import function

@function
async def chat(ctx, conv_id: str, msg: str):
    # Call entity method with unique key
    return await ctx.entity("ConversationAgent", conv_id).send_message(msg)

Entity API

Core Methods

API Description
entity("name") Create entity type
@entity.write Write method (exclusive per key)
@entity.shared Shared method (read-only, concurrent)
ctx.get(key, default) Get state value
ctx.set(key, value) Set state value
ctx.delete(key) Delete state key
ctx.entity(type, key).method() Call entity from function

State Operations

Common Patterns

Conversational AI Agent

agent = entity("ChatAgent")

@agent.write
async def send_message(ctx, message: str) -> dict:
    """Handle conversational turns with LLM."""
    history = await ctx.get("history", [])
    history.append({"role": "user", "content": message})

    # Generate response
    response = await ctx.llm.generate(
        prompt=history,
        model="gpt-4"
    )
    history.append({"role": "assistant", "content": response.text})

    # Keep last 20 messages
    if len(history) > 20:
        history = history[-20:]

    ctx.set("history", history)
    return {"response": response.text}

@agent.shared
async def get_history(ctx) -> list:
    """Get conversation history (read-only)."""
    return await ctx.get("history", [])

@agent.shared
async def get_message_count(ctx) -> int:
    """Get total message count."""
    history = await ctx.get("history", [])
    return len(history)

Usage:

@function
async def chat_endpoint(ctx, conversation_id: str, message: str):
    # Call entity with unique conversation ID
    return await ctx.entity("ChatAgent", conversation_id).send_message(message)

Research Agent

research_agent = entity("ResearchAgent")

@research_agent.write
async def start_research(ctx, topic: str) -> dict:
    """Initialize research task."""
    ctx.set("topic", topic)
    ctx.set("findings", [])
    ctx.set("status", "in_progress")
    return {"status": "started", "topic": topic}

@research_agent.write
async def add_finding(ctx, finding: str, source: str) -> dict:
    """Add research finding."""
    findings = await ctx.get("findings", [])
    findings.append({
        "content": finding,
        "source": source,
        "timestamp": datetime.now().isoformat()
    })
    ctx.set("findings", findings)
    return {"count": len(findings)}

@research_agent.write
async def synthesize(ctx) -> dict:
    """Generate summary from findings."""
    findings = await ctx.get("findings", [])
    topic = await ctx.get("topic")

    # Use LLM to synthesize
    summary = await ctx.llm.generate(
        prompt=f"Synthesize these findings about {topic}: {findings}",
        model="gpt-4"
    )

    ctx.set("summary", summary.text)
    ctx.set("status", "completed")
    return {"summary": summary.text}

@research_agent.shared
async def get_progress(ctx) -> dict:
    """Check research progress."""
    return {
        "status": await ctx.get("status"),
        "topic": await ctx.get("topic"),
        "findings_count": len(await ctx.get("findings", []))
    }

Workflow Orchestrator

workflow = entity("WorkflowOrchestrator")

@workflow.write
async def start(ctx, steps: list) -> dict:
    """Start workflow execution."""
    ctx.set("steps", steps)
    ctx.set("current_step", 0)
    ctx.set("results", [])
    ctx.set("status", "running")
    return {"status": "started", "total_steps": len(steps)}

@workflow.write
async def complete_step(ctx, result: dict) -> dict:
    """Mark step as complete and store result."""
    results = await ctx.get("results", [])
    results.append(result)
    ctx.set("results", results)

    current = len(results)
    ctx.set("current_step", current)

    # Check if workflow is done
    steps = await ctx.get("steps", [])
    if current >= len(steps):
        ctx.set("status", "completed")

    return {"completed": current, "total": len(steps)}

@workflow.shared
async def get_progress(ctx) -> dict:
    """Get workflow progress."""
    return {
        "current_step": await ctx.get("current_step", 0),
        "total_steps": len(await ctx.get("steps", [])),
        "status": await ctx.get("status", "unknown")
    }

Consistency & Concurrency

Single-Writer Per Key

Only one write operation per entity key executes at a time:

# Same key = serial execution (consistency guaranteed)
await ctx.entity("agent", "conv-1").send_message("msg1")  # Runs first
await ctx.entity("agent", "conv-1").send_message("msg2")  # Runs second

# No race conditions, no lost updates

Parallel Execution Across Keys

Different entity keys execute in parallel:

# Different keys = parallel execution (scales horizontally)
await ctx.entity("agent", "conv-1").send_message(msg)  # Parallel
await ctx.entity("agent", "conv-2").send_message(msg)  # Parallel
await ctx.entity("agent", "conv-3").send_message(msg)  # Parallel

Shared Methods for Reads

Use @entity.shared for read-only operations that can run concurrently:

# Multiple shared calls can run in parallel for same key
@agent.shared
async def get_history(ctx) -> list:
    return await ctx.get("history", [])

# These execute concurrently
await ctx.entity("agent", "conv-1").get_history()  # Concurrent
await ctx.entity("agent", "conv-1").get_history()  # Concurrent

Best Practices

1. Choose Stable, Meaningful Keys

Use unique, stable identifiers for entity keys:

2. Design for Concurrency

Choose key granularity for optimal parallelism:

# ✓ Good - One entity per conversation
await ctx.entity("ChatAgent", f"conv-{conv_id}").send_message(msg)

# ✗ Bad - Single global entity (serializes everything)
await ctx.entity("ChatAgent", "global").send_message(msg)

3. Use Shared for Read Operations

Enable concurrent reads with @entity.shared:

# Write methods - exclusive access
@agent.write
async def update_state(ctx, data: dict):
    ctx.set("state", data)

# Read methods - concurrent access
@agent.shared
async def get_state(ctx) -> dict:
    return await ctx.get("state", {})

4. Keep State Minimal

Store only what you need:

# ✓ Good - Essential state only
ctx.set("history", recent_messages[-20:])
ctx.set("summary", summary_text)

# ✗ Avoid - Excessive state
ctx.set("full_transcript", all_messages)  # Could be huge
ctx.set("raw_responses", all_llm_responses)  # Redundant

Entity Use Cases

Use Case Entity Key State Stored
AI Chat Agent agent-conv-{id} Conversation history, context
Research Task research-{task_id} Findings, sources, summary
Workflow Orchestrator workflow-{run_id} Step progress, results
User Context user-{user_id} Preferences, personalization
Shopping Cart cart-{session_id} Items, totals, discounts
Game Session game-{session_id} Player state, score, progress

Functions vs Entities

Aspect Functions Entities
State Stateless Stateful (KV store)
Identity No identity Unique key per instance
Concurrency Parallel by default Serial per key, parallel across keys
Consistency No consistency needed Single-writer guarantee
Use Case Transformations, API calls Stateful AI agents, workflows

When to use Functions:

  • Stateless operations
  • Independent requests
  • Data transformations
  • API integrations

When to use Entities:

  • Stateful AI agents with memory
  • Workflow orchestration
  • User sessions and context
  • Any state that needs consistency

Next Steps

© 2026 AGNT5
llms.txt