Skip to content
Docs
Build Functions

Functions

Define stateless units of work that AGNT5 can register, retry, time out, and checkpoint inside workflows.

A function is a stateless unit of work: it receives inputs, runs your logic, and returns a result. The platform handles registration, retries, and timeouts. When called via ctx.step() inside a workflow, the result is checkpointed so a restart never runs it twice.

Creating a function

One thing is required: a decorated async (or sync) Python function.

from agnt5 import function, FunctionContext

@function
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    # call your email provider here
    return f"Sent to {to}"
Parameter Type Default Description
name str function.__name__ How the function appears in the platform.
retries int | RetryPolicy None How many times to retry on failure.
backoff str | BackoffPolicy None How to space out retries: "constant", "linear", or "exponential".
timeout_ms int None Cut off the function after this many milliseconds.

Example with retries and timeout:

@function(name="send_email", retries=3, backoff="exponential", timeout_ms=10000)
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    ctx.logger.info("Sending email", to=to)
    # call your email provider here
    return f"Sent to {to}"

Sync functions work too. AGNT5 automatically runs them in a thread pool.

FunctionContext

Name the first parameter ctx and AGNT5 injects a FunctionContext automatically.

@function
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    ctx.logger.info("Sending email", to=to, attempt=ctx.attempt)
    # call your email provider here
    return f"Sent to {to}"
Property / method Description
ctx.run_id Unique ID for this execution (useful for logging and tracing)
ctx.attempt Which retry this is (0 = first try)
ctx.logger Structured logger. Pass extra fields as keyword args: ctx.logger.info("msg", key=value)
ctx.sleep(seconds) Pause execution without blocking the event loop

If you don’t need context, leave ctx out and the function still works fine.


Retries and backoff

Shorthand

Pass an integer for retries and a string for the backoff strategy.

@function(retries=3, backoff="exponential")
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    if ctx.attempt > 0:
        ctx.logger.info("Retrying email", to=to, attempt=ctx.attempt)
    # call your email provider here
    return f"Sent to {to}"

Full control

Use RetryPolicy and BackoffPolicy for precise tuning.

from agnt5 import function, FunctionContext
from agnt5.types import RetryPolicy, BackoffPolicy, BackoffType

@function(
    retries=RetryPolicy(max_attempts=5, initial_interval_ms=500, max_interval_ms=30000),
    backoff=BackoffPolicy(type=BackoffType.EXPONENTIAL, multiplier=2.0),
)
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    ctx.logger.info("Sending email", to=to, attempt=ctx.attempt)
    # call your email provider here
    return f"Sent to {to}"

RetryPolicy parameters:

Parameter Default Description
max_attempts 3 Total tries, including the first
initial_interval_ms 1000 Wait before the first retry
max_interval_ms 60000 Maximum wait between retries

BackoffPolicy parameters:

Parameter Default Description
type EXPONENTIAL CONSTANT (fixed wait), LINEAR (grows steadily), EXPONENTIAL (doubles each time)
multiplier 2.0 How fast the wait grows

AGNT5 runs your function body once per attempt. Use ctx.attempt if you need to vary behaviour on retries.


Calling from a workflow

Inside a workflow, always call functions with ctx.step(). This tells AGNT5 to checkpoint the result. If the workflow restarts, the function is skipped and the saved result is returned directly.

from agnt5 import workflow, WorkflowContext
from myapp.functions import send_email

@workflow
async def notify_workflow(ctx: WorkflowContext, user_email: str) -> str:
    result = await ctx.step(send_email, user_email, "Welcome!", "Thanks for signing up.")
    return result

result is whatever send_email returned. A plain await send_email(ctx, ...) also works but is not checkpointed. The function re-runs on every replay.

How you call it Checkpointed When to use
await ctx.step(send_email, ...) Yes Inside a workflow (always prefer this)
await send_email(ctx, ...) No Outside a workflow, or in local tests
© 2026 AGNT5
llms.txt