Skip to content
Docs
API Reference Worker Runtime — AGNT5 Python SDK

Worker Runtime

Configure and deploy Python workers for AGNT5

The Worker class is the high-level runtime that integrates with the AGNT5 platform, automatically registers decorated components, and handles execution coordination.

Worker Configuration

Basic Worker

import asyncio
from agnt5 import Worker, function

@function()
def hello(name: str) -> str:
    return f"Hello, {name}!"

async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Configuration Parameters

worker = Worker(
    service_name="my-service",           # Required: Service identifier
    service_version="1.2.0",             # Version string (default: "1.0.0")
    coordinator_endpoint="http://localhost:9091",  # Worker coordinator URL
    runtime="standalone"                  # Runtime mode: "standalone" or "asgi"
)
Parameter Type Description Default
service_name str Service identifier for registration Required
service_version str Version string for this service "1.0.0"
coordinator_endpoint str Worker coordinator URL "http://localhost:9091"
runtime str Runtime adapter: "standalone" or "asgi" "standalone"

Runtime Modes

Standalone Runtime

For background workers, batch processing, and daemon processes:

import asyncio
from agnt5 import Worker, function

@function()
def background_task(data: dict) -> dict:
    # Process data in background
    return {"processed": True, "result": data}

async def main():
    # Standalone worker blocks until stopped
    worker = Worker(
        service_name="background-processor",
        runtime="standalone"
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Characteristics:

  • Blocks until manually stopped (Ctrl+C)
  • Connects to worker coordinator
  • Ideal for background processing
  • Built-in signal handling
  • OpenTelemetry integration

ASGI Runtime

For web applications and HTTP endpoints:

from agnt5 import Worker, function

@function()
def web_handler(request: dict) -> dict:
    return {"message": "Hello from AGNT5!", "data": request}

# Create ASGI application
app = Worker(
    service_name="web-service",
    runtime="asgi"
)

# Enable CORS for browser access
app.enable_cors()

Run with any ASGI server:

# Install ASGI server
pip install uvicorn

# Run the application
uvicorn main:app --reload --port 8000

ASGI Endpoints:

Endpoint Method Description
/health GET Health check endpoint
/functions GET List registered functions
/invoke/{handler} POST Invoke specific function

Environment Configuration

Environment Variables

Configure workers using environment variables:

# Service configuration
export AGNT5_SERVICE_NAME=my-service
export AGNT5_SERVICE_VERSION=2.0.0
export AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com

# Logging configuration
export AGNT5_LOG_LEVEL=INFO
export AGNT5_LOG_FORMAT=json

# Runtime configuration
export AGNT5_RUNTIME=standalone
export AGNT5_DISABLE_TELEMETRY=false

Configuration Priority

Configuration sources in order of precedence:

  1. Constructor parameters - Highest priority
  2. Environment variables - Medium priority
  3. Default values - Lowest priority
# This worker uses constructor values over environment
worker = Worker(
    service_name="explicit-service",  # Overrides AGNT5_SERVICE_NAME
    coordinator_endpoint="http://localhost:9091"
)

Worker Lifecycle

Initialization

async def main():
    worker = Worker("my-service")

    # Worker validates Rust extension availability
    # Creates runtime adapter (standalone or ASGI)
    # Installs OpenTelemetry logging
    # Registers all decorated functions and workflows

    await worker.run()  # Starts the worker loop

Registration Process

  1. Function Discovery: Scans for @function decorated callables
  2. Workflow Discovery: Scans for @workflow decorated factories
  3. Component Registration: Sends metadata to coordinator
  4. Service Announcement: Service becomes available for invocations

Graceful Shutdown

import signal
import asyncio
from agnt5 import Worker

class GracefulWorker:
    def __init__(self):
        self.worker = Worker("graceful-service")
        self.shutdown_requested = False

    async def run(self):
        # Register signal handlers
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

        try:
            await self.worker.run()
        except KeyboardInterrupt:
            print("Shutdown requested via keyboard interrupt")
        finally:
            await self._cleanup()

    def _signal_handler(self, signum, frame):
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.shutdown_requested = True

    async def _cleanup(self):
        print("Cleaning up resources...")
        # Perform cleanup tasks
        # Close database connections
        # Finish in-flight requests
        # Remove OpenTelemetry handlers
        print("Cleanup complete")

async def main():
    graceful_worker = GracefulWorker()
    await graceful_worker.run()

if __name__ == "__main__":
    asyncio.run(main())

ASGI Integration

Basic ASGI App

from agnt5 import Worker, function

@function()
def api_endpoint(data: dict) -> dict:
    return {"status": "success", "received": data}

# Create ASGI app
app = Worker("api-service", runtime="asgi")

CORS Configuration

Enable CORS for browser access:

# Enable CORS with defaults (allows all origins)
app.enable_cors()

# Enable CORS with specific origins
app.enable_cors(origins=["https://myapp.com", "https://localhost:3000"])

# Disable CORS
app.disable_cors()

Custom Middleware

Add ASGI middleware:

from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware

app = Worker("api-service", runtime="asgi")

# Add middleware (if using Starlette/FastAPI patterns)
# Note: This is conceptual - actual middleware integration depends on ASGI runtime implementation

Error Handling

ASGI runtime provides consistent error responses:

# Function that raises an exception
@function()
def failing_function(data: dict) -> dict:
    raise ValueError("Something went wrong")

# ASGI runtime catches and formats the error:
# {
#   "error": "Function failing_function failed: Something went wrong",
#   "status": 500
# }

Worker Methods

Runtime Control

worker = Worker("my-service")

# Check if worker is running
if worker.is_running():
    print("Worker is active")

# For ASGI workers only - get ASGI callable
if worker.runtime == "asgi":
    asgi_app = worker.__call__  # ASGI callable interface

Component Registration

# Manual component registration (usually automatic)
worker._register_components()

# Internal message handling (not part of public API)
# worker._handle_message(request)

Observability

OpenTelemetry Integration

Workers automatically install OpenTelemetry logging:

import logging
from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging, remove_opentelemetry_logging

# Custom logging setup
logger = logging.getLogger("my-service")

# Install telemetry with custom formatter
install_opentelemetry_logging(
    logger=logger,
    level=logging.DEBUG,
    format_string="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

worker = Worker("my-service")

# Telemetry is automatically cleaned up on worker shutdown

Structured Logging

import logging
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def logged_function(ctx, data: dict) -> dict:
    # Structured logging with context
    logger.info(
        "Processing function",
        extra={
            "invocation_id": ctx.invocation_id,
            "service_name": ctx.metadata.get("service_name"),
            "data_size": len(str(data))
        }
    )

    result = {"processed": True}

    logger.info(
        "Function completed",
        extra={
            "invocation_id": ctx.invocation_id,
            "success": True
        }
    )

    return result

Development Patterns

Hot Reload Development

import os
import sys
from agnt5 import Worker, function

# Development configuration
if os.getenv("ENVIRONMENT") == "development":
    import logging
    logging.basicConfig(level=logging.DEBUG)

@function()
def development_handler(data: dict) -> dict:
    return {"env": "development", "data": data}

async def main():
    worker = Worker(
        service_name="dev-service",
        coordinator_endpoint=os.getenv("COORDINATOR_URL", "http://localhost:9091")
    )

    try:
        await worker.run()
    except KeyboardInterrupt:
        print("\nDevelopment worker stopped")
        sys.exit(0)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Testing Workers

import pytest
from unittest.mock import AsyncMock, patch
from agnt5 import Worker, function

@function()
def test_function(data: str) -> str:
    return data.upper()

@pytest.fixture
async def mock_worker():
    with patch('agnt5.worker.PyWorker'):
        worker = Worker("test-service")
        yield worker

@pytest.mark.asyncio
async def test_worker_registration(mock_worker):
    # Test component registration
    mock_worker._register_components()

    # Verify functions are registered
    from agnt5.decorators import get_registered_functions
    functions = get_registered_functions()
    assert "test_function" in functions

@pytest.mark.asyncio
async def test_worker_asgi_mode():
    app = Worker("test-service", runtime="asgi")
    assert callable(app)  # ASGI callable interface

Production Deployment

Container Deployment

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Run worker
CMD ["python", "worker.py"]

Environment Configuration

# Production environment variables
AGNT5_SERVICE_NAME=production-service
AGNT5_SERVICE_VERSION=1.2.0
AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
AGNT5_LOG_LEVEL=INFO
AGNT5_LOG_FORMAT=json

Health Monitoring

import asyncio
import logging
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def health_check() -> dict:
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "production-service",
        "timestamp": time.time()
    }

async def main():
    worker = Worker(
        service_name="production-service",
        service_version=os.getenv("SERVICE_VERSION", "1.0.0")
    )

    logger.info("Starting production worker")

    try:
        await worker.run()
    except Exception as e:
        logger.error(f"Worker failed: {e}")
        raise
    finally:
        logger.info("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Service Design

  1. Service Naming - Use consistent, descriptive service names
  2. Version Management - Use semantic versioning for service versions
  3. Resource Management - Clean up resources in shutdown handlers
  4. Error Handling - Handle exceptions gracefully in workers
  5. Health Checks - Implement health check functions for monitoring

Performance

  1. Connection Pooling - Reuse database and HTTP connections
  2. Async Operations - Use async functions for I/O operations
  3. Resource Limits - Configure appropriate memory and CPU limits
  4. Scaling - Deploy multiple worker instances for high throughput
  5. Monitoring - Track worker performance and error rates

Security

  1. Input Validation - Validate all function inputs
  2. Error Messages - Don’t expose sensitive information in errors
  3. Authentication - Use proper authentication for coordinator connections
  4. Network Security - Use secure connections (HTTPS/TLS) in production
  5. Secrets Management - Use environment variables for sensitive configuration

Next Steps

Core Primitives

Agent Development Kit

  • Agents - Autonomous LLM-driven systems
  • Tools - Extend agent capabilities
  • Sessions - Conversation management
  • Memory - Long-term knowledge storage

Resources

© 2026 AGNT5
llms.txt