Skip to main content
Human-in-the-loop (HITL) workflows pause execution to wait for human approval, input, or decisions before continuing. This is essential for sensitive operations like financial transactions, data deletions, or any action requiring human oversight.

Basic suspend and resume

Use ctx.step.suspend() to pause workflow execution and wait for external input:
from polos import workflow, WorkflowContext, Agent
from pydantic import BaseModel, Field
from typing import List, Optional

class ActionPlan(BaseModel):
    description: str = Field(description="What will be done")
    steps: List[str] = Field(description="Steps to execute")
    estimated_cost: float = Field(description="Estimated cost in USD")
    risk_level: str = Field(description="low, medium, or high")

planning_agent = Agent(
    id="planning-agent",
    provider="openai",
    model="gpt-4o",
    system_prompt="Analyze requests and create detailed action plans with cost estimates.",
    output_schema=ActionPlan
)

class ApprovalWorkflowInput(BaseModel):
    request: str

class ApprovalWorkflowOutput(BaseModel):
    status: str
    result: Optional[str] = None
    rejection_reason: Optional[str] = None


@workflow
async def approval_workflow(ctx: WorkflowContext, input: ApprovalWorkflowInput):
    # Step 1: Agent creates action plan
    plan_response = await ctx.step.agent_invoke_and_wait(
        "create_plan",
        planning_agent.with_input(f"Create an action plan for: {input.request}")
    )
    
    plan = plan_response.result  # ActionPlan (structured output)
    
    # Step 2: Suspend and wait for approval
    approval = await ctx.step.suspend(
        step_key="wait_for_approval",
        data={
            "description": plan.description,
            "steps": plan.steps,
            "estimated_cost": plan.estimated_cost,
            "risk_level": plan.risk_level,
            "requires_approval_from": "[email protected]"
        },
        timeout=3600  # 1 hour timeout
    )
    
    # Step 3: Resumes here when approval event is received. Execute if approved.
    if approval.data.get("approved"):
        result = await ctx.step.run(
            "execute_plan",
            execute_action,
            plan
        )
        return ApprovalWorkflowOutput(status="completed", result=result)
    else:
        return ApprovalWorkflowOutput(status="rejected", rejection_reason=approval_data.get("reason"))
What happens:
  1. Agent creates a structured action plan
  2. Workflow suspends with plan details
  3. Worker suspends execution (no compute consumed)
  4. An event is emitted with suspend details
  5. Workflow waits indefinitely (or until timeout)
  6. When resume event arrives, workflow continues with the provided data

Suspend events

When a workflow suspends, Polos emits an event to a topic specific to that suspension: Topic format: {step_key}/{execution_id} Event:
{
  "event_type": "suspend",
  "topic": "wait_for_approval/abc-123-def-456",
  "data": {
    "description": "Delete 1000 customer records from archive",
    "steps": ["Backup data", "Run deletion query", "Verify deletion"],
    "estimated_cost": 50.00,
    "risk_level": "high",
    "requires_approval_from": "[email protected]"
  }
}

Listening for suspend events

Listen to the suspend topic to know when workflows need attention:
import asyncio
from polos import events, PolosClient

async def main():
    client = PolosClient()

    # Start the workflow
    handle = await approval_workflow.invoke(client, {
        "request": "delete old customer records from archive"
    })
    
    print(f"Workflow started: {handle.id}")
    print("Waiting for approval...")
    
    # Build the suspend topic
    suspend_step_key = "wait_for_approval"
    suspend_topic = f"{suspend_step_key}/{handle.id}"
    
    # Listen for suspend event
    async for event in events.stream_topic(suspend_topic):
        if event.event_type == "suspend":
            print("Workflow suspended, awaiting approval:")
            print(f"  Description: {event.data['description']}")
            print(f"  Steps: {', '.join(event.data['steps'])}")
            print(f"  Cost: ${event.data['estimated_cost']}")
            print(f"  Risk: {event.data['risk_level']}")
            print(f"  Approver: {event.data['requires_approval_from']}")
            break

if __name__ == "__main__":
    asyncio.run(main())

Resuming workflows

To resume a suspended workflow, emit a resume event to the same topic:
from polos import PolosClient
from datetime import datetime

async def approve_workflow():
    client = PolosClient()

    # Resume with approval
    await client.resume(
        suspend_execution_id="abc-123-def-456",  # Workflow execution ID
        suspend_step_key="wait_for_approval",     # Step key from suspend()
        data={
            "approved": True,
            "approved_by": "[email protected]",
            "approved_at": datetime.now().isoformat(),
            "notes": "Approved after review"
        }
    )
The workflow resumes immediately and continues with the provided data.

Complete example

Here’s a full approval workflow with suspend/resume:
import asyncio
from polos import workflow, WorkflowContext, events, PolosClient
from datetime import datetime
from pydantic import BaseModel

class ChargeRequest(BaseModel):
    customer_id: str
    amount: float
    description: str

class ChargeResponse(BaseModel):
    status: str
    reason: Optional[str] = None  # Used for rejections
    charge_id: Optional[str] = None
    amount: Optional[float] = 0.0

@workflow
async def charge_customer_workflow(ctx: WorkflowContext, input: ChargeRequest):
    # Validate the charge
    validation = await ctx.step.run(
        "validate_charge",
        validate_charge_request,
        input
    )
    
    if not validation.is_valid:
        return ChargeResponse(status=status, reason=validation.error)
    
    # For charges over $1000, require approval
    if input.amount > 1000:
        approval = await ctx.step.suspend(
            step_key="high_value_approval",
            data={
                "customer_id": input.customer_id,
                "amount": input.amount,
                "description": input.description,
                "threshold_exceeded": True
            },
            timeout=7200  # 2 hours
        )
        
        if not approval.data.get("approved"):
            return ChargeResponse(
                status=status,
                reason=approval.get("reason", "Not approved")
            )
    
    # Execute the charge
    charge_result = await ctx.step.run(
        "execute_charge",
        charge_stripe,
        input
    )
    
    # Send confirmation
    await ctx.step.run(
        "send_confirmation",
        send_email,
        customer_id=input.customer_id,
        charge_id=charge_result.charge_id
    )
    
    return ChargeResponse(
        status="completed",
        charge_id=charge_result.charge_id,
        amount=input.amount
    )

# Start workflow
async def main():
    client = PolosClient()

    handle = await charge_customer_workflow.invoke(
        client,
        ChargeRequest(
            customer_id="cust_123",
            amount=5000.00,
            description="Enterprise annual subscription"
        ))
    
    print(f"Charge workflow started: {handle.id}")
    
    # Listen for approval request
    suspend_topic = f"high_value_approval/{handle.id}"
    
    async for event in events.stream_topic(suspend_topic):
        if event.event_type == "suspend":
            print("\n⚠️  High-value charge requires approval:")
            print(f"   Customer: {event.data['customer_id']}")
            print(f"   Amount: ${event.data['amount']}")
            print(f"   Description: {event.data['description']}")
            
            # Send Slack notification with approval URL
            approval_url = f"https://admin.example.com/approve/{handle.id}"
            await send_slack_message(
                channel="#financial-approvals",
                message=f"High-value charge requires approval: ${event.data['amount']}",
                blocks=[{
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Charge Approval Required*\nAmount: ${event.data['amount']}\nCustomer: {event.data['customer_id']}\n<{approval_url}|Approve or Reject>"
                    }
                }]
            )
            break
    
    # Wait for workflow completion
    result = await handle.result()
    print(f"\n✅ Charge completed: {result.charge_id}")

if __name__ == "__main__":
    asyncio.run(main())

Building approval UIs

The slack message has an approval URL of https://admin.example.com/approve/{execution_id}. When a human clicks the URL and makes a decision, your approval endpoint can call the Polos API to resume the workflow.
const response = await fetch(
  `https://api.polos.ai/api/v1/executions/${executionId}/resume`,
  {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      step_key: "high_value_approval",
      data: {
        approved: approval.approved,
        approved_by: approval.approved_by,
        approved_at: new Date().toISOString(),
        reason: approval.reason
      }
    })
  }
)

const result = await response.json()

Timeouts

Suspensions can have timeouts to prevent workflows from waiting indefinitely:
try:
    approval = await ctx.step.suspend(
        step_key="approval",
        data={"request": "delete records"},
        timeout=3600  # 1 hour in seconds
    )
except StepExecutionError as e:
    return ChargeResponse(
        status="timeout",
        reason="Approval not received in time"
    )

Multiple approvals

Handle multi-stage approvals:
@workflow
async def multi_approval_workflow(ctx: WorkflowContext, input: MultiApprovalInput):
    # Stage 1: Manager approval
    manager_approval = await ctx.step.suspend(
        step_key="manager_approval",
        data={"stage": "manager", "amount": input.amount}
    )
    
    if not manager_approval.data.get("approved"):
        return MultiApprovalOutput(status="rejected_by_manager")
    
    # Stage 2: Finance approval (for large amounts)
    if input.amount > 10000:
        finance_approval = await ctx.step.suspend(
            step_key="finance_approval",
            data={"stage": "finance", "amount": input.amount}
        )
        
        if not finance_approval.data.get("approved"):
            return MultiApprovalOutput(status="rejected_by_finance")
    
    # Execute action
    result = await ctx.step.run("execute", execute_action, input)
    return MultiApprovalOutput(status="completed", result=result)

Key takeaways

  • ctx.step.suspend() pauses workflow execution and waits for external input
  • No compute consumed while suspended — workflows can wait hours or days
  • Suspend events emitted to {step_key}/{execution_id} topic
  • Resume with resume() providing execution ID, step key, and decision data
  • Use timeouts to prevent indefinite waiting
  • Build approval UIs by listening to suspend events and calling resume API
  • Multi-stage approvals supported with multiple suspend steps