> For the complete documentation index, see [llms.txt](/llms.txt).
> A full single-fetch corpus is available at [llms-full.txt](/llms-full.txt).
---
last_verified: 2026-05-13
title: Worker Runtime
description: Configure and deploy Python workers for AGNT5
category: Python SDK
---

The `Worker` class is the high-level runtime that integrates with the AGNT5 platform, automatically registers decorated components, and handles execution coordination.

## Worker Configuration

### Basic Worker

```python
import asyncio
from agnt5 import Worker, function

@function()
def hello(name: str) -> str:
    return f"Hello, {name}!"

async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```

### Configuration Parameters

```python
worker = Worker(
    service_name="my-service",           # Required: Service identifier
    service_version="1.2.0",             # Version string (default: "1.0.0")
    coordinator_endpoint="http://localhost:9091",  # Worker coordinator URL
    runtime="standalone"                  # Runtime mode: "standalone" or "asgi"
)
```

| Parameter | Type | Description | Default |
|-----------|------|-------------|---------|
| `service_name` | `str` | Service identifier for registration | **Required** |
| `service_version` | `str` | Version string for this service | `"1.0.0"` |
| `coordinator_endpoint` | `str` | Worker coordinator URL | `"http://localhost:9091"` |
| `runtime` | `str` | Runtime adapter: `"standalone"` or `"asgi"` | `"standalone"` |

## Runtime Modes

### Standalone Runtime

For background workers, batch processing, and daemon processes:

```python
import asyncio
from agnt5 import Worker, function

@function()
def background_task(data: dict) -> dict:
    # Process data in background
    return {"processed": True, "result": data}

async def main():
    # Standalone worker blocks until stopped
    worker = Worker(
        service_name="background-processor",
        runtime="standalone"
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```

**Characteristics:**
- Blocks until manually stopped (Ctrl+C)
- Connects to worker coordinator
- Ideal for background processing
- Built-in signal handling
- OpenTelemetry integration

### ASGI Runtime

For web applications and HTTP endpoints:

```python
from agnt5 import Worker, function

@function()
def web_handler(request: dict) -> dict:
    return {"message": "Hello from AGNT5!", "data": request}

# Create ASGI application
app = Worker(
    service_name="web-service",
    runtime="asgi"
)

# Enable CORS for browser access
app.enable_cors()
```

Run with any ASGI server:

```bash
# Install ASGI server
pip install uvicorn

# Run the application
uvicorn main:app --reload --port 8000
```

**ASGI Endpoints:**

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/health` | GET | Health check endpoint |
| `/functions` | GET | List registered functions |
| `/invoke/{handler}` | POST | Invoke specific function |

## Environment Configuration

### Environment Variables

Configure workers using environment variables:

```bash
# Service configuration
export AGNT5_SERVICE_NAME=my-service
export AGNT5_SERVICE_VERSION=2.0.0
export AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com

# Logging configuration
export AGNT5_LOG_LEVEL=INFO
export AGNT5_LOG_FORMAT=json

# Runtime configuration
export AGNT5_RUNTIME=standalone
export AGNT5_DISABLE_TELEMETRY=false
```

### Configuration Priority

Configuration sources in order of precedence:

1. **Constructor parameters** - Highest priority
2. **Environment variables** - Medium priority
3. **Default values** - Lowest priority

```python
# This worker uses constructor values over environment
worker = Worker(
    service_name="explicit-service",  # Overrides AGNT5_SERVICE_NAME
    coordinator_endpoint="http://localhost:9091"
)
```

## Worker Lifecycle

### Initialization

```python
async def main():
    worker = Worker("my-service")

    # Worker validates Rust extension availability
    # Creates runtime adapter (standalone or ASGI)
    # Installs OpenTelemetry logging
    # Registers all decorated functions and workflows

    await worker.run()  # Starts the worker loop
```

### Registration Process

1. **Function Discovery**: Scans for `@function` decorated callables
2. **Workflow Discovery**: Scans for `@workflow` decorated factories
3. **Component Registration**: Sends metadata to coordinator
4. **Service Announcement**: Service becomes available for invocations

### Graceful Shutdown

```python
import signal
import asyncio
from agnt5 import Worker

class GracefulWorker:
    def __init__(self):
        self.worker = Worker("graceful-service")
        self.shutdown_requested = False

    async def run(self):
        # Register signal handlers
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

        try:
            await self.worker.run()
        except KeyboardInterrupt:
            print("Shutdown requested via keyboard interrupt")
        finally:
            await self._cleanup()

    def _signal_handler(self, signum, frame):
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.shutdown_requested = True

    async def _cleanup(self):
        print("Cleaning up resources...")
        # Perform cleanup tasks
        # Close database connections
        # Finish in-flight requests
        # Remove OpenTelemetry handlers
        print("Cleanup complete")

async def main():
    graceful_worker = GracefulWorker()
    await graceful_worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```

## ASGI Integration

### Basic ASGI App

```python
from agnt5 import Worker, function

@function()
def api_endpoint(data: dict) -> dict:
    return {"status": "success", "received": data}

# Create ASGI app
app = Worker("api-service", runtime="asgi")
```

### CORS Configuration

Enable CORS for browser access:

```python
# Enable CORS with defaults (allows all origins)
app.enable_cors()

# Enable CORS with specific origins
app.enable_cors(origins=["https://myapp.com", "https://localhost:3000"])

# Disable CORS
app.disable_cors()
```

### Custom Middleware

Add ASGI middleware:

```python
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware

app = Worker("api-service", runtime="asgi")

# Add middleware (if using Starlette/FastAPI patterns)
# Note: This is conceptual - actual middleware integration depends on ASGI runtime implementation
```

### Error Handling

ASGI runtime provides consistent error responses:

```python
# Function that raises an exception
@function()
def failing_function(data: dict) -> dict:
    raise ValueError("Something went wrong")

# ASGI runtime catches and formats the error:
# {
#   "error": "Function failing_function failed: Something went wrong",
#   "status": 500
# }
```

## Worker Methods

### Runtime Control

```python
worker = Worker("my-service")

# Check if worker is running
if worker.is_running():
    print("Worker is active")

# For ASGI workers only - get ASGI callable
if worker.runtime == "asgi":
    asgi_app = worker.__call__  # ASGI callable interface
```

### Component Registration

```python
# Manual component registration (usually automatic)
worker._register_components()

# Internal message handling (not part of public API)
# worker._handle_message(request)
```

## Observability

### OpenTelemetry Integration

Workers automatically install OpenTelemetry logging:

```python
import logging
from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging, remove_opentelemetry_logging

# Custom logging setup
logger = logging.getLogger("my-service")

# Install telemetry with custom formatter
install_opentelemetry_logging(
    logger=logger,
    level=logging.DEBUG,
    format_string="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

worker = Worker("my-service")

# Telemetry is automatically cleaned up on worker shutdown
```

### Structured Logging

```python
import logging
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def logged_function(ctx, data: dict) -> dict:
    # Structured logging with context
    logger.info(
        "Processing function",
        extra={
            "invocation_id": ctx.invocation_id,
            "service_name": ctx.metadata.get("service_name"),
            "data_size": len(str(data))
        }
    )

    result = {"processed": True}

    logger.info(
        "Function completed",
        extra={
            "invocation_id": ctx.invocation_id,
            "success": True
        }
    )

    return result
```

## Development Patterns

### Hot Reload Development

```python
import os
import sys
from agnt5 import Worker, function

# Development configuration
if os.getenv("ENVIRONMENT") == "development":
    import logging
    logging.basicConfig(level=logging.DEBUG)

@function()
def development_handler(data: dict) -> dict:
    return {"env": "development", "data": data}

async def main():
    worker = Worker(
        service_name="dev-service",
        coordinator_endpoint=os.getenv("COORDINATOR_URL", "http://localhost:9091")
    )

    try:
        await worker.run()
    except KeyboardInterrupt:
        print("\nDevelopment worker stopped")
        sys.exit(0)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
```

### Testing Workers

```python
import pytest
from unittest.mock import AsyncMock, patch
from agnt5 import Worker, function

@function()
def test_function(data: str) -> str:
    return data.upper()

@pytest.fixture
async def mock_worker():
    with patch('agnt5.worker.PyWorker'):
        worker = Worker("test-service")
        yield worker

@pytest.mark.asyncio
async def test_worker_registration(mock_worker):
    # Test component registration
    mock_worker._register_components()

    # Verify functions are registered
    from agnt5.decorators import get_registered_functions
    functions = get_registered_functions()
    assert "test_function" in functions

@pytest.mark.asyncio
async def test_worker_asgi_mode():
    app = Worker("test-service", runtime="asgi")
    assert callable(app)  # ASGI callable interface
```

## Production Deployment

### Container Deployment

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# Run worker
CMD ["python", "worker.py"]
```

### Environment Configuration

```bash
# Production environment variables
AGNT5_SERVICE_NAME=production-service
AGNT5_SERVICE_VERSION=1.2.0
AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
AGNT5_LOG_LEVEL=INFO
AGNT5_LOG_FORMAT=json
```

### Health Monitoring

```python
import asyncio
import logging
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def health_check() -> dict:
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "production-service",
        "timestamp": time.time()
    }

async def main():
    worker = Worker(
        service_name="production-service",
        service_version=os.getenv("SERVICE_VERSION", "1.0.0")
    )

    logger.info("Starting production worker")

    try:
        await worker.run()
    except Exception as e:
        logger.error(f"Worker failed: {e}")
        raise
    finally:
        logger.info("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
```

## Best Practices

### Service Design

1. **Service Naming** - Use consistent, descriptive service names
2. **Version Management** - Use semantic versioning for service versions
3. **Resource Management** - Clean up resources in shutdown handlers
4. **Error Handling** - Handle exceptions gracefully in workers
5. **Health Checks** - Implement health check functions for monitoring

### Performance

1. **Connection Pooling** - Reuse database and HTTP connections
2. **Async Operations** - Use async functions for I/O operations
3. **Resource Limits** - Configure appropriate memory and CPU limits
4. **Scaling** - Deploy multiple worker instances for high throughput
5. **Monitoring** - Track worker performance and error rates

### Security

1. **Input Validation** - Validate all function inputs
2. **Error Messages** - Don't expose sensitive information in errors
3. **Authentication** - Use proper authentication for coordinator connections
4. **Network Security** - Use secure connections (HTTPS/TLS) in production
5. **Secrets Management** - Use environment variables for sensitive configuration

## Next Steps

### Core Primitives
- [Functions](functions) - Stateless operations with retries
- [Entities](entity) - Stateful components
- [Workflows](workflows) - Multi-step orchestration
- [Context API](context) - Full API reference

### Agent Development Kit
- [Agents](agent) - Autonomous LLM-driven systems
- [Tools](tool) - Extend agent capabilities
- [Sessions](session) - Conversation management
- [Memory](memory) - Long-term knowledge storage

### Resources
- [Examples](examples/basic-worker) - Worker deployment examples
