Workflows can pause execution to wait for time delays or external events. During waits, the worker suspends execution and no compute resources are consumed.

Why use waits?

Waits enable workflows to:
  • Delay execution - Wait hours, days, or weeks between steps
  • Coordinate with external systems - Wait for webhooks, approvals, or user actions
  • Implement timeouts - Fail gracefully if events don’t arrive
  • Save costs - No compute consumed while waiting

Types of waits

1. Time-based: wait_for

Pause for a duration:
from polos import workflow, WorkflowContext

@workflow
async def delayed_workflow(ctx: WorkflowContext, input: dict):
    # Process immediately
    data = await ctx.step.run("fetch", fetch_data)
    
    # Wait 1 hour (worker suspends)
    await ctx.step.wait_for("wait_1_hour", hours=1)
    
    # Resume after 1 hour
    result = await ctx.step.run("process", process_data, data)
    return result
Available time units:
# Wait 30 seconds
await ctx.step.wait_for("wait", seconds=30)

# Wait 5 minutes
await ctx.step.wait_for("wait", minutes=5)

# Wait 2 hours
await ctx.step.wait_for("wait", hours=2)

# Wait 3 days
await ctx.step.wait_for("wait", days=3)

# Wait 1 week
await ctx.step.wait_for("wait", weeks=1)

# Combine units
await ctx.step.wait_for("wait", hours=1, minutes=30)  # 1.5 hours
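
When units are combined, the durations add up. As a rough illustration, the standard library's `datetime.timedelta` accepts the same keyword names and can be used to check the total length of a wait. The helper `total_wait_seconds` below is hypothetical and not part of Polos:

```python
from datetime import timedelta


def total_wait_seconds(**units: float) -> float:
    """Hypothetical helper: total length in seconds of a wait given in mixed units."""
    # timedelta understands weeks, days, hours, minutes, and seconds
    return timedelta(**units).total_seconds()


print(total_wait_seconds(hours=1, minutes=30))  # 5400.0
```

This kind of check is handy for confirming that a combined wait lands on the intended side of the short/long wait threshold.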

2. Timestamp-based: wait_until

Wait until a specific datetime:
from datetime import datetime, timezone

@workflow
async def scheduled_action(ctx: WorkflowContext, input: dict):
    # Schedule for specific time
    target_time = datetime(2025, 12, 31, 23, 59, 0, tzinfo=timezone.utc)
    
    # Wait until exactly 23:59 on Dec 31, 2025
    await ctx.step.wait_until("wait_new_year", target_time)
    
    # Executes at the specified time
    await ctx.step.run("celebrate", send_celebration)
Timezone handling:
from datetime import datetime
from zoneinfo import ZoneInfo

# Wait until 9 AM Eastern Time
target = datetime(2025, 2, 1, 9, 0, 0, tzinfo=ZoneInfo("America/New_York"))
await ctx.step.wait_until("wait_morning", target)

# If the target time is already in the past, wait_until raises an error
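
Because a target in the past raises an error, it can help to validate the target before waiting. The following is a minimal sketch using only the standard library; `is_future` is a hypothetical helper, not a Polos function, and it assumes targets should always be timezone-aware:

```python
from datetime import datetime, timezone
from typing import Optional
from zoneinfo import ZoneInfo


def is_future(target: datetime, now: Optional[datetime] = None) -> bool:
    """Hypothetical pre-check: True if the timezone-aware target is later than now."""
    if target.tzinfo is None:
        # Naive datetimes are ambiguous, so reject them up front
        raise ValueError("target must be timezone-aware")
    now = now or datetime.now(timezone.utc)
    # Aware datetimes compare correctly across different time zones
    return target > now


# 9 AM Eastern on Feb 1, 2025 is 14:00 UTC (Eastern is UTC-5 in winter)
target = datetime(2025, 2, 1, 9, 0, 0, tzinfo=ZoneInfo("America/New_York"))
print(target.astimezone(timezone.utc).isoformat())  # 2025-02-01T14:00:00+00:00
```

A workflow could call such a check before `wait_until` and skip the wait, or fail with a clearer message, when the target has already passed.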

3. Event-based: wait_for_event

Wait for an external event:
@workflow
async def approval_workflow(ctx: WorkflowContext, input: dict):
    # Submit for approval
    await ctx.step.run("submit", submit_for_approval, input)
    
    # Wait for approval event (hours or days)
    approval = await ctx.step.wait_for_event(
        "wait_approval",
        topic="approval/response"
    )
    
    # Resume when event arrives
    if approval.data["approved"]:
        await ctx.step.run("execute", execute_action, input)
    else:
        return {"status": "rejected"}
With timeout:
# Wait for event with 24-hour timeout
try:
    approval = await ctx.step.wait_for_event(
        "wait_approval",
        topic="approval/response",
        timeout=86400  # 24 hours in seconds
    )
    # Event received
    return {"approved": approval.data["approved"]}
except StepExecutionError:
    # Timeout expired without event
    return {"status": "timeout"}

Wait behavior

Worker suspension

When a workflow waits:
  1. Worker suspends execution - Workflow state is saved
  2. No compute consumed - Worker is freed to handle other workflows
  3. Orchestrator tracks wait - Schedules resumption
  4. Workflow resumes - Worker picks up from where it stopped
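
The four steps above can be pictured with a toy in-memory model. This is a conceptual sketch only: `ToyOrchestrator` and its methods are invented for illustration and say nothing about how Polos is actually implemented.

```python
import heapq


class ToyOrchestrator:
    """Conceptual model of suspend and resume; not the Polos implementation."""

    def __init__(self):
        self._waits = []  # min-heap of (resume_at, workflow_id)
        self._state = {}  # saved workflow state keyed by workflow id

    def suspend(self, workflow_id, state, resume_at):
        # Steps 1-3: save state, release the worker, record when to resume
        self._state[workflow_id] = state
        heapq.heappush(self._waits, (resume_at, workflow_id))

    def due(self, now):
        # Step 4: return every workflow whose wait has elapsed, with its saved state
        ready = []
        while self._waits and self._waits[0][0] <= now:
            _, workflow_id = heapq.heappop(self._waits)
            ready.append((workflow_id, self._state.pop(workflow_id)))
        return ready


orch = ToyOrchestrator()
orch.suspend("wf-1", {"last_step": "fetch"}, resume_at=3600)
print(orch.due(now=10))    # []
print(orch.due(now=3600))  # [('wf-1', {'last_step': 'fetch'})]
```

The point of the model is that nothing runs between `suspend` and `due`: only the saved state and a wake-up time are kept while the workflow waits.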

Short vs long waits

Short waits (≤10 seconds):
  • Worker sleeps inline (doesn’t suspend)
  • Slightly more efficient (no orchestrator round-trip)
  • Configurable via POLOS_WAIT_THRESHOLD_SECONDS environment variable
Long waits (>10 seconds):
  • Worker suspends and is freed
  • Orchestrator schedules resumption
  • Essential for hours/days/weeks
# Short wait - worker sleeps inline
await ctx.step.wait_for("short", seconds=5)

# Long wait - worker suspends
await ctx.step.wait_for("long", minutes=30)
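
The threshold rule can be expressed as a small decision function. This is a sketch under stated assumptions: `wait_mode` is a hypothetical helper, and reading the variable as a plain number of seconds with a default of 10 is inferred from the text above, not taken from the Polos source.

```python
import os


def wait_mode(seconds: float) -> str:
    """Hypothetical helper mirroring the rule above: returns 'inline' or 'suspend'."""
    # Default of 10 seconds comes from the text; the parsing is an assumption
    threshold = float(os.environ.get("POLOS_WAIT_THRESHOLD_SECONDS", "10"))
    # Waits at or below the threshold sleep inline; longer waits suspend the worker
    return "inline" if seconds <= threshold else "suspend"


print(wait_mode(5))     # inline (with the default threshold)
print(wait_mode(1800))  # suspend (with the default threshold)
```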

Publishing events to resume waits

From within a workflow

@workflow
async def publisher_workflow(ctx: WorkflowContext, input: dict):
    # Process data
    result = await ctx.step.run("process", process_data, input)
    
    # Publish event (resumes workflows waiting on this topic)
    await ctx.step.publish_event(
        "notify",
        topic="approval/response",
        data={
            "approved": True,
            "approved_by": "approver@example.com"
        }
    )
    
    return result

From external systems (API)

import httpx

async def send_approval(approved: bool, approved_by: str):
    """Send approval event via API."""
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://api.polos.ai/api/v1/events/publish",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": "application/json"
            },
            json={
                "topic": "approval/response",
                "events": [{
                    "data": {
                        "approved": approved,
                        "approved_by": approved_by
                    }
                }]
            }
        )

# Resume the waiting workflow (call from an async function, or wrap in asyncio.run)
await send_approval(approved=True, approved_by="approver@example.com")
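
The request body above wraps each event in an `events` list under a single `topic`. A small builder keeps that shape in one place. This is a sketch: `build_publish_payload` is a hypothetical helper, and whether the endpoint accepts several events in one request is an assumption based only on `events` being a list.

```python
import json


def build_publish_payload(topic: str, *event_data: dict) -> dict:
    """Hypothetical helper: wrap one or more event bodies in the request shape shown above."""
    if not event_data:
        raise ValueError("at least one event is required")
    # Each event body is nested under a "data" key, as in the example request
    return {"topic": topic, "events": [{"data": data} for data in event_data]}


payload = build_publish_payload(
    "approval/response",
    {"approved": True, "approved_by": "approver@example.com"},
)
print(json.dumps(payload, indent=2))
```

The resulting dictionary can be passed as the `json=` argument of the `client.post` call. The topic string must match the one used in `wait_for_event` exactly.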