Monitor workflow execution in real time by streaming lifecycle events. Events give you visibility into steps, tool calls, and overall execution progress.

What are lifecycle events?

Lifecycle events are emitted automatically during workflow execution:
  • workflow_start - Workflow begins execution
  • workflow_finish - Workflow completes
  • step_start - Step begins
  • step_finish - Step completes
Agents emit additional events:
  • agent_start - Agent execution begins
  • agent_finish - Agent execution completes (includes usage stats)
  • text_delta - Text streaming chunks (only with agent.stream())
  • tool_call - Tool execution triggered (only with agent.stream())
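As a minimal sketch of how a consumer might tell these two groups apart, the helper below classifies event type strings using only the names listed above. The function names `classify` and `summarize` are hypothetical and are not part of the polos API.

```python
from collections import Counter

# Event type names taken from the lists above.
WORKFLOW_EVENTS = {"workflow_start", "workflow_finish", "step_start", "step_finish"}
AGENT_EVENTS = {"agent_start", "agent_finish", "text_delta", "tool_call"}


def classify(event_type: str) -> str:
    """Return 'workflow', 'agent', or 'unknown' for an event type string."""
    if event_type in WORKFLOW_EVENTS:
        return "workflow"
    if event_type in AGENT_EVENTS:
        return "agent"
    return "unknown"


def summarize(event_types):
    """Count how many events fall into each group (hypothetical helper)."""
    return Counter(classify(t) for t in event_types)
```

Treating unrecognized types as "unknown" rather than raising keeps a consumer working if new event types appear later.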

Streaming workflow events

Listen to events for a specific workflow execution:
from polos import workflow, WorkflowContext, PolosClient
from polos.features.events import stream_workflow

@workflow
async def data_pipeline(ctx: WorkflowContext, input: PipelineInput):
    data = await ctx.step.run("extract", extract_data, input)
    transformed = await ctx.step.run("transform", transform_data, data)
    final = await ctx.step.invoke_and_wait("child_pipeline", child_pipeline, transformed)
    await ctx.step.run("load", load_data, final)

# Start workflow
client = PolosClient()
handle = await data_pipeline.invoke(
    client,
    PipelineInput(source="database", table="orders")
)

# Stream events
async for event in stream_workflow(workflow_run_id=handle.id):
    if event.event_type == "step_start":
        print(f"⏳ Step started: {event.data.get('step_key')}")
    
    elif event.event_type == "step_finish":
        print(f"✅ Step finished: {event.data.get('step_key')}")
    
    # Note: this also fires when child_pipeline finishes, not only the parent
    elif event.event_type == "workflow_finish":
        print("🎉 Workflow completed")

Event structure

Each event carries an event_type, a data payload, and execution metadata. An example event:
{
  "id": "9124c653-41e8-4a7d-ad43-76d642b422bd",
  "sequence_id": 1001,
  "event_type": "step_start",
  "data": {
    "step_key": "extract",
    "step_type": "run",
    "_metadata": {
      "execution_id": "exec_123",
      "workflow_id": "data_pipeline"
    }
  },
  "created_at": "2025-01-29T10:30:00Z"
}
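If you receive events as raw JSON (for example over a WebSocket), a small typed wrapper can make the fields easier to work with. The sketch below parses a trimmed copy of the example event; the `LifecycleEvent` dataclass and `parse_event` function are hypothetical helpers and not the SDK's own event class.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class LifecycleEvent:
    """Hypothetical container mirroring the fields of the example event."""
    id: str
    sequence_id: int
    event_type: str
    data: dict[str, Any]
    created_at: datetime


def parse_event(raw: str) -> LifecycleEvent:
    """Parse one JSON-encoded event into a LifecycleEvent."""
    payload = json.loads(raw)
    return LifecycleEvent(
        id=payload["id"],
        sequence_id=payload["sequence_id"],
        event_type=payload["event_type"],
        data=payload.get("data", {}),
        # Replace the trailing "Z" so fromisoformat works on older Pythons too.
        created_at=datetime.fromisoformat(payload["created_at"].replace("Z", "+00:00")),
    )


# Trimmed copy of the example event shown above.
RAW_EVENT = """
{
  "id": "9124c653-41e8-4a7d-ad43-76d642b422bd",
  "sequence_id": 1001,
  "event_type": "step_start",
  "data": {
    "step_key": "extract",
    "step_type": "run",
    "_metadata": {"execution_id": "exec_123"}
  },
  "created_at": "2025-01-29T10:30:00Z"
}
"""

event = parse_event(RAW_EVENT)
```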
A workflow's event stream includes events for every step and subworkflow it executes. The data_pipeline example above has 3 steps and 1 subworkflow, so its stream contains the following events:
workflow_start  -> "data": {"_metadata": {"workflow_id": "data_pipeline"}}

step_start      -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "extract"}
step_finish     -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "extract"}

step_start      -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "transform"}
step_finish     -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "transform"}

workflow_start  -> "data": {"_metadata": {"workflow_id": "child_pipeline"}}
< events for steps and subworkflows from child_pipeline >
workflow_finish -> "data": {"_metadata": {"workflow_id": "child_pipeline"}}

step_start      -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "load"}
step_finish     -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "load"}

workflow_finish -> "data": {"_metadata": {"workflow_id": "data_pipeline"}}
The _metadata field identifies which workflow or subworkflow emitted the event.
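Because parent and child events share one stream, a consumer can group them by the workflow_id in the metadata. The following is a minimal sketch that works on plain dictionaries shaped like the events listed above; `steps_by_workflow` is a hypothetical helper, and the child step key "validate" is invented for illustration.

```python
from collections import defaultdict


def steps_by_workflow(events):
    """Group finished step keys by the workflow that emitted them."""
    finished = defaultdict(list)
    for event in events:
        if event["event_type"] != "step_finish":
            continue
        data = event.get("data", {})
        workflow_id = data.get("_metadata", {}).get("workflow_id", "unknown")
        finished[workflow_id].append(data.get("step_key"))
    return dict(finished)


# Sample events; "validate" is a made-up step inside child_pipeline.
sample_events = [
    {"event_type": "step_start",
     "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "extract"}},
    {"event_type": "step_finish",
     "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "extract"}},
    {"event_type": "step_finish",
     "data": {"_metadata": {"workflow_id": "child_pipeline"}, "step_key": "validate"}},
    {"event_type": "step_finish",
     "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "load"}},
]

grouped = steps_by_workflow(sample_events)
```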

Building real-time UIs

Lifecycle events are useful for streaming progress updates to a UI. You can forward events to the frontend over a WebSocket:
from polos import events, PolosClient

async def stream_to_websocket(client: PolosClient, execution_id: str, websocket):
    """Stream workflow events to WebSocket client."""
    async for event in events.stream_workflow(client, workflow_run_id=execution_id):
        # Send event to frontend
        await websocket.send_json({
            "type": event.event_type,
            "data": event.data,
            "timestamp": event.created_at
        })
        
        # Stop streaming when workflow completes
        if event.event_type == "workflow_finish" and event.data["_metadata"]["execution_id"] == execution_id:
            break
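The execution_id comparison in the stop condition matters because subworkflows emit their own workflow_finish events on the same stream. A rough sketch of that check as a standalone function, operating on plain dictionaries and assuming that a child run carries a different execution_id than its parent (the name `is_root_finish` and the id "exec_child" are hypothetical):

```python
def is_root_finish(event: dict, execution_id: str) -> bool:
    """Return True only for the workflow_finish event of the given execution."""
    if event.get("event_type") != "workflow_finish":
        return False
    metadata = event.get("data", {}).get("_metadata", {})
    return metadata.get("execution_id") == execution_id


# A child finishing should not end the stream; the parent finishing should.
child_done = {
    "event_type": "workflow_finish",
    "data": {"_metadata": {"execution_id": "exec_child", "workflow_id": "child_pipeline"}},
}
parent_done = {
    "event_type": "workflow_finish",
    "data": {"_metadata": {"execution_id": "exec_123", "workflow_id": "data_pipeline"}},
}
```

Using .get() throughout means an event without metadata is treated as "not the root finish" instead of raising a KeyError mid-stream.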
Frontend (JavaScript):
const ws = new WebSocket('ws://localhost:8000/stream');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  
  if (data.type === 'step_start') {
    console.log(`Step started: ${data.data.step_key}`);
  } else if (data.type === 'step_finish') {
    console.log(`Step finished: ${data.data.step_key}`);
  } else if (data.type === 'workflow_finish') {
    console.log('Workflow completed!');
  }
};