Skip to main content
Workflow state persists data across the entire workflow execution. Use state for counters, accumulators, configuration, or any data that multiple steps need to access and modify.

Defining workflow state

Create a schema for state using WorkflowState:
from polos import workflow, WorkflowContext, WorkflowState

class ProcessingState(WorkflowState):
    processed_count: int = 0
    failed_items: list[str] = []
    started_at: str = ""

@workflow(state_schema=ProcessingState)
async def batch_processor(ctx: WorkflowContext, input: BatchProcessorInput):
    # Access state via ctx.state
    ctx.state.started_at = input.timestamp
    
    for item in input.items:
        try:
            await ctx.step.run(f"process_{item}", process_item, item)
            ctx.state.processed_count += 1
        except Exception as e:
            ctx.state.failed_items.append(item)
    
    return BatchProcessorOutput(
        processed=ctx.state.processed_count,
        failed=len(ctx.state.failed_items)
    )

When to use state

Use state for:
  • ✅ Counters and accumulators
  • ✅ Progress tracking
  • ✅ Configuration set during execution
  • ✅ Data shared across multiple steps
Don’t use state for:
  • ❌ Large data (state has 1MB limit)
  • ❌ Temporary variables (use local variables)
State is saved when the workflow completes or fails.

Setting initial state during workflow invocation

You can set the initial workflow state when invoking it:
class ParentState(WorkflowState):
    tasks_started: int = 0

class ChildState(WorkflowState):
    parent_id: str = ""
    config: dict = {}

@workflow(state_schema=ParentState)
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    ctx.state.tasks_started += 1
    
    # Invoke child workflow with some initial values for its state
    result = await ctx.step.invoke_and_wait(
        "call_child",
        child_workflow,
        payload={"data": input.data},
        initial_state=ChildState(
            parent_id=ctx.execution_id,
            config={"mode": "production"}
        )
    )
    
    return result

@workflow(state_schema=ChildState)
async def child_workflow(ctx: WorkflowContext, input: ChildInput):
    # Access initial state
    print(f"Parent ID: {ctx.state.parent_id}")
    print(f"Config: {ctx.state.config}")
    
    await ctx.step.run("process", process_data, input.data)
    return {"processed": True}
Invoke parent workflow with some initial values for its state:
client = PolosClient()

result = await parent_workflow(
    client,
    ParentInput(data={"resume_from": 10}),
    initial_state=ParentState(tasks_started=10)
)

Best practices

  • Do not use state to store large objects. Polos enforces a limit of 1MB (serialized JSON) on state size.
  • Use default values
class ConfigState(WorkflowState):
    retry_limit: int = 3
    timeout_seconds: int = 30
    debug_mode: bool = False