Streaming provides real-time feedback as agents execute, allowing you to display progress to users and handle responses incrementally.

Basic streaming

Use agent.stream() to stream agent responses:
import asyncio
from polos import PolosClient

async def main():
    client = PolosClient()
    # weather_agent is an agent instance defined elsewhere in your application
    stream = await weather_agent.stream(client, "What's the weather in Tokyo?")
    
    # Stream text as it's generated
    async for chunk in stream.text_chunks:
        print(chunk, end="", flush=True)
    
    print("\n")

if __name__ == "__main__":
    asyncio.run(main())

Stream iterators

Polos provides multiple ways to consume streamed responses:

Text chunks

Stream only the text content:
stream = await agent.stream(client, "Explain quantum computing")

async for chunk in stream.text_chunks:
    print(chunk, end="", flush=True)
Use case: Display text to users in real-time, like ChatGPT’s streaming interface.

All events

Access full event stream including tool calls, steps, and metadata:
stream = await agent.stream(client, "Search for Python tutorials and summarize them")

async for event in stream.events:
    if event.event_type == "text_delta":
        print(event.data.get("content", ""), end="", flush=True)
    elif event.event_type == "tool_call":
        tool_name = event.data.get("tool_name")
        print(f"\n[Calling tool: {tool_name}]\n")
Use case: Build rich UIs that show tool execution progress, reasoning steps, etc.

Final text

Get the complete accumulated text after streaming finishes:
stream = await agent.stream(client, "Write a haiku about programming")

final_text = await stream.text()
print(final_text)
Use case: When you need the complete response but want streaming for progress indication.

Complete result

Get the full result object with usage stats, tool calls, and metadata:
stream = await agent.stream(client, "What's 2+2?")

result = await stream.result()

print(f"Result: {result.result}")
print(f"Total steps: {result.total_steps}")
print(f"Tokens used: {result.usage.total_tokens}")

# Access tool results
for tool_call in result.tool_results:
    print(f"Tool: {tool_call.tool_name}")
    print(f"Result: {tool_call.result}")
Use case: Analytics, logging, cost tracking, debugging.
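The fields above can feed a logging or cost-tracking pipeline. A minimal sketch (summarize_usage is a hypothetical helper, not part of Polos, and the per-1k-token price is a placeholder, not a real provider rate):

```python
def summarize_usage(total_steps: int, total_tokens: int,
                    price_per_1k_tokens: float = 0.002) -> dict:
    """Flatten run statistics into a record for logging or analytics.

    The default price is a placeholder; substitute your provider's rate.
    """
    return {
        "total_steps": total_steps,
        "total_tokens": total_tokens,
        "estimated_cost_usd": round(total_tokens / 1000 * price_per_1k_tokens, 6),
    }

# e.g. summarize_usage(result.total_steps, result.usage.total_tokens)
record = summarize_usage(total_steps=3, total_tokens=1247)
print(record)
```

A flat dict like this can go straight into structured logging or a metrics table without further transformation.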

Lifecycle events

By default, agents emit lifecycle events during execution, regardless of whether you use agent.run() or agent.stream(). These events help you track progress and display status to users.

Event types

agent_start
  • Marks the beginning of agent execution
  • Contains agent ID and initial configuration
agent_finish
  • Marks the end of execution
  • Includes usage statistics (tokens, cost)
  • Contains final result
step_start
  • Indicates a workflow step has begun (for example, an LLM call or a tool call)
step_finish
  • Confirms step completion
  • Includes step output and duration
text_delta (only with agent.stream())
  • Incremental text chunks as the LLM generates them
  • Real-time content streaming
tool_call (only with agent.stream())
  • Emitted when the agent requests a tool execution
  • Includes the tool name and arguments
  • The tool's output is delivered via the corresponding step_finish event once execution completes

Using lifecycle events

stream = await agent.stream(client, "Research AI agents and create a summary")

async for event in stream.events:
    if event.event_type == "agent_start":
        print("🚀 Agent started")
    
    elif event.event_type == "step_start":
        step_name = event.data.get("step_key")
        print(f"⏳ Running step: {step_name}")
    
    elif event.event_type == "tool_call":
        tool_name = event.data.get("tool_call", {}).get("function", {}).get("name")
        print(f"🔧 Calling tool: {tool_name}")
    
    elif event.event_type == "text_delta":
        content = event.data.get("content", "")
        print(content, end="", flush=True)
    
    elif event.event_type == "step_finish":
        step_name = event.data.get("step_key")
        print(f"\n✓ Completed step: {step_name}")
    
    elif event.event_type == "agent_finish":
        usage = event.data.get("usage", {})
        print(f"\n✅ Agent finished - Tokens: {usage.get('total_tokens')}")
Example output:
🚀 Agent started
🔧 Calling tool: search_web
⏳ Running step: search_web
✓ Completed step: search_web
⏳ Running step: generate_summary
Based on recent research, AI agents are...
✓ Completed step: generate_summary
✅ Agent finished - Tokens: 1247

Stream with agent.run()

Even when an agent runs without streaming (via agent.run(), or invoke with streaming disabled), lifecycle events are still emitted. You can listen to them separately:
from polos import PolosClient, events
import json

client = PolosClient()

# Start the agent - non-streaming
execution_handle = await weather_agent.invoke(
    client,
    payload={
        "input": "What is the weather in New York and Mumbai? Compare them.",
        "streaming": False,  # Set to False for non-streaming execution
    }
)

# Get the execution_id from the handle
execution_id = execution_handle.id

# Listen to events using stream_workflow
print("📡 Streaming events:\n")
async for event in events.stream_workflow(client, workflow_run_id=execution_id):
    if event.event_type == "step_start":
        print(f"\n\nStep started: {event.data.get('step_key')}")
    elif event.event_type == "step_finish":
        # Print the result of the step
        print(f"\n\nStep finished: {json.dumps(event.data.get('data', {}).get('result', {}), indent=2)}")
    elif event.event_type == "agent_finish":
        print("\n\nAgent finished")

Building UIs with streaming

Basic text streaming UI

async def stream_to_ui(agent, user_message):
    # Assumes `client` and `websocket` are available in the enclosing scope
    stream = await agent.stream(client, user_message)
    
    async for chunk in stream.text_chunks:
        # Send chunk to frontend
        await websocket.send_json({
            "type": "text_chunk",
            "content": chunk
        })
    
    # Send completion signal
    await websocket.send_json({"type": "done"})
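Sending every token as its own websocket frame can overwhelm slow clients. One mitigation is to coalesce small chunks before sending. A synchronous sketch (batch_chunks is a hypothetical helper, shown on a plain iterable so the logic is easy to test; an async variant would wrap stream.text_chunks the same way):

```python
def batch_chunks(chunks, min_size: int = 24):
    """Coalesce small text chunks into fewer, larger messages.

    Yields a message once the buffer reaches min_size characters,
    plus whatever remains when the input is exhausted.
    """
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        if len(buffer) >= min_size:
            yield buffer
            buffer = ""
    if buffer:  # flush the tail so no text is lost
        yield buffer

print(list(batch_chunks(["Hel", "lo, ", "wor", "ld!"], min_size=6)))  # ['Hello, ', 'world!']
```

Tune min_size against your UI's render cadence; larger batches mean fewer frames but chunkier updates.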

Rich progress UI

async def stream_with_progress(agent, user_message):
    stream = await agent.stream(client, user_message)
    
    async for event in stream.events:
        if event.event_type == "tool_call":
            await websocket.send_json({
                "type": "tool_call",
                "tool": event.data.get("tool_call", {}).get("function", {}).get("name"),
                "status": "executing"
            })
        
        elif event.event_type == "text_delta":
            await websocket.send_json({
                "type": "text",
                "content": event.data.get("content", "")
            })
        
        elif event.event_type == "agent_finish":
            await websocket.send_json({
                "type": "complete",
                "usage": event.data.get("usage")
            })

Error handling in streams

Streams automatically handle errors and emit error events:
stream = await agent.stream(client, "Do something risky")

try:
    async for event in stream.events:
        if event.event_type == "error":
            error_msg = event.data.get("error")
            print(f"Error: {error_msg}")
            break
        
        elif event.event_type == "text_delta":
            print(event.data.get("content", ""), end="")
except Exception as e:
    print(f"Stream failed: {e}")
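If a stream fails partway through, the partial output is lost and the only recovery is to re-run the agent from the beginning, so retries should be reserved for idempotent requests. A generic backoff wrapper, sketched without any Polos-specific calls (stream_with_retry, start_stream, and consume are hypothetical names, not part of the SDK):

```python
import asyncio

async def stream_with_retry(start_stream, consume,
                            max_attempts: int = 3, base_delay: float = 1.0):
    """Run start_stream() then consume(stream), retrying with exponential backoff.

    start_stream: zero-arg coroutine function that opens a fresh stream,
                  e.g. lambda: agent.stream(client, "Do something risky")
    consume:      coroutine function that drains the stream and returns a result
    """
    for attempt in range(1, max_attempts + 1):
        try:
            stream = await start_stream()
            return await consume(stream)
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts; surface the last error
            # Exponential backoff: base_delay, 2x, 4x, ... between attempts
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Because each retry opens a fresh stream, any text already shown to the user from a failed attempt should be cleared before replaying.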

Key takeaways

  • stream.text_chunks - Simple text-only streaming
  • stream.events - Full access to lifecycle events, tool calls, and metadata
  • stream.text() - Get complete accumulated text
  • stream.result() - Get full result with usage and tool information
  • Lifecycle events - Emitted for both agent.run() and agent.stream()
  • Rich UIs - Use events to show tool execution, progress, and real-time updates