Steps are the fundamental unit of durability in Polos. Each step’s execution is persisted, enabling workflows to resume from the last completed step after failures.

What is a step?

A step is a durable operation within a workflow. When a step completes, its output is saved to the database. If the workflow crashes, completed steps return their cached results on replay - no re-execution, no duplicate side effects.
from polos import workflow, WorkflowContext

@workflow
async def example_workflow(ctx: WorkflowContext, input: ExampleInput):
    # Step 1: Fetch data (cached on replay)
    data = await ctx.step.run("fetch", fetch_from_api, input.url)
    
    # Step 2: Process data (cached on replay)
    processed = await ctx.step.run("process", transform_data, data)
    
    # Step 3: Save result (executes only once)
    await ctx.step.run("save", save_to_db, processed)
    
    return {"status": "completed"}
If the workflow fails after step 2, replay proceeds as follows:
  Step 1 ("fetch") → Returns cached result (no API call)
  Step 2 ("process") → Returns cached result (no re-processing)
  Step 3 ("save") → Executes for the first time

Step execution model

Unlike workflows, which are scheduled by the orchestrator and assigned to workers, steps execute directly on the same worker running the workflow. This means:
  • Lower overhead - No scheduling delay
  • Same execution context - Access to workflow variables
  • Durability guaranteed - Output is persisted before continuing
Steps are not queued or orchestrated — they run inline with the workflow code.
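Because steps run inline, the callable you hand to ctx.step.run() is just an ordinary Python function (sync or async) that the worker can import. As a rough sketch, the fetch_from_api helper used in the example above might look like this (hypothetical; it is not part of Polos, and the httpx client is only an assumption):
import httpx

async def fetch_from_api(url: str) -> dict:
    # Runs inline on the worker; Polos persists only the returned value,
    # so the function itself needs no framework-specific code.
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()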

Core step methods

step.run()

Run any Python function as a durable step:
@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    # Run async function
    order = await ctx.step.run(
        "validate_order",
        validate_order,
        input.order_id
    )

    # Run sync function (automatically wrapped)
    receipt = await ctx.step.run(
        "generate_receipt",
        generate_pdf,  # Sync function
        order
    )
    
    return receipt
With retry configuration:
result = await ctx.step.run(
    "api_call",
    call_external_api,
    url,
    max_retries=5,
    base_delay=2.0,
    max_delay=30.0
)
What should be a step:
  • ✅ External API calls
  • ✅ Database operations
  • ✅ File I/O
  • ✅ Non-deterministic operations (random(), datetime.now())
  • ✅ Any operation that might fail
What should NOT be a step:
  • ❌ Pure logic (if statements, loops)
  • ❌ Variable assignments
  • ❌ String manipulation
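Putting both lists together, a sketch of the split might look like this (update_profile is a hypothetical database helper, not part of Polos):
from polos import workflow, WorkflowContext

@workflow
async def sync_profiles(ctx: WorkflowContext, input: dict):
    # Pure logic, assignments, and string manipulation stay outside steps
    emails = [e.strip().lower() for e in input["emails"] if e]

    for i, email in enumerate(emails):
        # Database writes can fail and have side effects, so each becomes a step
        # (with a unique key per iteration; see "Step keys must be unique" below)
        await ctx.step.run(f"update_profile_{i}", update_profile, email)

    return {"updated": len(emails)}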

step.invoke()

Start a child workflow without waiting for it to complete:
@workflow
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    # Start child workflow
    handle = await ctx.step.invoke(
        "start_child",
        child_workflow,
        {"data": input.data}
    )
    
    # Continue immediately (don't wait for child to complete)
    print(f"Started workflow: {handle.id}")
    return {"child_id": handle.id}

step.invoke_and_wait()

Start a child workflow and suspend until it completes:
@workflow
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    # Call child and wait for result
    result = await ctx.step.invoke_and_wait(
        "call_child",
        child_workflow,
        {"data": input.data}
    )
    
    # Parent resumes here with child's result
    return {"child_result": result}
Worker behavior: The worker suspends the parent workflow (no compute consumed) until the child completes.

step.batch_invoke()

Start multiple workflows in parallel without waiting:
from polos import BatchWorkflowInput

@workflow
async def parallel_tasks(ctx: WorkflowContext, input: ParallelInput):
    # Start multiple workflows
    handles = await ctx.step.batch_invoke(
        "start_batch", [
            BatchWorkflowInput(id="task_workflow", payload={"task": "A"}),
            BatchWorkflowInput(id="task_workflow", payload={"task": "B"}),
            BatchWorkflowInput(id="log_workflow", payload={"input": input})
        ]
    )
    
    return {"started": len(handles)}

step.batch_invoke_and_wait()

Start multiple workflows in parallel and wait for all to complete:
@workflow
async def parallel_and_wait(ctx: WorkflowContext, input: ParallelInput):
    # Start batch and wait for all
    results = await ctx.step.batch_invoke_and_wait(
        "batch_process", [
            BatchWorkflowInput(id="processor", payload={"item": "A"}),
            BatchWorkflowInput(id="processor", payload={"item": "B"}),
            BatchWorkflowInput(id="log_workflow", payload={"input": input})
        ]
    )
    
    # Worker suspends until all complete
    for result in results:
        print(f"Result: {result.result}")
    
    return results

step.agent_invoke()

Start an agent execution without waiting for it to complete:
@workflow
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    # Start agent execution
    handle = await ctx.step.agent_invoke(
        "research_agent",
        research_agent.with_input(input.query)
    )
    
    # Continue immediately (don't wait)
    print(f"Started agent execution: {handle.id}")
    return {"agent_execution_id": handle.id}

step.agent_invoke_and_wait()

Start an agent execution and suspend until it completes:
@workflow
async def review_workflow(ctx: WorkflowContext, input: ReviewInput):
    # Call agent and wait for result
    response = await ctx.step.agent_invoke_and_wait(
        "grammar_review_agent_invoke",
        grammar_review_agent.with_input(input.text)
    )
    
    # Parent resumes here with agent's response
    return {"review": response.result}
Worker behavior: The worker suspends the parent workflow (no compute consumed) until the agent completes.

step.batch_agent_invoke()

Start multiple agent executions in parallel without waiting:
@workflow
async def parallel_agents(ctx: WorkflowContext, input: ParallelInput):
    # Start multiple agents
    handles = await ctx.step.batch_agent_invoke(
        "start_batch_agents",
        [
            grammar_review_agent.with_input(input.text),
            tone_review_agent.with_input(input.text),
            accuracy_review_agent.with_input(input.text),
        ]
    )
    
    return {"started": len(handles)}

step.batch_agent_invoke_and_wait()

Start multiple agent executions in parallel and wait for all to complete:
@workflow
async def parallel_reviews(ctx: WorkflowContext, input: ReviewInput):
    # Run three review agents in parallel
    results = await ctx.step.batch_agent_invoke_and_wait(
        "batch_invoke_grammar_tone_accuracy_reviews",
        [
            grammar_review_agent.with_input(input.text),
            tone_consistency_review_agent.with_input(input.text),
            accuracy_review_agent.with_input(input.text),
        ]
    )
    
    # Worker suspends until all agents complete
    for result in results:
        print(f"Review result: {result.result}")
    
    return results

step.wait_for()

Pause execution for a duration:
@workflow
async def delayed_workflow(ctx: WorkflowContext, input: dict):
    # Wait 1 hour (worker suspends, no compute cost)
    await ctx.step.wait_for("wait_1_hour", hours=1)
    
    # Resume after 1 hour
    result = await ctx.step.run("process", process_data, input)
    return result
Available units: seconds, minutes, hours, days, weeks
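For longer delays the same pattern applies. A minimal sketch using days (send_digest is a hypothetical helper):
@workflow
async def weekly_digest(ctx: WorkflowContext, input: dict):
    # Suspend for a week; the worker consumes no compute while waiting
    await ctx.step.wait_for("wait_7_days", days=7)

    return await ctx.step.run("send_digest", send_digest, input)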

step.wait_until()

Wait until a specific datetime:
from datetime import datetime, timezone

@workflow
async def scheduled_action(ctx: WorkflowContext, input: dict):
    # Schedule for specific time
    target_time = datetime(2025, 12, 31, 23, 59, 0, tzinfo=timezone.utc)
    
    await ctx.step.wait_until("wait_until_new_year", target_time)
    
    # Executes at 23:59 on Dec 31, 2025
    await ctx.step.run("celebrate", send_celebration)

step.wait_for_event()

Wait for an external event:
@workflow
async def approval_workflow(ctx: WorkflowContext, input: dict):
    # Submit for user confirmation
    await ctx.step.run("submit", submit_for_user_confirmation, input)
    
    # Wait for user.confirmation event (hours or days)
    approval = await ctx.step.wait_for_event(
        "user_confirmation",
        topic="user.confirmation",
        timeout=86400  # 24 hour timeout
    )
    
    # Resume when event arrives
    if approval.data["approved"]:
        await ctx.step.run("execute", execute_action, input)

step.suspend()

Suspend execution and wait for a manual resume. This is syntactic sugar around wait_for_event, tailored to human-in-the-loop agent flows.
@workflow
async def manual_approval(ctx: WorkflowContext, input: dict):
    # Prepare action
    action = await ctx.step.run("prepare", prepare_action, input)
    
    # Suspend with context data
    approval = await ctx.step.suspend(
        "approval",
        data={
            "action": action.description,
            "cost": action.estimated_cost
        },
        timeout=3600  # 1 hour timeout
    )
    
    # Resume with approval decision
    if approval.data["approved"]:
        await ctx.step.run("execute", execute_action, action)
See Human-in-the-Loop for details.

step.publish_event()

Publish events durably:
@workflow
async def event_publisher(ctx: WorkflowContext, input: dict):
    # Process data
    result = await ctx.step.run("process", process_data, input)
    
    # Publish event (guaranteed once)
    await ctx.step.publish_event(
        "notify_complete",
        topic="processing.completed",
        data={"result": result},
        event_type="completion"
    )

step.uuid()

Generate a UUID that persists across replays:
@workflow
async def create_entity(ctx: WorkflowContext, input: dict):
    # Generate stable UUID (same on replay)
    entity_id = await ctx.step.uuid("entity_id")
    
    # Use the generated UUID in API calls
    await ctx.step.run("create", create_in_api, entity_id, input)
    
    return {"id": entity_id}
Why this matters:
import uuid

# ❌ BAD: New UUID on each replay
entity_id = str(uuid.uuid4())  # Different every time
await ctx.step.run("create", create_in_api, entity_id)

# ✅ GOOD: Same UUID on replay
entity_id = await ctx.step.uuid("entity_id")  # Cached
await ctx.step.run("create", create_in_api, entity_id)

step.now()

Get current timestamp that persists across replays:
@workflow
async def timestamped_workflow(ctx: WorkflowContext, input: dict):
    # Get stable timestamp (same on replay)
    created_at = await ctx.step.now("created_at")
    
    # Use in conditional logic
    if created_at % 2 == 0:
        await ctx.step.run("even_path", process_even)
    else:
        await ctx.step.run("odd_path", process_odd)

step.trace()

Add custom spans for observability:
@workflow
async def monitored_workflow(ctx: WorkflowContext, input: dict):
    # Custom span for database operations
    with ctx.step.trace("database_query", {"table": "users", "limit": 100}):
        users = await db.query("SELECT * FROM users LIMIT 100")
    
    # Custom span for external API
    with ctx.step.trace("external_api", {"endpoint": "/data"}):
        data = await api.fetch("/data")
    
    return {"users": len(users), "data": data}
See Tracing for details.

Step keys must be unique

Each step needs a unique step_key per execution:
@workflow
async def process_items(ctx: WorkflowContext, input: dict):
    # ❌ BAD: Same key in loop
    for item in input.items:
        await ctx.step.run("process", process_item, item)  # Collision!
    
    # ✅ GOOD: Unique key per iteration
    for i, item in enumerate(input.items):
        await ctx.step.run(f"process_{i}", process_item, item)

Agents and steps

In Polos, agents are special workflows. Everything an agent does is broken into steps.
Agent actions as steps:
  • LLM calls - Each call is a step
  • Guardrail evaluation - Each guardrail is a step
  • Stop condition checks - Each check is a step
  • Tool calls - Tools are subworkflows (not steps)
This means:
  • Agent failures resume from the last completed step (not the beginning)
  • Tool calls are durable subworkflows (not lost on failure)
  • Every agent action is observable via step events
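In practice this is the same durability model shown above, applied to agent runs. A rough sketch, reusing the hypothetical research_agent from the earlier examples:
@workflow
async def research_pipeline(ctx: WorkflowContext, input: dict):
    # Inside the agent, each LLM call, guardrail evaluation, and stop-condition
    # check is persisted as its own step; tool calls run as durable subworkflows
    response = await ctx.step.agent_invoke_and_wait(
        "research",
        research_agent.with_input(input["query"])
    )

    # If the worker crashes mid-run, replay resumes the agent from its last
    # completed step instead of repeating earlier LLM calls
    return {"answer": response.result}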