Skip to content
Docs
API Reference Basic Worker — AGNT5 Python SDK

Basic Worker

Simple standalone worker with function handlers

A minimal AGNT5 worker demonstrating function registration and execution.

Complete Example

import asyncio
import logging
from agnt5 import Worker, function

# Configure logging: root logger at INFO, plus a module-level logger
# that the handlers and main() use for their messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@function()
def greet(name: str) -> str:
    """Log the request and return a greeting addressed to ``name``."""
    logger.info(f"Greeting user: {name}")
    greeting = f"Hello, {name}!"
    return greeting

@function("math.add")
def add_numbers(a: int, b: int) -> int:
    """Log both operands and return their sum."""
    logger.info(f"Adding {a} + {b}")
    total = a + b
    return total

@function()
def process_data(data: dict) -> dict:
    """Upper-case the string values of ``data`` and report the outcome.

    Returns a dict holding the untouched input under ``"original"``, the
    transformed copy under ``"processed"`` and a ``"status"`` marker.
    """
    logger.info(f"Processing data with {len(data)} keys")

    # Simulate processing: string values are upper-cased, everything else
    # is carried over unchanged.
    transformed = {}
    for field, item in data.items():
        if isinstance(item, str):
            transformed[field] = str(item).upper()
        else:
            transformed[field] = item

    return dict(original=data, processed=transformed, status="completed")

async def main():
    """Main worker entry point.

    Creates the ``basic-worker`` worker and runs it until it stops.
    Interruptions and cancellations are logged; unexpected errors are
    logged and re-raised so the process exits with a failure.
    """
    logger.info("Starting AGNT5 worker...")

    worker = Worker(
        service_name="basic-worker",
        service_version="1.0.0"
    )

    try:
        await worker.run()
    except KeyboardInterrupt:
        logger.info("Worker stopped by user")
    except asyncio.CancelledError:
        # On Python 3.11+, asyncio.run() handles Ctrl+C by cancelling the
        # main task, so the interruption arrives here as CancelledError
        # rather than KeyboardInterrupt. Log it, then re-raise so the
        # cancellation still propagates to the runner.
        logger.info("Worker cancelled, shutting down")
        raise
    except Exception as e:
        logger.error(f"Worker error: {e}")
        raise

# Start the worker only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Running the Worker

Deploy to AGNT5

# Log in with the agnt5 CLI, then deploy the worker
agnt5 auth login
agnt5 deploy

Run the Worker Locally

# Run the worker script (the example above saved as worker.py)
python worker.py

Expected output:

INFO:__main__:Starting AGNT5 worker...
INFO:agnt5.worker:Starting worker for service: basic-worker
INFO:agnt5.worker:Registered function: greet
INFO:agnt5.worker:Registered function: math.add
INFO:agnt5.worker:Registered function: process_data
INFO:agnt5.worker:Worker running, waiting for tasks...

Testing Functions

Using HTTP API

Test functions via the AGNT5 Gateway:

# Test greet function
# inputData is base64 of the JSON string "Alice" (quotes included), to match
# the base64-encoded JSON that inputData expects.
curl -X POST http://localhost:8080/call \
  -H "Content-Type: application/json" \
  -d '{
    "serviceName": "basic-worker",
    "handlerName": "greet",
    "inputData": "IkFsaWNlIg=="
  }'

# Test math function
# inputData is base64 of the JSON object {"a": 5, "b": 3}
curl -X POST http://localhost:8080/call \
  -H "Content-Type: application/json" \
  -d '{
    "serviceName": "basic-worker",
    "handlerName": "math.add",
    "inputData": "eyJhIjogNSwgImIiOiAzfQ=="
  }'
Note:

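Input Data Encoding: The inputData field expects base64-encoded JSON. To encode a payload, run echo -n '{"a": 5, "b": 3}' | base64 (the -n flag keeps echo's trailing newline out of the encoded data, so the output matches the example above).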

Using Python Client

import asyncio
import json
import base64
from agnt5 import Client

def _encode_payload(value):
    """Serialize ``value`` to JSON and return it base64-encoded as text."""
    return base64.b64encode(json.dumps(value).encode()).decode()


async def test_functions():
    """Test all worker functions.

    Calls the ``greet``, ``math.add`` and ``process_data`` handlers of the
    ``basic-worker`` service through the gateway at localhost:8080 and
    prints each result.
    """
    client = Client("http://localhost:8080")

    # (label used in the printed line, handler name, JSON-serializable input)
    cases = [
        ("Greet", "greet", "Alice"),
        ("Math", "math.add", {"a": 10, "b": 5}),
        ("Process", "process_data", {
            "name": "john doe",
            "status": "active",
            "count": 42
        }),
    ]

    for label, handler, payload in cases:
        result = await client.call(
            service_name="basic-worker",
            handler_name=handler,
            input_data=_encode_payload(payload)
        )
        print(f"{label} result: {result}")

# Run the client checks only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(test_functions())

Error Handling

Add error handling to make functions more robust:

import asyncio
import logging
from agnt5 import Worker, function

# Module-level logger used by the handlers in this example.
logger = logging.getLogger(__name__)

@function()
def safe_divide(a: float, b: float) -> dict:
    """Divide ``a`` by ``b`` and report the outcome instead of raising.

    Returns a dict with ``"result"`` and ``"error"`` keys: exactly one of
    them is ``None``. A zero divisor and any unexpected exception are both
    reported through ``"error"``.
    """
    try:
        if b == 0:
            outcome = {
                "error": "Division by zero",
                "result": None
            }
        else:
            quotient = a / b
            logger.info(f"Division successful: {a} / {b} = {quotient}")
            outcome = {
                "result": quotient,
                "error": None
            }
    except Exception as exc:
        logger.error(f"Unexpected error: {exc}")
        outcome = {
            "error": str(exc),
            "result": None
        }
    return outcome

@function()
def validate_email(email: str) -> dict:
    """Validate email address format.

    Args:
        email: Address to check.

    Returns:
        A dict with the original ``email``, a boolean ``valid`` flag and a
        human-readable ``message``. Any exception raised while matching
        (for example a non-string input) is logged and reported as
        ``valid: False`` rather than propagated.
    """
    import re

    try:
        # Basic email validation. fullmatch() must consume the whole string;
        # match() with a "$" anchor would also accept an address followed
        # by a trailing newline.
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        is_valid = re.fullmatch(pattern, email) is not None

        return {
            "email": email,
            "valid": is_valid,
            "message": "Valid email" if is_valid else "Invalid email format"
        }

    except Exception as e:
        logger.error(f"Email validation error: {e}")
        return {
            "email": email,
            "valid": False,
            "message": f"Validation error: {e}"
        }

async def main():
    """Create the "robust-worker" worker and await its run loop."""
    await Worker("robust-worker").run()

# Start the worker only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Configuration

Environment Variables

# Service configuration: name and version the worker example reads via os.getenv
export AGNT5_SERVICE_NAME=basic-worker
export AGNT5_SERVICE_VERSION=1.0.0

# Coordinator endpoint the worker is pointed at
export AGNT5_COORDINATOR_ENDPOINT=http://localhost:9091

# Logging
# NOTE(review): the Python examples on this page do not read AGNT5_LOG_LEVEL
# themselves; presumably the SDK consumes it — confirm.
export AGNT5_LOG_LEVEL=DEBUG

# Start the worker with the variables above in its environment
python worker.py

Configuration File

import os
import asyncio
import logging
from agnt5 import Worker, function
from agnt5.logging import install_opentelemetry_logging

# Configure logging: root logger at INFO, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Install telemetry: pass the module logger to the agnt5 logging helper.
# NOTE(review): presumably this forwards log records through OpenTelemetry —
# confirm against the agnt5.logging documentation.
install_opentelemetry_logging(logger=logger, level=logging.INFO)

@function()
def configured_handler(data: dict) -> dict:
    """Echo ``data`` together with the service name and version.

    The name and version are read from the ``AGNT5_SERVICE_NAME`` and
    ``AGNT5_SERVICE_VERSION`` environment variables on every call, falling
    back to ``"unknown"`` and ``"1.0.0"`` when they are unset.
    """
    service_name = os.getenv("AGNT5_SERVICE_NAME", "unknown")
    service_version = os.getenv("AGNT5_SERVICE_VERSION", "1.0.0")
    return dict(
        service_name=service_name,
        service_version=service_version,
        data=data,
    )

async def main():
    """Build a worker from environment settings and run it.

    Each setting falls back to a default when its environment variable
    is unset.
    """
    # Worker with configuration
    settings = {
        "service_name": os.getenv("AGNT5_SERVICE_NAME", "configured-worker"),
        "service_version": os.getenv("AGNT5_SERVICE_VERSION", "1.0.0"),
        "coordinator_endpoint": os.getenv("AGNT5_COORDINATOR_ENDPOINT", "http://localhost:9091"),
    }
    configured_worker = Worker(**settings)

    await configured_worker.run()

# Start the configured worker only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Production Deployment

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install dependencies before copying the application so this layer is
# reused when only worker.py changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY worker.py .

# Health check
# python:3.11-slim does not include curl, so probe with the Python standard
# library instead; urlopen raises (non-zero exit) on connection errors and
# on HTTP error statuses.
# NOTE(review): assumes the worker serves /health on port 8000 — confirm.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1

# Run worker
CMD ["python", "worker.py"]

Docker Compose

# Two services on one bridge network: the worker built from this directory
# and the coordinator it connects to.
version: '3.8'

services:
  # Worker image built from the Dockerfile in the current directory
  worker:
    build: .
    environment:
      - AGNT5_SERVICE_NAME=basic-worker
      - AGNT5_SERVICE_VERSION=1.0.0
      # Reaches the coordinator by its service name on the shared network
      - AGNT5_COORDINATOR_ENDPOINT=http://coordinator:9091
      - AGNT5_LOG_LEVEL=INFO
    restart: unless-stopped
    # Start the coordinator container before the worker
    depends_on:
      - coordinator
    networks:
      - agnt5

  # Coordinator, published on host port 9091
  coordinator:
    image: agnt5/coordinator:latest
    ports:
      - "9091:9091"
    networks:
      - agnt5

networks:
  agnt5:
    driver: bridge

Next Steps

© 2026 AGNT5
llms.txt