---
title: Functions
description: Define stateless units of work that AGNT5 can register, retry, time out, and checkpoint inside workflows.
last_verified: 2026-06-02
---

A **function** is a stateless unit of work: it receives inputs, runs your logic, and returns a result. The platform handles registration, retries, and timeouts. When a function is called via `ctx.step()` inside a workflow, its result is checkpointed, so a restarted workflow reuses the saved result instead of running the function again.

## Creating a function

The only requirement is a Python function, async or sync, decorated with `@function`.

```python
from agnt5 import function, FunctionContext

@function
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    # call your email provider here
    return f"Sent to {to}"
```

The decorator accepts these optional keyword arguments:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | `function.__name__` | How the function appears in the platform. |
| `retries` | `int \| RetryPolicy` | `None` | How many times to retry on failure. |
| `backoff` | `str \| BackoffPolicy` | `None` | How to space out retries: `"constant"`, `"linear"`, or `"exponential"`. |
| `timeout_ms` | `int` | `None` | Cut off the function after this many milliseconds. |

**Example with retries and timeout:**

```python
@function(name="send_email", retries=3, backoff="exponential", timeout_ms=10000)
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    ctx.logger.info("Sending email", to=to)
    # call your email provider here
    return f"Sent to {to}"
```
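
To build intuition for `timeout_ms`, the sketch below reproduces a similar cutoff with plain `asyncio`. It is not AGNT5's implementation: `run_with_timeout` and `slow_provider_call` are hypothetical names, and how AGNT5 surfaces a timeout to your code is not covered here.

```python
import asyncio


async def slow_provider_call() -> str:
    # Hypothetical stand-in for an email provider that responds slowly.
    await asyncio.sleep(0.2)
    return "sent"


async def run_with_timeout(coro, timeout_ms: int):
    # asyncio.wait_for cancels the awaited coroutine once the deadline passes.
    try:
        return await asyncio.wait_for(coro, timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        return None  # assumption: a real runtime would record a failed attempt


timed_out = asyncio.run(run_with_timeout(slow_provider_call(), timeout_ms=50))
finished = asyncio.run(run_with_timeout(slow_provider_call(), timeout_ms=1000))
```

Here `timed_out` is `None` because the 50 ms budget expires first, while `finished` holds the provider's return value.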

Sync functions work too. AGNT5 automatically runs them in a thread pool.
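
The thread-pool idea can be pictured with the standard library alone. The sketch below assumes nothing about AGNT5 internals: `render_report` and `heartbeat` are hypothetical, and `asyncio.to_thread` (Python 3.9+) stands in for whatever pool AGNT5 actually uses.

```python
import asyncio
import time


def render_report(user_id: int) -> str:
    # Hypothetical blocking, synchronous work.
    time.sleep(0.1)
    return f"report-{user_id}"


async def main() -> tuple[str, int]:
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop gets to run other work.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    beat = asyncio.create_task(heartbeat())
    # Offload the blocking call to a worker thread; the event loop keeps running.
    result = await asyncio.to_thread(render_report, 7)
    beat.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
```

The heartbeat keeps ticking while `render_report` sleeps, which is the practical benefit of running sync code off the event loop.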


## FunctionContext

Name the first parameter `ctx` and AGNT5 injects a `FunctionContext` automatically.

```python
@function
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    ctx.logger.info("Sending email", to=to, attempt=ctx.attempt)
    # call your email provider here
    return f"Sent to {to}"
```

| Property / method | Description |
|---|---|
| `ctx.run_id` | Unique ID for this execution (useful for logging and tracing) |
| `ctx.attempt` | Zero-based attempt number (0 = first try, 1 = first retry) |
| `ctx.logger` | Structured logger. Pass extra fields as keyword args: `ctx.logger.info("msg", key=value)` |
| `ctx.sleep(seconds)` | Pause execution without blocking the event loop |

If you don't need context, leave `ctx` out and the function still works fine.

---

## Retries and backoff

### Shorthand

Pass an integer for retries and a string for the backoff strategy.

```python
@function(retries=3, backoff="exponential")
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    if ctx.attempt > 0:
        ctx.logger.info("Retrying email", to=to, attempt=ctx.attempt)
    # call your email provider here
    return f"Sent to {to}"
```

### Full control

Use `RetryPolicy` and `BackoffPolicy` for precise tuning.

```python
from agnt5 import function, FunctionContext
from agnt5.types import RetryPolicy, BackoffPolicy, BackoffType

@function(
    retries=RetryPolicy(max_attempts=5, initial_interval_ms=500, max_interval_ms=30000),
    backoff=BackoffPolicy(type=BackoffType.EXPONENTIAL, multiplier=2.0),
)
async def send_email(ctx: FunctionContext, to: str, subject: str, body: str) -> str:
    ctx.logger.info("Sending email", to=to, attempt=ctx.attempt)
    # call your email provider here
    return f"Sent to {to}"
```

`RetryPolicy` parameters:

| Parameter | Default | Description |
|---|---|---|
| `max_attempts` | `3` | Total tries, including the first |
| `initial_interval_ms` | `1000` | Wait before the first retry, in milliseconds |
| `max_interval_ms` | `60000` | Maximum wait between retries, in milliseconds |

`BackoffPolicy` parameters:

| Parameter | Default | Description |
|---|---|---|
| `type` | `EXPONENTIAL` | `CONSTANT` (fixed wait), `LINEAR` (grows steadily), `EXPONENTIAL` (doubles each time with the default `multiplier`) |
| `multiplier` | `2.0` | How fast the wait grows |

AGNT5 runs your function body once per attempt. Use `ctx.attempt` if you need to vary behaviour on retries.
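
As a worked example of how these parameters interact, the sketch below computes the waits between attempts. The formulas are an assumption, not taken from AGNT5: exponential is modelled as `initial_interval_ms * multiplier**n`, linear as `initial_interval_ms * (n + 1)`, and both are capped at `max_interval_ms`. `retry_waits_ms` is a hypothetical helper.

```python
def retry_waits_ms(
    max_attempts: int = 3,
    initial_interval_ms: int = 1000,
    max_interval_ms: int = 60000,
    backoff: str = "exponential",
    multiplier: float = 2.0,
) -> list[int]:
    """Waits (in ms) before each retry, under the assumed formulas."""
    waits = []
    # max_attempts counts the first try, so there are max_attempts - 1 retries.
    for n in range(max_attempts - 1):
        if backoff == "constant":
            wait = initial_interval_ms
        elif backoff == "linear":
            wait = initial_interval_ms * (n + 1)
        elif backoff == "exponential":
            wait = initial_interval_ms * multiplier**n
        else:
            raise ValueError(f"unknown backoff: {backoff!r}")
        waits.append(int(min(wait, max_interval_ms)))
    return waits


# Values from the RetryPolicy example above: 5 attempts, 500 ms initial wait.
print(retry_waits_ms(max_attempts=5, initial_interval_ms=500, max_interval_ms=30000))
# [500, 1000, 2000, 4000]
```

Under these assumptions, the defaults (`max_attempts=3`, `initial_interval_ms=1000`) give two retries, after 1000 ms and 2000 ms.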

---

## Calling from a workflow

Inside a workflow, always call functions with `ctx.step()`. This tells AGNT5 to checkpoint the result. If the workflow restarts, the function is skipped and the saved result is returned directly.

```python
from agnt5 import workflow, WorkflowContext
from myapp.functions import send_email

@workflow
async def notify_workflow(ctx: WorkflowContext, user_email: str) -> str:
    result = await ctx.step(send_email, user_email, "Welcome!", "Thanks for signing up.")
    return result
```

`result` is whatever `send_email` returned. A plain `await send_email(ctx, ...)` also works, but the result is not checkpointed, so the function re-runs on every replay.

| How you call it | Checkpointed | When to use |
|---|---|---|
| `await ctx.step(send_email, ...)` | Yes | Inside a workflow (always prefer this) |
| `await send_email(ctx, ...)` | No | Outside a workflow, or in local tests |
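
To illustrate why the checkpointed call matters, here is a toy model of step replay. It is a sketch only: `ToyWorkflowContext` is hypothetical, keeps results in an in-memory dict keyed by call order, and makes no claim about how AGNT5 persists or identifies steps.

```python
import asyncio


class ToyWorkflowContext:
    """Hypothetical in-memory stand-in for a checkpointing workflow context."""

    def __init__(self, checkpoints: dict[int, object]):
        self.checkpoints = checkpoints  # survives across simulated restarts
        self._next_step = 0

    async def step(self, fn, *args):
        index = self._next_step
        self._next_step += 1
        if index in self.checkpoints:
            return self.checkpoints[index]  # replay: skip the function
        result = await fn(*args)
        self.checkpoints[index] = result
        return result


calls = []


async def send_email(to: str) -> str:
    calls.append(to)  # record each real execution
    return f"Sent to {to}"


async def notify(ctx: ToyWorkflowContext, user_email: str) -> str:
    return await ctx.step(send_email, user_email)


store: dict[int, object] = {}
first = asyncio.run(notify(ToyWorkflowContext(store), "a@example.com"))
# Simulate a restart: new context, same checkpoint store.
second = asyncio.run(notify(ToyWorkflowContext(store), "a@example.com"))
print(first == second, len(calls))  # True 1
```

The second run returns the stored result without sending a second email, which is the behaviour a direct `await send_email(...)` would not give you.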
