Worker Runtime
Configure and deploy Python workers for AGNT5
The Worker class is the high-level runtime that integrates with the AGNT5 platform, automatically registers decorated components, and handles execution coordination.
Worker Configuration
Basic Worker
import asyncio
from agnt5 import Worker, function

@function()
def hello(name: str) -> str:
    return f"Hello, {name}!"

async def main():
    worker = Worker(service_name="hello-service")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Configuration Parameters
worker = Worker(
    service_name="my-service",  # Required: Service identifier
    service_version="1.2.0",  # Version string (default: "1.0.0")
    coordinator_endpoint="http://localhost:9091",  # Worker coordinator URL
    runtime="standalone",  # Runtime mode: "standalone" or "asgi"
)
| Parameter | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service identifier for registration | Required |
| service_version | str | Version string for this service | "1.0.0" |
| coordinator_endpoint | str | Worker coordinator URL | "http://localhost:9091" |
| runtime | str | Runtime adapter: "standalone" or "asgi" | "standalone" |
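The parameters in the table can be checked before a worker is constructed. The following is a minimal sketch, not part of agnt5: `WorkerSettings` and `VALID_RUNTIMES` are hypothetical names, and the defaults are copied from the table above.

```python
from dataclasses import dataclass

# Runtime values taken from the parameter table above.
VALID_RUNTIMES = ("standalone", "asgi")


@dataclass(frozen=True)
class WorkerSettings:
    """Hypothetical settings holder mirroring the table; not an agnt5 class."""

    service_name: str
    service_version: str = "1.0.0"
    coordinator_endpoint: str = "http://localhost:9091"
    runtime: str = "standalone"

    def __post_init__(self) -> None:
        # service_name is the only required parameter.
        if not self.service_name:
            raise ValueError("service_name is required")
        # Reject runtime modes the table does not list.
        if self.runtime not in VALID_RUNTIMES:
            raise ValueError(
                f"runtime must be one of {VALID_RUNTIMES}, got {self.runtime!r}"
            )


settings = WorkerSettings(service_name="my-service", runtime="asgi")
```

Because the field names match the constructor keywords shown above, the validated values could then be passed on with `Worker(**dataclasses.asdict(settings))`.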
Runtime Modes
Standalone Runtime
For background workers, batch processing, and daemon processes:
import asyncio
from agnt5 import Worker, function

@function()
def background_task(data: dict) -> dict:
    # Process data in background
    return {"processed": True, "result": data}

async def main():
    # Standalone worker blocks until stopped
    worker = Worker(
        service_name="background-processor",
        runtime="standalone"
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Characteristics:
- Blocks until manually stopped (Ctrl+C)
- Connects to worker coordinator
- Ideal for background processing
- Built-in signal handling
- OpenTelemetry integration
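The "blocks until stopped" and signal-handling behavior listed above can be illustrated with plain asyncio. This is a sketch of the general pattern only, under the assumption that a loop waits on a stop event; it does not show how agnt5 implements its runtime, and `run_until_stopped` is a hypothetical name.

```python
import asyncio
import signal


async def run_until_stopped(stop: asyncio.Event, interval: float = 0.01) -> int:
    """Do periodic work until `stop` is set; return the number of iterations."""
    ticks = 0
    while not stop.is_set():
        ticks += 1  # stand-in for one unit of background work
        try:
            # Sleep for `interval`, but wake up early if a stop is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return ticks


async def main() -> int:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Setting the event lets the loop finish its current iteration.
            loop.add_signal_handler(sig, stop.set)
        except NotImplementedError:
            # Some event loops (for example on Windows) lack signal handlers.
            pass
    return await run_until_stopped(stop)
```

Running `asyncio.run(main())` blocks until Ctrl+C or SIGTERM arrives, then returns after the current iteration completes.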
ASGI Runtime
For web applications and HTTP endpoints:
from agnt5 import Worker, function

@function()
def web_handler(request: dict) -> dict:
    return {"message": "Hello from AGNT5!", "data": request}

# Create ASGI application
app = Worker(
    service_name="web-service",
    runtime="asgi"
)
# Enable CORS for browser access
app.enable_cors()
Run with any ASGI server:
# Install ASGI server
pip install uvicorn
# Run the application
uvicorn main:app --reload --port 8000
ASGI Endpoints:
| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Health check endpoint |
| /functions | GET | List registered functions |
| /invoke/{handler} | POST | Invoke specific function |
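A client call to the invoke endpoint might be built as follows. This is a sketch using only the standard library. The request body format is not documented above, so sending the function arguments as a JSON object is an assumption, and `build_invoke_request` is a hypothetical helper.

```python
import json
import urllib.request
from urllib.parse import quote


def build_invoke_request(
    base_url: str, handler: str, payload: dict
) -> urllib.request.Request:
    """Build a POST request for /invoke/{handler} with a JSON body."""
    url = f"{base_url.rstrip('/')}/invoke/{quote(handler, safe='')}"
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Assumed payload shape: keyword arguments of the target function.
request = build_invoke_request(
    "http://localhost:8000", "web_handler", {"request": {"user": "ada"}}
)

# To send it against a running server:
# with urllib.request.urlopen(request, timeout=5) as response:
#     print(json.load(response))
```

Building the request separately from sending it keeps the URL and payload easy to inspect before any network call is made.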
Environment Configuration
Environment Variables
Configure workers using environment variables:
# Service configuration
export AGNT5_SERVICE_NAME=my-service
export AGNT5_SERVICE_VERSION=2.0.0
export AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
# Logging configuration
export AGNT5_LOG_LEVEL=INFO
export AGNT5_LOG_FORMAT=json
# Runtime configuration
export AGNT5_RUNTIME=standalone
export AGNT5_DISABLE_TELEMETRY=false
Configuration Priority
Configuration sources in order of precedence:
- Constructor parameters - Highest priority
- Environment variables - Medium priority
- Default values - Lowest priority
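The precedence order above can be expressed as a small lookup. This sketch is illustrative and not agnt5's actual resolution code: `resolve_setting` is a hypothetical helper, and the environment is passed in as a mapping so the example does not depend on the real process environment.

```python
import os
from typing import Mapping, Optional


def resolve_setting(
    explicit: Optional[str],
    env_var: str,
    default: str,
    environ: Mapping[str, str] = os.environ,
) -> str:
    """Return the constructor value, else the environment value, else the default."""
    if explicit is not None:
        return explicit  # highest priority: constructor parameter
    if env_var in environ:
        return environ[env_var]  # medium priority: environment variable
    return default  # lowest priority: built-in default


env = {
    "AGNT5_SERVICE_NAME": "env-service",
    "AGNT5_COORDINATOR_ENDPOINT": "https://coordinator.agnt5.com",
}
name = resolve_setting("explicit-service", "AGNT5_SERVICE_NAME", "unnamed", env)
endpoint = resolve_setting(
    None, "AGNT5_COORDINATOR_ENDPOINT", "http://localhost:9091", env
)
version = resolve_setting(None, "AGNT5_SERVICE_VERSION", "1.0.0", env)
```

Here `name` comes from the explicit argument, `endpoint` from the environment mapping, and `version` falls back to the default.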
# This worker uses constructor values over environment
worker = Worker(
    service_name="explicit-service",  # Overrides AGNT5_SERVICE_NAME
    coordinator_endpoint="http://localhost:9091"
)
Worker Lifecycle
Initialization
async def main():
    worker = Worker("my-service")
    # Worker validates Rust extension availability
    # Creates runtime adapter (standalone or ASGI)
    # Installs OpenTelemetry logging
    # Registers all decorated functions and workflows
    await worker.run()  # Starts the worker loop
Registration Process
- Function Discovery: Scans for @function-decorated callables
- Workflow Discovery: Scans for @workflow-decorated factories
- Component Registration: Sends metadata to coordinator
- Service Announcement: Service becomes available for invocations
Graceful Shutdown
import asyncio
import signal
from agnt5 import Worker

class GracefulWorker:
    def __init__(self):
        self.worker = Worker("graceful-service")
        self.shutdown_requested = False

    async def run(self):
        # Register signal handlers
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)
        try:
            await self.worker.run()
        except KeyboardInterrupt:
            print("Shutdown requested via keyboard interrupt")
        finally:
            await self._cleanup()

    def _signal_handler(self, signum, frame):
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.shutdown_requested = True

    async def _cleanup(self):
        print("Cleaning up resources...")
        # Perform cleanup tasks
        # Close database connections
        # Finish in-flight requests
        # Remove OpenTelemetry handlers
        print("Cleanup complete")

async def main():
    graceful_worker = GracefulWorker()
    await graceful_worker.run()

if __name__ == "__main__":
    asyncio.run(main())
ASGI Integration
Basic ASGI App
from agnt5 import Worker, function

@function()
def api_endpoint(data: dict) -> dict:
    return {"status": "success", "received": data}

# Create ASGI app
app = Worker("api-service", runtime="asgi")
CORS Configuration
Enable CORS for browser access:
# Enable CORS with defaults (allows all origins)
app.enable_cors()
# Enable CORS with specific origins
app.enable_cors(origins=["https://myapp.com", "https://localhost:3000"])
# Disable CORS
app.disable_cors()
Custom Middleware
Add ASGI middleware:
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
app = Worker("api-service", runtime="asgi")
# Add middleware (if using Starlette/FastAPI patterns)
# Note: This is conceptual; actual middleware integration depends on the ASGI runtime implementation
Error Handling
The ASGI runtime provides consistent error responses:
from agnt5 import function

# Function that raises an exception
@function()
def failing_function(data: dict) -> dict:
    raise ValueError("Something went wrong")

# ASGI runtime catches and formats the error:
# {
#   "error": "Function failing_function failed: Something went wrong",
#   "status": 500
# }
Worker Methods
Runtime Control
worker = Worker("my-service")
# Check if worker is running
if worker.is_running():
    print("Worker is active")
# For ASGI workers only - get ASGI callable
if worker.runtime == "asgi":
    asgi_app = worker.__call__  # ASGI callable interface
Component Registration
# Manual component registration (usually automatic)
worker._register_components()
# Internal message handling (not part of public API)
# worker._handle_message(request)
Observability
OpenTelemetry Integration
Workers automatically install OpenTelemetry logging:
import logging
from agnt5 import Worker
from agnt5.logging import install_opentelemetry_logging, remove_opentelemetry_logging

# Custom logging setup
logger = logging.getLogger("my-service")
# Install telemetry with custom formatter
install_opentelemetry_logging(
    logger=logger,
    level=logging.DEBUG,
    format_string="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
worker = Worker("my-service")
# Telemetry is automatically cleaned up on worker shutdown
Structured Logging
import logging
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def logged_function(ctx, data: dict) -> dict:
    # Structured logging with context
    logger.info(
        "Processing function",
        extra={
            "invocation_id": ctx.invocation_id,
            "service_name": ctx.metadata.get("service_name"),
            "data_size": len(str(data))
        }
    )
    result = {"processed": True}
    logger.info(
        "Function completed",
        extra={
            "invocation_id": ctx.invocation_id,
            "success": True
        }
    )
    return result
Development Patterns
Hot Reload Development
import asyncio
import os
import sys
from agnt5 import Worker, function

# Development configuration
if os.getenv("ENVIRONMENT") == "development":
    import logging
    logging.basicConfig(level=logging.DEBUG)

@function()
def development_handler(data: dict) -> dict:
    return {"env": "development", "data": data}

async def main():
    worker = Worker(
        service_name="dev-service",
        coordinator_endpoint=os.getenv("COORDINATOR_URL", "http://localhost:9091")
    )
    try:
        await worker.run()
    except KeyboardInterrupt:
        print("\nDevelopment worker stopped")
        sys.exit(0)

if __name__ == "__main__":
    asyncio.run(main())
Testing Workers
import pytest
from unittest.mock import patch
from agnt5 import Worker, function

# Named without a "test_" prefix so pytest does not collect it as a test
@function()
def shout(data: str) -> str:
    return data.upper()

# A plain (synchronous) fixture works with both sync and async tests
@pytest.fixture
def mock_worker():
    with patch('agnt5.worker.PyWorker'):
        worker = Worker("test-service")
        yield worker

@pytest.mark.asyncio
async def test_worker_registration(mock_worker):
    # Test component registration
    mock_worker._register_components()
    # Verify functions are registered
    from agnt5.decorators import get_registered_functions
    functions = get_registered_functions()
    assert "shout" in functions

@pytest.mark.asyncio
async def test_worker_asgi_mode():
    app = Worker("test-service", runtime="asgi")
    assert callable(app)  # ASGI callable interface
Production Deployment
Container Deployment
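FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Health check (uses Python because the slim image does not include curl;
# assumes the worker serves /health on port 8000, as in the ASGI example)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1
# Run worker
CMD ["python", "worker.py"]
Environment Configuration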
# Production environment variables
AGNT5_SERVICE_NAME=production-service
AGNT5_SERVICE_VERSION=1.2.0
AGNT5_COORDINATOR_ENDPOINT=https://coordinator.agnt5.com
AGNT5_LOG_LEVEL=INFO
AGNT5_LOG_FORMAT=json
Health Monitoring
import asyncio
import logging
import os
import time
from agnt5 import Worker, function

logger = logging.getLogger(__name__)

@function()
def health_check() -> dict:
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "production-service",
        "timestamp": time.time()
    }

async def main():
    worker = Worker(
        service_name="production-service",
        service_version=os.getenv("SERVICE_VERSION", "1.0.0")
    )
    logger.info("Starting production worker")
    try:
        await worker.run()
    except Exception as e:
        logger.error(f"Worker failed: {e}")
        raise
    finally:
        logger.info("Worker shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
Best Practices
Service Design
- Service Naming - Use consistent, descriptive service names
- Version Management - Use semantic versioning for service versions
- Resource Management - Clean up resources in shutdown handlers
- Error Handling - Handle exceptions gracefully in workers
- Health Checks - Implement health check functions for monitoring
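For the version management point above, a deployment script could check the version string before it is passed as `service_version`. This is a minimal sketch with a hypothetical `parse_version` helper. It covers only the core MAJOR.MINOR.PATCH form and ignores pre-release and build metadata.

```python
import re
from typing import Tuple

# Core semantic version: three dot-separated integers without leading zeros.
_SEMVER_CORE = re.compile(r"(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)")


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' into a tuple of ints, or raise ValueError."""
    match = _SEMVER_CORE.fullmatch(version)
    if match is None:
        raise ValueError(f"not a MAJOR.MINOR.PATCH version: {version!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


current = parse_version("1.2.0")
# Tuples compare numerically, so 1.10.0 sorts after 1.9.3.
is_newer = parse_version("1.10.0") > parse_version("1.9.3")
```

Comparing parsed tuples avoids the string-comparison pitfall where "1.10.0" sorts before "1.9.3".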
Performance
- Connection Pooling - Reuse database and HTTP connections
- Async Operations - Use async functions for I/O operations
- Resource Limits - Configure appropriate memory and CPU limits
- Scaling - Deploy multiple worker instances for high throughput
- Monitoring - Track worker performance and error rates
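The async I/O and resource-limit points above are often combined by running I/O calls concurrently under a fixed cap. The sketch below uses only asyncio. `map_bounded` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for a real database or HTTP call.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def map_bounded(
    func: Callable[[T], Awaitable[R]], items: Iterable[T], limit: int
) -> List[R]:
    """Apply an async function to each item with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        async with semaphore:  # wait for a free slot before starting
            return await func(item)

    # gather preserves input order in its results.
    return list(await asyncio.gather(*(run_one(item) for item in items)))


async def fake_fetch(n: int) -> int:
    await asyncio.sleep(0.01)  # simulated I/O latency
    return n * 2
```

Calling `asyncio.run(map_bounded(fake_fetch, [1, 2, 3], limit=2))` returns the results in input order while never running more than two calls at once.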
Security
- Input Validation - Validate all function inputs
- Error Messages - Don’t expose sensitive information in errors
- Authentication - Use proper authentication for coordinator connections
- Network Security - Use secure connections (HTTPS/TLS) in production
- Secrets Management - Use environment variables for sensitive configuration
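The input validation and error message points above might look like this inside a function body. It is a sketch with hypothetical helpers (`validate_payload`, `safe_error_response`), and the response fields are assumptions that do not reflect the error format produced by the ASGI runtime.

```python
import logging
import uuid

logger = logging.getLogger("worker.errors")


def validate_payload(data: object) -> dict:
    """Accept only a dict with a non-empty string 'name'; return a cleaned copy."""
    if not isinstance(data, dict):
        raise TypeError("payload must be a JSON object")
    name = data.get("name")
    if not isinstance(name, str) or not name.strip():
        raise ValueError("'name' must be a non-empty string")
    return {"name": name.strip()}


def safe_error_response(exc: Exception) -> dict:
    """Log full details server-side and return a generic message to the caller."""
    error_id = uuid.uuid4().hex
    # The exception text and traceback go to the log, not to the client.
    logger.error("Unhandled error %s", error_id, exc_info=exc)
    return {"error": "Internal error", "error_id": error_id, "status": 500}
```

The `error_id` lets an operator match a client report to the logged traceback without exposing the exception text.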
Next Steps
Core Primitives
- Functions - Stateless operations with retries
- Entities - Stateful components
- Workflows - Multi-step orchestration
- Context API - Full API reference
Agent Development Kit
- Agents - Autonomous LLM-driven systems
- Tools - Extend agent capabilities
- Sessions - Conversation management
- Memory - Long-term knowledge storage
Resources
- Examples - Worker deployment examples