from polos import workflow, WorkflowContext, PolosClient
from polos.features.events import stream_workflow
@workflow
async def data_pipeline(ctx: WorkflowContext, input: PipelineInput):
data = await ctx.step.run("extract", extract_data, input)
transformed = await ctx.step.run("transform", transform_data, data)
final = await ctx.step.invoke_and_wait("child_pipeline", child_pipeline, transformed)
await ctx.step.run("load", load_data, final)
# Start workflow
client = PolosClient()
handle = await data_pipeline.invoke(
client,
PipeInput(source="database", table="orders")
)
# Stream events
async for event in stream_workflow(workflow_run_id=handle.id):
if event.event_type == "step_start":
print(f"⏳ Step started: {event.data.get('step_key')}")
elif event.event_type == "step_finish":
print(f"✅ Step finished: {event.data.get('step_key')}")
elif event.event_type == "workflow_finish":
print(f"🎉 Workflow completed")