Functions

The building blocks of AI Workflows


Functions are the smallest units of execution in AGNT5 — any piece of code that does work. They can be called directly or as part of a workflow. Unlike ordinary code, every invocation is recorded, so functions can be replayed deterministically and retried safely without repeating side effects.

Example

from agnt5 import function

@function()
async def analyze_research_paper(paper_url: str) -> dict:
    content = await fetch_pdf(paper_url)
    analysis = await call_llm(f"Analyze this research paper:\n\n{content}")
    return {"paper_url": paper_url, "key_findings": analysis}

When you call it:

from agnt5 import Client

client = Client()
client.run("analyze_research_paper", {"paper_url": "https://arxiv.org/pdf/..."})

AGNT5:

  • Logs the invocation (input + function version).
  • Executes the code deterministically.
  • Stores the output for replay or debugging.
  • Emits OpenTelemetry traces automatically.

If you rerun the same call, AGNT5 replays the cached result — the function doesn’t re-execute.
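
For example, running the same call twice returns the stored result on the second invocation instead of executing the function again. A minimal sketch, reusing the client call from above:

from agnt5 import Client

client = Client()
payload = {"paper_url": "https://arxiv.org/pdf/..."}

# First call: the function executes and its output is stored.
first = client.run("analyze_research_paper", payload)

# Second call with identical input: AGNT5 returns the stored result
# without re-executing the function body.
second = client.run("analyze_research_paper", payload)

assert first == second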

Function Context

Add FunctionContext as the first parameter to access execution metadata and utilities:

from agnt5 import function, FunctionContext

@function()
async def extract_insights(ctx: FunctionContext, transcript_url: str) -> dict:
    ctx.log("Extracting insights from transcript", url=transcript_url, attempt=ctx.attempt)

    transcript = await fetch_transcript(transcript_url)
    insights = await call_llm(f"Extract key insights:\n\n{transcript}")

    ctx.log("Completed extraction", insight_count=len(insights))
    return {"source": transcript_url, "insights": insights}

Context provides:

  • ctx.log(message, **extra) — Structured logging with correlation IDs
  • ctx.run_id — Unique execution identifier
  • ctx.attempt — Current retry attempt (0-indexed)
  • ctx.should_retry(error) — Check if error is retryable
  • ctx.sleep(seconds) — Non-durable async sleep
  • ctx.logger — Full logger for .debug(), .warning(), .error()
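
A rough sketch of how these utilities combine inside a single function (fetch_json is a hypothetical helper, and the logger is assumed to accept standard logging-style calls):

from agnt5 import function, FunctionContext

@function()
async def fetch_document(ctx: FunctionContext, url: str) -> dict:
    ctx.logger.debug(f"Fetching {url} (run {ctx.run_id}, attempt {ctx.attempt})")

    try:
        data = await fetch_json(url)  # hypothetical network helper
    except Exception as error:
        # Record whether AGNT5 considers the error retryable, then re-raise
        # so the function's retry policy can take over.
        ctx.log("Fetch failed", retryable=ctx.should_retry(error), attempt=ctx.attempt)
        raise

    await ctx.sleep(0.1)  # non-durable pause, e.g. to ease off a rate limit
    return {"url": url, "data": data}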

Context is optional. If your function doesn’t need logging or metadata, omit it:

@function()
async def add_numbers(a: int, b: int) -> int:
    return a + b  # No context needed

What context does NOT provide:

  • State management (ctx.get(), ctx.set()) — use workflows or entities instead
  • Orchestration (ctx.task(), ctx.parallel()) — use workflows instead
  • Checkpointing (ctx.step()) — use workflows instead

Functions are stateless and atomic. For multi-step orchestration or state management, use workflows.

Retry Policies

Add automatic retry with exponential backoff:

from agnt5 import function

@function(retries=5, backoff="exponential")
async def query_knowledge_base(query: str) -> dict:
    response = await vector_db.search(query, top_k=10)
    return {"query": query, "results": response}

Fine-tune the retry behavior:

from agnt5 import function, RetryPolicy, BackoffPolicy, BackoffType

@function(
    retries=RetryPolicy(
        max_attempts=5,
        initial_interval_ms=1000,
        max_interval_ms=30000
    ),
    backoff=BackoffPolicy(
        type=BackoffType.EXPONENTIAL,
        multiplier=2.0
    )
)
async def generate_embeddings(texts: list[str]) -> list[list[float]]:
    # Retry waits double each attempt: 1s, 2s, 4s, ... capped at 30s
    return await embedding_model.encode(texts)

Available backoff strategies:

  • Exponential (default): 1s → 2s → 4s → 8s (multiplier applies)
  • Linear: 1s → 2s → 3s → 4s (constant increment)
  • Constant: 1s → 1s → 1s → 1s (fixed interval)
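
Assuming the linear and constant strategies are selected through the same BackoffPolicy and BackoffType types shown in the exponential example above (an assumption worth checking against the API reference), the configuration might look like this:

from agnt5 import function, BackoffPolicy, BackoffType

# Assumed: BackoffType.LINEAR and BackoffType.CONSTANT exist alongside EXPONENTIAL.
@function(retries=4, backoff=BackoffPolicy(type=BackoffType.LINEAR))
async def fetch_user_profile(user_id: str) -> dict:
    # Waits grow by a constant increment between attempts: 1s, 2s, 3s, 4s.
    return await profile_service.get(user_id)  # hypothetical service client

@function(retries=4, backoff=BackoffPolicy(type=BackoffType.CONSTANT))
async def send_notification(message: str) -> None:
    # Waits a fixed 1s between every attempt.
    await notification_service.send(message)  # hypothetical service client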

Input Validation with Pydantic

Use Pydantic models for automatic input/output validation:

from pydantic import BaseModel
from agnt5 import function, FunctionContext

class ResearchQuery(BaseModel):
    topic: str
    max_results: int
    date_range: str

class ResearchResults(BaseModel):
    papers: list[dict]
    summary: str
    total_found: int

@function()
async def research_topic(ctx: FunctionContext, query: ResearchQuery) -> ResearchResults:
    ctx.log("Researching topic", topic=query.topic)

    papers = await search_arxiv(query.topic, limit=query.max_results)
    summary = await call_llm(f"Summarize these research findings: {papers}")

    return ResearchResults(
        papers=papers,
        summary=summary,
        total_found=len(papers)
    )

AGNT5 extracts JSON schemas from Pydantic models automatically and validates inputs before execution.
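
In practice that means callers can pass a plain dict and AGNT5 checks it against the ResearchQuery schema before the function runs. A sketch, assuming the payload maps straight onto the model's fields (it may instead need to be nested under the query parameter name, depending on how AGNT5 binds arguments):

from agnt5 import Client

client = Client()

# This dict is validated against ResearchQuery before execution; a payload
# with the wrong types (e.g. max_results="five") would be rejected before
# research_topic ever runs.
result = client.run("research_topic", {
    "topic": "retrieval-augmented generation",
    "max_results": 5,
    "date_range": "2023-2024",
})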

Calling Functions

From Client SDK

from agnt5 import Client

client = Client()
result = client.run("analyze_research_paper", {"paper_url": "https://arxiv.org/pdf/..."})

Local Testing

Call functions directly in your code:

from agnt5 import FunctionContext

ctx = FunctionContext(run_id="test-123")
result = await extract_insights(ctx, "https://example.com/transcript.txt")

For functions without context:

result = await add_numbers(2, 3)  # call with arguments directly
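
Because decorated functions remain plain async callables, they also slot into an ordinary test runner. A minimal pytest sketch, assuming pytest-asyncio is installed and that extract_insights and its helpers live in a module of yours called my_module here:

import pytest
from agnt5 import FunctionContext
from my_module import extract_insights  # hypothetical module containing the function

async def fake_fetch_transcript(url: str) -> str:
    return "Speaker A: durable execution keeps retries safe."

async def fake_call_llm(prompt: str) -> list[str]:
    return ["Durable execution keeps retries safe."]

@pytest.mark.asyncio
async def test_extract_insights(monkeypatch):
    # Swap the network-bound helpers for local fakes.
    monkeypatch.setattr("my_module.fetch_transcript", fake_fetch_transcript)
    monkeypatch.setattr("my_module.call_llm", fake_call_llm)

    ctx = FunctionContext(run_id="test-456")
    result = await extract_insights(ctx, "https://example.com/transcript.txt")

    assert result["source"] == "https://example.com/transcript.txt"
    assert result["insights"] == ["Durable execution keeps retries safe."]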

Observability

Every function execution is automatically instrumented with OpenTelemetry. Traces connect from the API request through workflow logic to every function call. When you use ctx.log(), correlation IDs are included automatically:

{
  "message": "Extracting insights from transcript",
  "run_id": "abc123",
  "trace_id": "def456",
  "span_id": "ghi789",
  "url": "https://example.com/transcript.txt",
  "attempt": 0
}

This makes debugging straightforward: search logs by run_id, find the trace, and replay the exact invocation locally to reproduce the issue.

Functions as Foundation

Functions are the atomic unit of execution in AGNT5. They’re the building blocks for everything else: entities, workflows, and agents.

What makes this work is how AGNT5 treats functions: as versioned computations where results persist and can be replayed. When you call a function, the input and function version create a unique signature. If you call it again with identical input, AGNT5 returns the stored result immediately — no re-execution.
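
Conceptually (this is only an illustration, not AGNT5's actual internals), you can picture a result store keyed by the function name, its version, and a hash of the canonicalized input:

import hashlib
import json

def invocation_signature(name: str, version: str, payload: dict) -> str:
    # Canonicalize the input so identical payloads always hash identically.
    canonical = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(f"{name}:{version}:{canonical}".encode()).hexdigest()

results: dict[str, object] = {}  # stand-in for AGNT5's durable result store

async def run_once(name: str, version: str, payload: dict, fn) -> object:
    key = invocation_signature(name, version, payload)
    if key in results:
        # Identical input and version: return the stored result, no re-execution.
        return results[key]
    results[key] = await fn(payload)  # first call: execute and persist the output
    return results[key]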

This enables three guarantees:

  • Exactly-once semantics — Every invocation is processed once, even during failures or retries
  • Deterministic execution — Same input always produces the same output, enabling safe replay
  • Replay capability — Re-run historical invocations to test code changes or debug production issues

Functions don’t manage state — they compute outputs from inputs. This separation keeps functions simple while enabling the durability that more complex patterns need.

When an LLM call times out, when a process crashes, when a network partition happens — completed functions don't lose their work. Their stored results let execution pick up where it left off instead of repeating what already succeeded. This is what makes AI workflows reliable: every piece of logic you write — every function — is a durable unit of computation. That's the foundation everything else builds on.