Basic Worker
Simple standalone worker with function handlers
A minimal AGNT5 worker demonstrating function registration and execution.
Complete Example
import asyncio
import logging
from agnt5 import Worker, function
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@function()
def greet(name: str) -> str:
    """Log the request and build a greeting for ``name``.

    Args:
        name: Name of the user to greet.

    Returns:
        The text ``Hello, <name>!``.
    """
    logger.info(f"Greeting user: {name}")
    greeting = f"Hello, {name}!"
    return greeting
@function("math.add")
def add_numbers(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``, logging the operands first.

    The handler is registered under the explicit name ``math.add`` rather
    than the Python function name.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        ``a + b``.
    """
    logger.info(f"Adding {a} + {b}")
    total = a + b
    return total
@function()
def process_data(data: dict) -> dict:
    """Return ``data`` together with a copy whose string values are upper-cased.

    Args:
        data: Mapping to process. Values that are not ``str`` are carried
            over unchanged.

    Returns:
        A dict with three keys: ``"original"`` (the input mapping itself),
        ``"processed"`` (the transformed copy) and ``"status"`` (always
        ``"completed"``).
    """
    logger.info(f"Processing data with {len(data)} keys")

    # Simulated processing step: only string values are transformed.
    uppercased = {}
    for key, value in data.items():
        if isinstance(value, str):
            uppercased[key] = str(value).upper()
        else:
            uppercased[key] = value

    return {"original": data, "processed": uppercased, "status": "completed"}
async def main():
    """Create the ``basic-worker`` worker and run it until it stops.

    Raises:
        asyncio.CancelledError: Re-raised after logging when this task is
            cancelled, so the event loop can finish shutting down.
        Exception: Any other error raised by ``worker.run()`` is logged with
            its traceback and re-raised.
    """
    logger.info("Starting AGNT5 worker...")
    worker = Worker(
        service_name="basic-worker",
        service_version="1.0.0",
    )
    try:
        await worker.run()
    except KeyboardInterrupt:
        logger.info("Worker stopped by user")
    except asyncio.CancelledError:
        # Under asyncio.run() (Python 3.11+), Ctrl+C cancels the main task
        # instead of raising KeyboardInterrupt inside it, so the shutdown has
        # to be logged here too. Cancellation must propagate, hence the raise.
        logger.info("Worker cancelled, shutting down")
        raise
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"Worker error: {e}")
        raise
if __name__ == "__main__":
    asyncio.run(main())

Running the Worker
Deploy to AGNT5
# Authenticate and deploy
agnt5 auth login
agnt5 deploy

Run the Worker Locally
# Run the worker
python worker.py

Expected output:
INFO:__main__:Starting AGNT5 worker...
INFO:agnt5.worker:Starting worker for service: basic-worker
INFO:agnt5.worker:Registered function: greet
INFO:agnt5.worker:Registered function: math.add
INFO:agnt5.worker:Registered function: process_data
INFO:agnt5.worker:Worker running, waiting for tasks...

Testing Functions
Using HTTP API
Test functions via the AGNT5 Gateway:
# Test greet function
# inputData is the base64 encoding of the JSON string "Alice" (quotes included),
# i.e. the same payload the Python client builds with json.dumps("Alice").
curl -X POST http://localhost:8080/call \
  -H "Content-Type: application/json" \
  -d '{
    "serviceName": "basic-worker",
    "handlerName": "greet",
    "inputData": "IkFsaWNlIg=="
  }'
# Test math function
curl -X POST http://localhost:8080/call \
-H "Content-Type: application/json" \
-d '{
"serviceName": "basic-worker",
"handlerName": "math.add",
"inputData": "eyJhIjogNSwgImIiOiAzfQ=="
}'

Note:
Input Data Encoding: The inputData field expects base64-encoded JSON. Encode a payload with, for example, printf '%s' '{"a": 5, "b": 3}' | base64 (plain echo appends a trailing newline, which changes the encoded value).
Using Python Client
import asyncio
import json
import base64
from agnt5 import Client
async def test_functions():
    """Call each handler of ``basic-worker`` once and print what comes back.

    Every payload is JSON-serialised and then base64-encoded before being
    passed to ``client.call`` as ``input_data``.
    """

    def _encode(payload):
        # JSON text -> bytes -> base64 bytes -> str
        return base64.b64encode(json.dumps(payload).encode()).decode()

    client = Client("http://localhost:8080")

    # Test greet function
    greet_input = _encode("Alice")
    result = await client.call(
        service_name="basic-worker",
        handler_name="greet",
        input_data=greet_input,
    )
    print(f"Greet result: {result}")

    # Test math function
    add_input = _encode({"a": 10, "b": 5})
    result = await client.call(
        service_name="basic-worker",
        handler_name="math.add",
        input_data=add_input,
    )
    print(f"Math result: {result}")

    # Test data processing
    record_input = _encode({
        "name": "john doe",
        "status": "active",
        "count": 42,
    })
    result = await client.call(
        service_name="basic-worker",
        handler_name="process_data",
        input_data=record_input,
    )
    print(f"Process result: {result}")
if __name__ == "__main__":
    asyncio.run(test_functions())

Error Handling
Add error handling to make functions more robust:
import asyncio
import logging
from agnt5 import Worker, function
logger = logging.getLogger(__name__)
@function()
def safe_divide(a: float, b: float) -> dict:
    """Divide ``a`` by ``b`` and report the outcome as a dict instead of raising.

    Args:
        a: Dividend.
        b: Divisor.

    Returns:
        A dict with ``"result"`` and ``"error"`` keys. On success
        ``"result"`` holds the quotient and ``"error"`` is ``None``. When
        ``b`` equals zero, or any exception is raised, ``"result"`` is
        ``None`` and ``"error"`` holds a message.
    """
    try:
        if b == 0:
            outcome = {"error": "Division by zero", "result": None}
        else:
            quotient = a / b
            logger.info(f"Division successful: {a} / {b} = {quotient}")
            outcome = {"result": quotient, "error": None}
    except Exception as exc:
        logger.error(f"Unexpected error: {exc}")
        outcome = {"error": str(exc), "result": None}
    return outcome
@function()
def validate_email(email: str) -> dict:
    """Check whether ``email`` has a basic ``local@domain.tld`` shape.

    Args:
        email: Address to validate.

    Returns:
        A dict with ``"email"`` (the input), ``"valid"`` (bool) and
        ``"message"`` (human-readable verdict). If matching raises, for
        example because ``email`` is not a string, ``"valid"`` is ``False``
        and ``"message"`` carries the error text.
    """
    import re

    # Basic email validation
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    try:
        # fullmatch rather than match: with re.match the trailing ``$`` also
        # matches just before a final newline, so "user@example.com\n" would
        # be reported as valid.
        is_valid = bool(re.fullmatch(pattern, email))
        return {
            "email": email,
            "valid": is_valid,
            "message": "Valid email" if is_valid else "Invalid email format",
        }
    except Exception as e:
        logger.error(f"Email validation error: {e}")
        return {
            "email": email,
            "valid": False,
            "message": f"Validation error: {e}",
        }
async def main():
    """Create a ``Worker`` and await its ``run()`` coroutine.

    The worker is built with the single positional argument
    ``"robust-worker"`` (presumably its service name).
    """
    robust_worker = Worker("robust-worker")
    await robust_worker.run()
if __name__ == "__main__":
    asyncio.run(main())

Configuration
Environment Variables
# Service configuration
export AGNT5_SERVICE_NAME=basic-worker
export AGNT5_SERVICE_VERSION=1.0.0
# Coordinator endpoint
export AGNT5_COORDINATOR_ENDPOINT=http://localhost:9091
# Logging
export AGNT5_LOG_LEVEL=DEBUG
python worker.py

Configuration File
import os
import asyncio
import logging
from agnt5 import Worker, function
from agnt5.logging import install_opentelemetry_logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Install telemetry
install_opentelemetry_logging(logger=logger, level=logging.INFO)
@function()
def configured_handler(data: dict) -> dict:
    """Echo ``data`` together with service settings read from the environment.

    Args:
        data: Payload to include in the response unchanged.

    Returns:
        A dict with ``"service_name"`` (from ``AGNT5_SERVICE_NAME``, default
        ``"unknown"``), ``"service_version"`` (from ``AGNT5_SERVICE_VERSION``,
        default ``"1.0.0"``) and ``"data"``.
    """
    service_name = os.getenv("AGNT5_SERVICE_NAME", "unknown")
    service_version = os.getenv("AGNT5_SERVICE_VERSION", "1.0.0")
    return {
        "service_name": service_name,
        "service_version": service_version,
        "data": data,
    }
async def main():
    """Build a ``Worker`` from ``AGNT5_*`` environment variables and run it.

    Each setting falls back to a default when its variable is unset:
    ``AGNT5_SERVICE_NAME`` -> ``"configured-worker"``,
    ``AGNT5_SERVICE_VERSION`` -> ``"1.0.0"``,
    ``AGNT5_COORDINATOR_ENDPOINT`` -> ``"http://localhost:9091"``.
    """
    # Worker with configuration
    env = os.environ
    name = env.get("AGNT5_SERVICE_NAME", "configured-worker")
    version = env.get("AGNT5_SERVICE_VERSION", "1.0.0")
    endpoint = env.get("AGNT5_COORDINATOR_ENDPOINT", "http://localhost:9091")
    worker = Worker(
        service_name=name,
        service_version=version,
        coordinator_endpoint=endpoint,
    )
    await worker.run()
if __name__ == "__main__":
    asyncio.run(main())

Production Deployment
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY worker.py .
# Health check
# The slim base image does not ship curl, so probe with the Python interpreter
# that is already present. urlopen raises on a connection failure or an HTTP
# error status, which makes the command exit non-zero.
# NOTE(review): assumes the worker serves GET /health on port 8000 - confirm.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1
# Run worker
CMD ["python", "worker.py"]

Docker Compose
version: '3.8'
services:
  # Worker container built from the Dockerfile in this directory
  worker:
    build: .
    environment:
      - AGNT5_SERVICE_NAME=basic-worker
      - AGNT5_SERVICE_VERSION=1.0.0
      - AGNT5_COORDINATOR_ENDPOINT=http://coordinator:9091
      - AGNT5_LOG_LEVEL=INFO
    restart: unless-stopped
    depends_on:
      - coordinator
    networks:
      - agnt5
  # Coordinator the worker connects to (reached as "coordinator" on the shared network)
  coordinator:
    image: agnt5/coordinator:latest
    ports:
      - "9091:9091"
    networks:
      - agnt5
networks:
  agnt5:
    driver: bridge

Next Steps
- ASGI Server Example - Web application integration
- Workflow Example - Multi-step orchestration
- Error Handling Patterns - Comprehensive error handling