Functions
Define stateless units of work that AGNT5 can register, retry, time out, and checkpoint inside workflows.
A function is a stateless unit of work: it receives inputs, runs your logic, and returns a result. The platform handles registration, retries, and timeouts. When called via ctx.step() inside a workflow, the result is checkpointed so a restart never runs it twice.
Creating a function
One thing is required: a decorated async (or sync) Python function.
from agnt5 import function, FunctionContext
@function
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
# call your email provider here
return f"Sent to {to}"| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | function.__name__ | How the function appears in the platform. |
retries | int | RetryPolicy | None | How many times to retry on failure. |
backoff | str | BackoffPolicy | None | How to space out retries: "constant", "linear", or "exponential". |
timeout_ms | int | None | Cut off the function after this many milliseconds. |
Example with retries and timeout:
@function(name="send_email", retries=3, backoff="exponential", timeout_ms=10000)
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
ctx.logger.info("Sending email", to=to)
# call your email provider here
return f"Sent to {to}"Sync functions work too. AGNT5 automatically runs them in a thread pool.
FunctionContext
Name the first parameter ctx and AGNT5 injects a FunctionContext automatically.
@function
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
ctx.logger.info("Sending email", to=to, attempt=ctx.attempt)
# call your email provider here
return f"Sent to {to}"| Property / method | Description |
|---|---|
ctx.run_id | Unique ID for this execution (useful for logging and tracing) |
ctx.attempt | Which retry this is (0 = first try) |
ctx.logger | Structured logger. Pass extra fields as keyword args: ctx.logger.info("msg", key=value) |
ctx.sleep(seconds) | Pause execution without blocking the event loop |
If you don’t need context, leave ctx out and the function still works fine.
Retries and backoff
Shorthand
Pass an integer for retries and a string for the backoff strategy.
@function(retries=3, backoff="exponential")
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
if ctx.attempt > 0:
ctx.logger.info("Retrying email", to=to, attempt=ctx.attempt)
# call your email provider here
return f"Sent to {to}"Full control
Use RetryPolicy and BackoffPolicy for precise tuning.
from agnt5 import function, FunctionContext
from agnt5.types import RetryPolicy, BackoffPolicy, BackoffType
@function(
retries=RetryPolicy(max_attempts=5, initial_interval_ms=500, max_interval_ms=30000),
backoff=BackoffPolicy(type=BackoffType.EXPONENTIAL, multiplier=2.0),
)
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
ctx.logger.info("Sending email", to=to, attempt=ctx.attempt)
# call your email provider here
return f"Sent to {to}"RetryPolicy parameters:
| Parameter | Default | Description |
|---|---|---|
max_attempts | 3 | Total tries, including the first |
initial_interval_ms | 1000 | Wait before the first retry |
max_interval_ms | 60000 | Maximum wait between retries |
BackoffPolicy parameters:
| Parameter | Default | Description |
|---|---|---|
type | EXPONENTIAL | CONSTANT (fixed wait), LINEAR (grows steadily), EXPONENTIAL (doubles each time) |
multiplier | 2.0 | How fast the wait grows |
AGNT5 runs your function body once per attempt. Use ctx.attempt if you need to vary behaviour on retries.
Calling from a workflow
Inside a workflow, always call functions with ctx.step(). This tells AGNT5 to checkpoint the result. If the workflow restarts, the function is skipped and the saved result is returned directly.
from agnt5 import workflow, WorkflowContext
from myapp.functions import send_email
@workflow
async def notify_workflow(ctx: WorkflowContext, user_email: str) -> str:
result = await ctx.step(send_email, user_email, "Welcome!", "Thanks for signing up.")
return resultresult is whatever send_email returned. A plain await send_email(ctx, ...) also works but is not checkpointed. The function re-runs on every replay.
| How you call it | Checkpointed | When to use |
|---|---|---|
await ctx.step(send_email, ...) | Yes | Inside a workflow (always prefer this) |
await send_email(ctx, ...) | No | Outside a workflow, or in local tests |