Workflows are the foundation of Polos. They’re durable functions that survive failures, maintain state, and resume exactly where they stopped.

What is a workflow?

A workflow is a Python function decorated with @workflow. It can contain steps, call other workflows, wait for events, and use normal programming constructs like loops and conditionals.
from polos import workflow, WorkflowContext

@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    # Step 1: Validate order
    order = await ctx.step.run("validate", validate_order, input)
    
    # Step 2: Charge payment
    payment = await ctx.step.run("charge", charge_stripe, order)
    
    # Step 3: Send confirmation
    await ctx.step.run("send_email", send_confirmation, order)
    
    return OrderOutput(order_id=order.id, status="completed")
What makes workflows special:
  • Durable - Survive crashes and resume from the last completed step
  • Stateful - Maintain context across execution
  • Composable - Call other workflows and wait for results
  • Observable - Track execution with real-time events

Why workflows?

Traditional functions lose all progress when something goes wrong partway through; on retry, every step runs again. Workflows persist progress at each step, so completed work is never repeated.

Without workflows:
async def process_order(order):
    validate_order(order)           # ✓ Completes
    charge_stripe(order)            # ✓ Completes
    send_confirmation(order)        # ❌ Server crashes
    # On restart: Everything runs again
    # Result: Customer charged twice
With workflows:
@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    order = await ctx.step.run("validate", validate_order, input)   # ✓ Cached
    payment = await ctx.step.run("charge", charge_stripe, order)    # ✓ Cached
    await ctx.step.run("send_email", send_confirmation, order)      # ❌ Crashes
    # On restart: Resumes from send_email
    # Result: Customer charged once, email sent successfully

Workflow context

Every workflow receives a WorkflowContext with execution metadata and methods:
@workflow
async def example_workflow(ctx: WorkflowContext, input: dict):
    # Execution identifiers
    execution_id = ctx.execution_id
    workflow_id = ctx.workflow_id
    
    # User context
    user_id = ctx.user_id
    session_id = ctx.session_id
    
    # Timestamps
    created_at = ctx.created_at
    started_at = ctx.started_at
    
    # Step methods
    result = await ctx.step.run("step_name", function, args)
    await ctx.step.wait_for("wait_step", seconds=60)
    
    return {"status": "completed"}

Key concepts

Steps

Steps are the unit of durability. Each step’s output is persisted when the step completes.
@workflow
async def example(ctx: WorkflowContext, input: dict):
    # Each step is durable
    step1 = await ctx.step.run("step1", func1, input)
    step2 = await ctx.step.run("step2", func2, step1)
    step3 = await ctx.step.run("step3", func3, step2)
    return step3
See Steps for details.

Workflow composition

Workflows can call other workflows or agents. They can wait for those to complete or simply trigger them as fire-and-forget.
@workflow
async def parent(ctx: WorkflowContext, input: ParentInput):
    # Call child workflow but don't wait for it to complete
    exec_handle = await ctx.step.invoke(
        "call_child",
        child_workflow,
        {"data": input.data}
    )
    
    # Call agent and wait
    response = await ctx.step.agent_invoke_and_wait(
        "research_agent",
        research_agent.with_input(input.data)
    )
    return response.result

@workflow
async def child_workflow(ctx: WorkflowContext, input: dict):
    # Process data
    return {"processed": True}

Control flow

Use normal Python control flow:
import asyncio

@workflow
async def conditional_workflow(ctx: WorkflowContext, input: ConditionalWorkflowInput):
    # Conditionals
    if input.amount > 1000:
        await ctx.step.run("high_value", process_high_value, input)
    else:
        await ctx.step.run("standard", process_standard, input)
    
    # Loops
    for item in input.items:
        await ctx.step.run(f"process_{item.id}", process_item, item)
    
    # Parallel execution
    results = await asyncio.gather(
        ctx.step.run("task_a", task_a, input),
        ctx.step.run("task_b", task_b, input),
        ctx.step.run("task_c", task_c, input)
    )
    
    return results

Waits and events

Workflows can pause and resume:
@workflow
async def approval_workflow(ctx: WorkflowContext, input: ApprovalWorkflowInput):
    # Process request
    request = await ctx.step.run("process", process_request, input)
    
    # Wait for approval event
    approval = await ctx.step.wait_for_event(
        "wait_approval",
        topic="approval.response"
    )
    
    if approval.data["approved"]:
        result = await ctx.step.run("execute", execute_action, request)
        return result
    return {"status": "rejected"}
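Time-based waits work the same way as event waits. The sketch below uses the ctx.step.wait_for method shown in the workflow context example; the email helper functions are illustrative placeholders.
@workflow
async def reminder_workflow(ctx: WorkflowContext, input: dict):
    # Send the initial email (send_initial_email is an illustrative helper)
    await ctx.step.run("send_initial", send_initial_email, input)

    # Durable wait: the workflow pauses here and resumes after the delay,
    # even if the worker restarts in the meantime
    await ctx.step.wait_for("wait_one_day", seconds=86400)

    # Send the follow-up once the wait completes
    await ctx.step.run("send_reminder", send_reminder_email, input)
    return {"status": "completed"}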
See Waits and Events for details.

Starting workflows

Workflows can be started in three ways:

1. Invoke using API or SDK

from polos import PolosClient

client = PolosClient()
result = await order_workflow.invoke(
    client, payload={"order_id": "123"}
)

You can also invoke multiple workflows or agents in parallel:
from polos import BatchWorkflowInput, PolosClient

client = PolosClient()

# Batch invoke multiple workflows
handles = await client.batch_invoke([
    BatchWorkflowInput(id="workflow-1", payload={"foo": "bar"}),
    BatchWorkflowInput(id="workflow-2", payload={"baz": 42}),
    BatchWorkflowInput(id="workflow-3", payload={"qux": "test"}),
])

# Batch invoke multiple agents
agent_handles = await client.batch_agent_invoke([
    grammar_agent.with_input("Check this text"),
    tone_agent.with_input("Check this text"),
    accuracy_agent.with_input("Check this text"),
])
Note: client.batch_invoke() and client.batch_agent_invoke() cannot be called from within a workflow; use ctx.step.batch_invoke() or ctx.step.batch_agent_invoke() instead. See Steps for details.
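Inside a workflow, the step-level variants are used instead. The sketch below assumes ctx.step.batch_invoke takes a step name plus the same BatchWorkflowInput items as the client call; the exact signature is documented in Steps.
@workflow
async def fan_out(ctx: WorkflowContext, input: dict):
    # Sketch only: assumes ctx.step.batch_invoke mirrors client.batch_invoke
    # with an added step name; see Steps for the exact signature
    handles = await ctx.step.batch_invoke(
        "fan_out_children",
        [
            BatchWorkflowInput(id="workflow-1", payload={"foo": "bar"}),
            BatchWorkflowInput(id="workflow-2", payload={"baz": 42}),
        ]
    )
    return {"launched": len(handles)}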

2. Scheduled execution

@workflow(schedule="0 9 * * *")  # Daily at 9am
async def daily_report(ctx: WorkflowContext, input: SchedulePayload):
    data = await ctx.step.run("fetch", fetch_data)
    report = await ctx.step.run("generate", generate_report, data)
    await ctx.step.run("send", send_email, report)

3. Event-triggered

@workflow(trigger_on_event="user.signup")
async def onboard_user(ctx: WorkflowContext, event: EventPayload):
    user_id = event.data["user_id"]
    await ctx.step.run("welcome", send_welcome_email, user_id)
    await ctx.step.run("setup", create_sample_data, user_id)

Key takeaways

  • Workflows are durable functions - They survive failures and resume from the last completed step
  • Steps are the unit of durability - Each step’s output is persisted
  • WorkflowContext provides execution metadata - Access IDs, timestamps, and step methods
  • Compose workflows - Call other workflows and wait for results
  • Use normal Python control flow - Conditionals, loops, and parallel execution
  • Start workflows - Via API, schedule, or events