Workflows
Multi-step orchestration and durable execution patterns
Workflows enable durable, multi-step orchestration with automatic recovery, state persistence, and complex dependency management. Built on the AGNT5 orchestration plane for exactly-once execution guarantees.
Basic Workflow
Simple Sequential Workflow
from agnt5 import workflow, task_step
from agnt5.workflows import FlowDefinition
@workflow()
def data_pipeline() -> FlowDefinition:
return FlowDefinition([
task_step(
name="extract",
service_name="etl-service",
handler_name="extract_data"
),
task_step(
name="transform",
service_name="etl-service",
handler_name="transform_data",
dependencies=["extract"]
),
task_step(
name="load",
service_name="etl-service",
handler_name="load_data",
dependencies=["transform"]
)
])Parallel Execution
Steps that do not depend on one another execute in parallel; a step that lists dependencies waits until all of them have completed:
# Assumes ``workflow``, ``FlowDefinition`` and ``task_step`` are already
# imported as in the sequential example.
@workflow()
def parallel_processing() -> FlowDefinition:
    """Fan out three independent steps, then fan in to a single merge step."""
    return FlowDefinition([
        # These three steps declare no dependencies, so they run in parallel
        task_step(
            name="process_a",
            service_name="service",
            handler_name="process_type_a"
        ),
        task_step(
            name="process_b",
            service_name="service",
            handler_name="process_type_b"
        ),
        task_step(
            name="process_c",
            service_name="service",
            handler_name="process_type_c"
        ),
        # This step waits for all three to complete
        task_step(
            name="merge_results",
            service_name="service",
            handler_name="merge_results",
            dependencies=["process_a", "process_b", "process_c"]
        )
    ])
Step Types
Task Steps
Execute function handlers on services:
from agnt5.workflows import task_step

# Basic task step: runs handler ``my_handler`` on service ``my-service``.
# ``name`` must be unique within the workflow.
step = task_step(
    name="unique_step_name",
    service_name="my-service",
    handler_name="my_handler"
)

# Task with dependencies and input data: starts only after ``previous_step``
# completes. ``input_data`` is presumably handed to the handler as its input.
step = task_step(
    name="dependent_step",
    service_name="my-service",
    handler_name="process_data",
    dependencies=["previous_step"],
    input_data={"config": "production", "batch_size": 100}
)

# Task with object keys (Phase 2)
# NOTE(review): "Phase 2" presumably marks a feature that is not yet
# generally available - confirm before relying on ``object_keys``.
step = task_step(
    name="object_step",
    service_name="object-service",
    handler_name="update_state",
    dependencies=["init_step"],
    object_keys=["user:123", "cart:456"]
)
Wait Signal Steps
Pause execution until an external signal is received:
from agnt5.workflows import wait_signal_step

# Basic signal wait: after ``review_step`` completes, block until the
# ``approval_granted`` signal arrives.
step = wait_signal_step(
    name="wait_for_approval",
    signal_name="approval_granted",
    dependencies=["review_step"]
)

# Signal wait with timeout: ``on_timeout`` names the step that is triggered
# if no ``user_action`` signal arrives within ``timeout_ms`` milliseconds.
step = wait_signal_step(
    name="wait_with_timeout",
    signal_name="user_action",
    dependencies=["prompt_user"],
    timeout_ms=300000,  # 5 minutes
    on_timeout="timeout_handler_step"
)
Wait Timer Steps
Pause execution for a fixed delay or until a cron schedule fires:
from agnt5.workflows import wait_timer_step

# Fixed delay: wait ``delay_ms`` milliseconds after ``failed_step`` completes.
step = wait_timer_step(
    name="delay_before_retry",
    timer_key="retry_delay",
    delay_ms=30000,  # 30 seconds
    dependencies=["failed_step"]
)

# Cron schedule: wait until the time described by ``cron_expr``.
step = wait_timer_step(
    name="nightly_batch",
    timer_key="nightly",
    cron_expr="0 2 * * *",  # 2 AM daily
    dependencies=["prep_step"]
)

# Timer with retries
# NOTE(review): only a fixed ``delay_ms`` and ``max_retries`` are set here;
# nothing visible configures an exponentially growing delay, despite the
# "exponential_backoff" key - confirm how the engine spaces retries.
step = wait_timer_step(
    name="retry_with_backoff",
    timer_key="exponential_backoff",
    delay_ms=5000,
    max_retries=3,
    dependencies=["error_step"]
)
Workflow Examples
ETL Pipeline
# Assumes ``workflow``, ``FlowDefinition``, ``task_step`` and
# ``wait_timer_step`` are already imported as in the earlier examples.
@workflow()
def nightly_etl() -> FlowDefinition:
    """Validate, extract (three sources in parallel), merge, transform, then load and report.

    Loading is held back behind a cron-based wait so it happens in the
    maintenance window.
    """
    return FlowDefinition([
        # Start with data validation
        task_step(
            name="validate_sources",
            service_name="etl-service",
            handler_name="validate_data_sources"
        ),
        # Extract from multiple sources in parallel: all three depend only on
        # validate_sources, not on each other
        task_step(
            name="extract_database",
            service_name="etl-service",
            handler_name="extract_from_database",
            dependencies=["validate_sources"]
        ),
        task_step(
            name="extract_api",
            service_name="etl-service",
            handler_name="extract_from_api",
            dependencies=["validate_sources"]
        ),
        task_step(
            name="extract_files",
            service_name="etl-service",
            handler_name="extract_from_files",
            dependencies=["validate_sources"]
        ),
        # Wait for all extractions to complete
        task_step(
            name="merge_extracted_data",
            service_name="etl-service",
            handler_name="merge_data",
            dependencies=["extract_database", "extract_api", "extract_files"]
        ),
        # Transform data
        task_step(
            name="clean_data",
            service_name="etl-service",
            handler_name="clean_and_normalize",
            dependencies=["merge_extracted_data"]
        ),
        task_step(
            name="enrich_data",
            service_name="etl-service",
            handler_name="enrich_with_metadata",
            dependencies=["clean_data"]
        ),
        # Wait for maintenance window
        # NOTE(review): if enrichment finishes after 3 AM, this presumably
        # holds the load until 3 AM the following day - confirm the intent.
        wait_timer_step(
            name="wait_for_maintenance_window",
            timer_key="maintenance",
            cron_expr="0 3 * * *",  # 3 AM
            dependencies=["enrich_data"]
        ),
        # Load data
        task_step(
            name="load_to_warehouse",
            service_name="etl-service",
            handler_name="load_data_warehouse",
            dependencies=["wait_for_maintenance_window"]
        ),
        # Generate reports
        task_step(
            name="generate_reports",
            service_name="reporting-service",
            handler_name="generate_daily_reports",
            dependencies=["load_to_warehouse"]
        )
    ])
Approval Workflow
@workflow()
def document_approval() -> FlowDefinition:
    """Submit a document, wait for approval (escalating on timeout), then publish.

    NOTE(review): this example conflicts with the dependency semantics shown
    in the parallel-execution example. See the notes on ``escalate_approval``
    and ``publish_document`` below. Confirm how the engine schedules
    ``on_timeout`` targets and timed-out waits before copying this pattern.
    """
    return FlowDefinition([
        # Submit document for review
        task_step(
            name="submit_document",
            service_name="doc-service",
            handler_name="submit_for_review",
            input_data={"priority": "normal"}
        ),
        # Notify reviewers
        task_step(
            name="notify_reviewers",
            service_name="notification-service",
            handler_name="send_review_notifications",
            dependencies=["submit_document"]
        ),
        # Wait for approval (with timeout)
        wait_signal_step(
            name="wait_for_approval",
            signal_name="document_approved",
            dependencies=["notify_reviewers"],
            timeout_ms=172800000,  # 48 hours
            on_timeout="escalate_approval"
        ),
        # Escalation path
        # NOTE(review): steps without dependencies run in parallel from the
        # start, so unless the engine special-cases ``on_timeout`` targets this
        # step may run immediately instead of only after the 48h timeout.
        task_step(
            name="escalate_approval",
            service_name="doc-service",
            handler_name="escalate_to_manager",
            # No dependencies - triggered by timeout
        ),
        # Wait for escalated approval
        wait_signal_step(
            name="wait_escalated_approval",
            signal_name="escalated_approval",
            dependencies=["escalate_approval"],
            timeout_ms=86400000  # 24 hours
        ),
        # Publish approved document
        # NOTE(review): a step waits for *all* of its dependencies, so listing
        # both waits may mean a normally approved (non-escalated) document is
        # never published. Confirm, or split into one publish step per path.
        task_step(
            name="publish_document",
            service_name="doc-service",
            handler_name="publish_approved_document",
            dependencies=["wait_for_approval", "wait_escalated_approval"]
        )
    ])
Workflow Registration
Using the Decorator
from agnt5 import workflow
from agnt5.workflows import get_registered_workflows
@workflow()
def my_workflow() -> FlowDefinition:
return FlowDefinition([...])
@workflow("custom_name")
def workflow_with_custom_name() -> FlowDefinition:
return FlowDefinition([...])
# Inspect registered workflows
workflows = get_registered_workflows()
print(f"Registered workflows: {list(workflows.keys())}")Manual Registration
from agnt5.workflows import register_workflow
def create_workflow_definition() -> FlowDefinition:
return FlowDefinition([
task_step("step1", service_name="service", handler_name="handler1"),
task_step("step2", service_name="service", handler_name="handler2",
dependencies=["step1"])
])
# Manual registration
register_workflow("manual_workflow", create_workflow_definition())Workflow Data Classes
FlowDefinition
Container for workflow steps that can be serialized to a dictionary or a JSON string:
from agnt5.workflows import FlowDefinition, task_step

# Create definition: step2 depends on step1
flow = FlowDefinition([
    task_step("step1", service_name="svc", handler_name="h1"),
    task_step("step2", service_name="svc", handler_name="h2", dependencies=["step1"]),
])

# Serialize to dictionary (the steps are listed under the "steps" key)
flow_dict = flow.to_dict()

# Serialize to JSON string
flow_json = flow.to_json()
WorkflowStep
Individual step configuration:
from agnt5.workflows import WorkflowStep, StepType

# Manual step creation (usually use helper functions instead)
# Building WorkflowStep directly means setting ``step_type`` yourself;
# task_step / wait_signal_step / wait_timer_step are the usual entry points.
step = WorkflowStep(
    name="custom_step",
    step_type=StepType.TASK,
    service_name="my-service",
    handler_name="my_handler",
    dependencies=["previous_step"],
    input_data={"key": "value"}
)
Configuration Classes
from agnt5.workflows import SignalConfig, TimerConfig

# Signal configuration
# These fields appear to mirror wait_signal_step's signal_name / timeout_ms /
# on_timeout arguments - confirm against the API reference.
signal_config = SignalConfig(
    name="approval_signal",
    timeout_ms=3600000,  # 1 hour
    on_timeout="timeout_step"
)

# Timer configuration: fixed delay in milliseconds
timer_config = TimerConfig(
    key="batch_timer",
    delay_ms=60000,  # 1 minute
    max_retries=5
)

# Cron timer configuration: cron_expr instead of delay_ms
cron_timer = TimerConfig(
    key="daily_job",
    cron_expr="0 0 * * *",  # Daily at midnight
    max_retries=3
)
Validation and Error Handling
Workflow Validation
AGNT5 validates workflows during registration:
from agnt5.workflows import register_workflow, FlowDefinition, task_step

# This will raise ValueError: Missing dependencies
# Building the definition is inside the try as well, so the example still
# reports the error if FlowDefinition itself validates on construction.
try:
    invalid_flow = FlowDefinition([
        task_step("step2", service_name="svc", handler_name="h2",
                  dependencies=["step1"]),  # step1 doesn't exist
        task_step("step3", service_name="svc", handler_name="h3",
                  dependencies=["step2"])
    ])
    register_workflow("invalid", invalid_flow)
except ValueError as e:
    print(f"Validation error: {e}")
Validation Rules
- At least one step must be defined
- Step names must be unique within the workflow
- Dependencies must reference existing steps
- Dependencies must appear earlier in the definition (causal order)
- Required fields must be populated based on step type
Testing Workflows
Workflow Definition Testing
import pytest
from agnt5.workflows import FlowDefinition, task_step, get_registered_workflows
def test_workflow_definition():
# Test workflow structure
flow = FlowDefinition([
task_step("extract", service_name="etl", handler_name="extract"),
task_step("transform", service_name="etl", handler_name="transform",
dependencies=["extract"])
])
# Verify serialization
flow_dict = flow.to_dict()
assert len(flow_dict["steps"]) == 2
assert flow_dict["steps"][1]["dependencies"] == ["extract"]
def test_workflow_registration():
@workflow()
def test_workflow() -> FlowDefinition:
return FlowDefinition([
task_step("test_step", service_name="test", handler_name="test")
])
# Verify registration
workflows = get_registered_workflows()
assert "test_workflow" in workflowsIntegration Testing
Test workflows end to end against a locally running development server (here `http://localhost:8080`):
import asyncio
from agnt5 import Client
async def test_workflow_execution():
client = Client("http://localhost:8080")
# Trigger workflow
result = await client.start_workflow(
workflow_name="data_pipeline",
input_data={"source": "test_data"}
)
workflow_id = result["workflow_id"]
# Poll for completion
while True:
status = await client.get_workflow_status(workflow_id)
if status["state"] in ["completed", "failed"]:
break
await asyncio.sleep(1)
assert status["state"] == "completed"Best Practices
Design Patterns
- Idempotent Steps - Design steps to be safely retryable
- Small Steps - Break complex operations into smaller, focused steps
- Clear Dependencies - Make step relationships explicit
- Meaningful Names - Use descriptive names for steps and workflows
- Error Handling - Plan for failure and recovery scenarios
Performance
- Parallel Execution - Remove unnecessary dependencies to enable parallelism
- Batch Operations - Group related operations into single steps
- Resource Management - Consider resource usage when designing workflows
- State Minimization - Keep workflow state as small as possible
Monitoring
- Structured Logging - Add logging to workflow steps
- Progress Tracking - Use meaningful step names and descriptions
- Metrics Collection - Track workflow success rates and durations
- Error Alerting - Set up alerts for workflow failures
Next Steps
- Worker Runtime - Configure and deploy workers
- API Reference - Complete workflows API reference
- Examples - Real-world workflow patterns