Skip to content
Docs
API Reference Workflows — AGNT5 Python SDK

Workflows

Multi-step orchestration and durable execution patterns

Workflows enable durable, multi-step orchestration with automatic recovery, state persistence, and complex dependency management. Built on the AGNT5 orchestration plane for exactly-once execution guarantees.

Basic Workflow

Simple Sequential Workflow

from agnt5 import workflow, task_step
from agnt5.workflows import FlowDefinition

@workflow()
def data_pipeline() -> FlowDefinition:
    return FlowDefinition([
        task_step(
            name="extract",
            service_name="etl-service",
            handler_name="extract_data"
        ),
        task_step(
            name="transform",
            service_name="etl-service",
            handler_name="transform_data",
            dependencies=["extract"]
        ),
        task_step(
            name="load",
            service_name="etl-service",
            handler_name="load_data",
            dependencies=["transform"]
        )
    ])

Parallel Execution

Steps without dependencies execute in parallel:

@workflow()
def parallel_processing() -> FlowDefinition:
    return FlowDefinition([
        # These three steps run in parallel
        task_step(
            name="process_a",
            service_name="service",
            handler_name="process_type_a"
        ),
        task_step(
            name="process_b",
            service_name="service",
            handler_name="process_type_b"
        ),
        task_step(
            name="process_c",
            service_name="service",
            handler_name="process_type_c"
        ),
        # This step waits for all three to complete
        task_step(
            name="merge_results",
            service_name="service",
            handler_name="merge_results",
            dependencies=["process_a", "process_b", "process_c"]
        )
    ])

Step Types

Task Steps

Execute function handlers on services:

from agnt5.workflows import task_step

# Basic task step
step = task_step(
    name="unique_step_name",
    service_name="my-service",
    handler_name="my_handler"
)

# Task with dependencies and input data
step = task_step(
    name="dependent_step",
    service_name="my-service",
    handler_name="process_data",
    dependencies=["previous_step"],
    input_data={"config": "production", "batch_size": 100}
)

# Task with object keys (Phase 2)
step = task_step(
    name="object_step",
    service_name="object-service",
    handler_name="update_state",
    dependencies=["init_step"],
    object_keys=["user:123", "cart:456"]
)

Wait Signal Steps

Pause execution until external signals:

from agnt5.workflows import wait_signal_step

# Basic signal wait
step = wait_signal_step(
    name="wait_for_approval",
    signal_name="approval_granted",
    dependencies=["review_step"]
)

# Signal wait with timeout
step = wait_signal_step(
    name="wait_with_timeout",
    signal_name="user_action",
    dependencies=["prompt_user"],
    timeout_ms=300000,  # 5 minutes
    on_timeout="timeout_handler_step"
)

Wait Timer Steps

Scheduled delays and cron-based execution:

from agnt5.workflows import wait_timer_step

# Fixed delay
step = wait_timer_step(
    name="delay_before_retry",
    timer_key="retry_delay",
    delay_ms=30000,  # 30 seconds
    dependencies=["failed_step"]
)

# Cron schedule
step = wait_timer_step(
    name="nightly_batch",
    timer_key="nightly",
    cron_expr="0 2 * * *",  # 2 AM daily
    dependencies=["prep_step"]
)

# Timer with retries
step = wait_timer_step(
    name="retry_with_backoff",
    timer_key="exponential_backoff",
    delay_ms=5000,
    max_retries=3,
    dependencies=["error_step"]
)

Workflow Examples

ETL Pipeline

@workflow()
def nightly_etl() -> FlowDefinition:
    return FlowDefinition([
        # Start with data validation
        task_step(
            name="validate_sources",
            service_name="etl-service",
            handler_name="validate_data_sources"
        ),

        # Extract from multiple sources in parallel
        task_step(
            name="extract_database",
            service_name="etl-service",
            handler_name="extract_from_database",
            dependencies=["validate_sources"]
        ),
        task_step(
            name="extract_api",
            service_name="etl-service",
            handler_name="extract_from_api",
            dependencies=["validate_sources"]
        ),
        task_step(
            name="extract_files",
            service_name="etl-service",
            handler_name="extract_from_files",
            dependencies=["validate_sources"]
        ),

        # Wait for all extractions to complete
        task_step(
            name="merge_extracted_data",
            service_name="etl-service",
            handler_name="merge_data",
            dependencies=["extract_database", "extract_api", "extract_files"]
        ),

        # Transform data
        task_step(
            name="clean_data",
            service_name="etl-service",
            handler_name="clean_and_normalize",
            dependencies=["merge_extracted_data"]
        ),
        task_step(
            name="enrich_data",
            service_name="etl-service",
            handler_name="enrich_with_metadata",
            dependencies=["clean_data"]
        ),

        # Wait for maintenance window
        wait_timer_step(
            name="wait_for_maintenance_window",
            timer_key="maintenance",
            cron_expr="0 3 * * *",  # 3 AM
            dependencies=["enrich_data"]
        ),

        # Load data
        task_step(
            name="load_to_warehouse",
            service_name="etl-service",
            handler_name="load_data_warehouse",
            dependencies=["wait_for_maintenance_window"]
        ),

        # Generate reports
        task_step(
            name="generate_reports",
            service_name="reporting-service",
            handler_name="generate_daily_reports",
            dependencies=["load_to_warehouse"]
        )
    ])

Approval Workflow

@workflow()
def document_approval() -> FlowDefinition:
    return FlowDefinition([
        # Submit document for review
        task_step(
            name="submit_document",
            service_name="doc-service",
            handler_name="submit_for_review",
            input_data={"priority": "normal"}
        ),

        # Notify reviewers
        task_step(
            name="notify_reviewers",
            service_name="notification-service",
            handler_name="send_review_notifications",
            dependencies=["submit_document"]
        ),

        # Wait for approval (with timeout)
        wait_signal_step(
            name="wait_for_approval",
            signal_name="document_approved",
            dependencies=["notify_reviewers"],
            timeout_ms=172800000,  # 48 hours
            on_timeout="escalate_approval"
        ),

        # Escalation path
        task_step(
            name="escalate_approval",
            service_name="doc-service",
            handler_name="escalate_to_manager",
            # No dependencies - triggered by timeout
        ),

        # Wait for escalated approval
        wait_signal_step(
            name="wait_escalated_approval",
            signal_name="escalated_approval",
            dependencies=["escalate_approval"],
            timeout_ms=86400000  # 24 hours
        ),

        # Publish approved document
        task_step(
            name="publish_document",
            service_name="doc-service",
            handler_name="publish_approved_document",
            dependencies=["wait_for_approval", "wait_escalated_approval"]
        )
    ])

Workflow Registration

Using the Decorator

from agnt5 import workflow
from agnt5.workflows import get_registered_workflows

@workflow()
def my_workflow() -> FlowDefinition:
    return FlowDefinition([...])

@workflow("custom_name")
def workflow_with_custom_name() -> FlowDefinition:
    return FlowDefinition([...])

# Inspect registered workflows
workflows = get_registered_workflows()
print(f"Registered workflows: {list(workflows.keys())}")

Manual Registration

from agnt5.workflows import register_workflow

def create_workflow_definition() -> FlowDefinition:
    return FlowDefinition([
        task_step("step1", service_name="service", handler_name="handler1"),
        task_step("step2", service_name="service", handler_name="handler2",
                 dependencies=["step1"])
    ])

# Manual registration
register_workflow("manual_workflow", create_workflow_definition())

Workflow Data Classes

FlowDefinition

Container for workflow steps with serialization:

from agnt5.workflows import FlowDefinition

# Create definition
flow = FlowDefinition([
    task_step("step1", service_name="svc", handler_name="h1"),
    task_step("step2", service_name="svc", handler_name="h2", dependencies=["step1"])
])

# Serialize to dictionary
flow_dict = flow.to_dict()

# Serialize to JSON string
flow_json = flow.to_json()

WorkflowStep

Individual step configuration:

from agnt5.workflows import WorkflowStep, StepType

# Manual step creation (usually use helper functions instead)
step = WorkflowStep(
    name="custom_step",
    step_type=StepType.TASK,
    service_name="my-service",
    handler_name="my_handler",
    dependencies=["previous_step"],
    input_data={"key": "value"}
)

Configuration Classes

from agnt5.workflows import SignalConfig, TimerConfig

# Signal configuration
signal_config = SignalConfig(
    name="approval_signal",
    timeout_ms=3600000,  # 1 hour
    on_timeout="timeout_step"
)

# Timer configuration
timer_config = TimerConfig(
    key="batch_timer",
    delay_ms=60000,  # 1 minute
    max_retries=5
)

# Cron timer configuration
cron_timer = TimerConfig(
    key="daily_job",
    cron_expr="0 0 * * *",  # Daily at midnight
    max_retries=3
)

Validation and Error Handling

Workflow Validation

AGNT5 validates workflows during registration:

from agnt5.workflows import register_workflow, FlowDefinition, task_step

# This will raise ValueError: Missing dependencies
try:
    invalid_flow = FlowDefinition([
        task_step("step2", service_name="svc", handler_name="h2",
                 dependencies=["step1"]),  # step1 doesn't exist
        task_step("step3", service_name="svc", handler_name="h3",
                 dependencies=["step2"])
    ])
    register_workflow("invalid", invalid_flow)
except ValueError as e:
    print(f"Validation error: {e}")

Validation Rules

  • At least one step must be defined
  • Step names must be unique within the workflow
  • Dependencies must reference existing steps
  • Dependencies must appear earlier in the definition (causal order)
  • Required fields must be populated based on step type

Testing Workflows

Workflow Definition Testing

import pytest
from agnt5.workflows import FlowDefinition, task_step, get_registered_workflows

def test_workflow_definition():
    # Test workflow structure
    flow = FlowDefinition([
        task_step("extract", service_name="etl", handler_name="extract"),
        task_step("transform", service_name="etl", handler_name="transform",
                 dependencies=["extract"])
    ])

    # Verify serialization
    flow_dict = flow.to_dict()
    assert len(flow_dict["steps"]) == 2
    assert flow_dict["steps"][1]["dependencies"] == ["extract"]

def test_workflow_registration():
    @workflow()
    def test_workflow() -> FlowDefinition:
        return FlowDefinition([
            task_step("test_step", service_name="test", handler_name="test")
        ])

    # Verify registration
    workflows = get_registered_workflows()
    assert "test_workflow" in workflows

Integration Testing

Test workflows with a local development environment:

import asyncio
from agnt5 import Client

async def test_workflow_execution():
    client = Client("http://localhost:8080")

    # Trigger workflow
    result = await client.start_workflow(
        workflow_name="data_pipeline",
        input_data={"source": "test_data"}
    )

    workflow_id = result["workflow_id"]

    # Poll for completion
    while True:
        status = await client.get_workflow_status(workflow_id)
        if status["state"] in ["completed", "failed"]:
            break
        await asyncio.sleep(1)

    assert status["state"] == "completed"

Best Practices

Design Patterns

  1. Idempotent Steps - Design steps to be safely retryable
  2. Small Steps - Break complex operations into smaller, focused steps
  3. Clear Dependencies - Make step relationships explicit
  4. Meaningful Names - Use descriptive names for steps and workflows
  5. Error Handling - Plan for failure and recovery scenarios

Performance

  1. Parallel Execution - Remove unnecessary dependencies to enable parallelism
  2. Batch Operations - Group related operations into single steps
  3. Resource Management - Consider resource usage when designing workflows
  4. State Minimization - Keep workflow state as small as possible

Monitoring

  1. Structured Logging - Add logging to workflow steps
  2. Progress Tracking - Use meaningful step names and descriptions
  3. Metrics Collection - Track workflow success rates and durations
  4. Error Alerting - Set up alerts for workflow failures

Next Steps

© 2026 AGNT5
llms.txt