Event-triggered workflows execute automatically when events are published to specific topics. Perfect for building reactive systems, webhooks, and event-driven architectures.

Defining event-triggered workflows

Use trigger_on_event to specify which topic should trigger the workflow:
from polos import workflow, WorkflowContext, EventPayload

@workflow(
    id="user-signup-handler",
    trigger_on_event="user/signup"
)
async def user_signup_handler(ctx: WorkflowContext, payload: EventPayload):
    # Triggered when an event is published to the "user/signup" topic
    user_id = payload.data["user_id"]
    email = payload.data["email"]
    
    # Send welcome email
    await ctx.step.run("send_welcome", send_welcome_email, user_id, email)
    
    # Create sample data
    await ctx.step.run("setup_account", create_sample_data, user_id)
    
    return {"status": "onboarded", "user_id": user_id}

Event payload structure

Event-triggered workflows receive the event in their payload:
@workflow(
    id="notification-handler",
    trigger_on_event="notifications/new"
)
async def notification_handler(ctx: WorkflowContext, payload: EventPayload):
    # Single event    
    print(f"Event ID: {payload.id}")
    print(f"Sequence ID: {payload.sequence_id}")
    print(f"Topic: {payload.topic}")
    print(f"Event Type: {payload.event_type}")
    print(f"Data: {payload.data}")  # This is a dict
    print(f"Created At: {payload.created_at}")
    
    # Process the event
    notification_data = payload.data
    await ctx.step.run("send", send_notification, notification_data)
Event structure:
{
  "id": "evt_123abc",
  "sequence_id": 456
  "topic": "notifications.new",
  "event_type": "notification_created",
  "data": {
    "user_id": "user_456",
    "message": "You have a new message"
  },
  "created_at": "2025-01-28T10:30:00Z"
}
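For illustration, the structure above maps naturally onto a dataclass. The sketch below is not the SDK's actual `EventPayload` definition, just a model of the fields shown in the JSON:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class EventPayload:
    """Illustrative model of a single event payload, mirroring the JSON above."""
    id: str
    sequence_id: int
    topic: str
    event_type: str
    data: dict[str, Any]
    created_at: str

payload = EventPayload(
    id="evt_123abc",
    sequence_id=456,
    topic="notifications/new",
    event_type="notification_created",
    data={"user_id": "user_456", "message": "You have a new message"},
    created_at="2025-01-28T10:30:00Z",
)
```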

Publishing events

Publish events from workflows or external systems:

From within a workflow

@workflow
async def create_user(ctx: WorkflowContext, input: CreateUserInput):
    # Create user
    user = await ctx.step.run("create", create_user_record, input)
    
    # Publish event (triggers event-triggered workflows)
    await ctx.step.publish_event(
        "publish_signup",
        topic="user/signup",
        data={
            "user_id": user.id,
            "email": user.email,
            "name": user.name
        },
        event_type="user_created"
    )
    
    return {"user_id": user.id}

From external systems (API)

import httpx

async def publish_event(topic: str, data: dict):
    """Publish an event via API."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.polos.ai/api/v1/events/publish",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": "application/json"
            },
            json={
                "topic": topic,
                "events": [{
                    "data": data,
                    "event_type": "custom_event"
                }]
            }
        )
        response.raise_for_status()

# Trigger workflows listening to "order/completed"
await publish_event(
    "order/completed",
    {"order_id": "ord_123", "total": 99.99}
)
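Because the request body's `events` field is a list, several events can go out in a single call. A minimal sketch of a helper that assembles such a batch body (`build_batch_body` is a hypothetical name; only the field names come from the request shown above):

```python
def build_batch_body(topic: str, records: list[dict],
                     event_type: str = "custom_event") -> dict:
    """Build a publish request body carrying several events at once."""
    return {
        "topic": topic,
        "events": [
            {"data": record, "event_type": event_type}
            for record in records
        ],
    }

body = build_batch_body(
    "order/completed",
    [
        {"order_id": "ord_123", "total": 99.99},
        {"order_id": "ord_124", "total": 15.00},
    ],
)
```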

Batch events for processing

Process multiple events together for efficiency:
@workflow(
    id="batch-processor",
    trigger_on_event="analytics/events",
    batch_size=10,              # Process up to 10 events
    batch_timeout_seconds=30    # Or wait max 30 seconds
)
async def batch_processor(ctx: WorkflowContext, payload: BatchEventPayload):
    # Multiple events in the payload
    events = payload.events
    
    print(f"Processing batch of {len(events)} events")
    
    # Extract all event data
    analytics_data = [event.data for event in events]
    
    # Process batch
    await ctx.step.run("batch_insert", insert_analytics, analytics_data)
    
    return {"processed": len(events)}
Batching behavior:
  • Workflow triggers when either batch_size is reached or batch_timeout_seconds elapses
  • If 10 events arrive in 5 seconds → triggers immediately with 10 events
  • If only 3 events arrive in 30 seconds → triggers with 3 events after timeout
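The rule above can be modeled with a small function: given event arrival times (in seconds), it returns when the batch fires and how many events it carries. This is an illustrative model of the described behavior, not the platform's actual scheduler:

```python
def batch_trigger_time(arrival_times: list[float], batch_size: int,
                       batch_timeout_seconds: float):
    """Model when a batch fires for a stream of event arrival times.

    The batch fires as soon as batch_size events have arrived, or when
    batch_timeout_seconds have elapsed since the first event, whichever
    comes first. Returns (trigger_time, events_in_batch).
    """
    if not arrival_times:
        return None
    deadline = arrival_times[0] + batch_timeout_seconds
    in_window = [t for t in arrival_times if t <= deadline]
    if len(in_window) >= batch_size:
        # Size reached first: fire at the arrival of the batch_size-th event
        return in_window[batch_size - 1], batch_size
    # Timeout reached first: fire at the deadline with whatever arrived
    return deadline, len(in_window)
```

With `batch_size=10` and `batch_timeout_seconds=30`, ten events in five seconds fire immediately at the tenth arrival, while three slow events fire at the 30-second deadline.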
Batch payload structure:
{
  "events": [
    {
      "id": "evt_1",
      "sequence_id": 1001,
      "topic": "analytics.events",
      "data": {"action": "click"},
      "created_at": "2025-01-28T10:30:00Z"
    },
    {
      "id": "evt_2",
      "sequence_id": 1002,
      "topic": "analytics.events",
      "data": {"action": "view"},
      "created_at": "2025-01-28T10:30:05Z"
    }
  ]
}
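Batch handlers typically fold the events into an aggregate before a single step call. A quick sketch tallying the actions in the payload above (plain dicts stand in for the event objects):

```python
from collections import Counter

# Plain-dict stand-in for the batch payload shown above
batch = {
    "events": [
        {"id": "evt_1", "sequence_id": 1001, "topic": "analytics/events",
         "data": {"action": "click"}, "created_at": "2025-01-28T10:30:00Z"},
        {"id": "evt_2", "sequence_id": 1002, "topic": "analytics/events",
         "data": {"action": "view"}, "created_at": "2025-01-28T10:30:05Z"},
    ]
}

# Aggregate the batch into per-action counts
action_counts = Counter(event["data"]["action"] for event in batch["events"])
```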

Multiple handlers for one topic

Multiple workflows can listen to the same topic:
@workflow(
    id="immediate-handler",
    trigger_on_event="order/created"
)
async def immediate_handler(ctx: WorkflowContext, payload: EventPayload):
    """Process each order immediately."""
    order_id = payload.data["order_id"]
    
    await ctx.step.run("send_confirmation", send_order_confirmation, order_id)
    return {"handler": "immediate"}

@workflow(
    id="batched-handler",
    trigger_on_event="order/created",
    batch_size=5,
    batch_timeout_seconds=60
)
async def batched_handler(ctx: WorkflowContext, payload: BatchEventPayload):
    """Process orders in batches for analytics."""
    events = payload.events
    
    order_ids = [e.data["order_id"] for e in events]
    await ctx.step.run("batch_analytics", update_analytics, order_ids)
    
    return {"handler": "batched", "count": len(events)}
When an event is published:
  • immediate-handler triggers once per event
  • batched-handler triggers once per batch (up to 5 events or 60 seconds)
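Ignoring the timeout, the trigger counts follow directly from the arithmetic: the per-event handler runs once per event, and the batched handler runs once per full or partial batch. A hypothetical helper making that explicit:

```python
import math

def trigger_counts(num_events: int, batch_size: int) -> dict:
    """Runs per handler for num_events published events.

    Illustrative arithmetic only (timeouts ignored); names are hypothetical.
    """
    return {
        "immediate": num_events,                         # once per event
        "batched": math.ceil(num_events / batch_size),   # once per batch
    }
```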

Event filtering

Filter events by event_type:
@workflow(
    id="high-priority-handler",
    trigger_on_event="notifications/system"
)
async def high_priority_handler(ctx: WorkflowContext, payload: EventPayload):
    # Only process high priority notifications
    if payload.event_type == "high_priority":
        await ctx.step.run("alert", send_alert, payload.data)
    else:
        print(f"Skipping event type: {payload.event_type}")
Alternative approach: Use multiple topics
# Publish to specific topics
await ctx.step.publish_event(
    "publish",
    topic="notifications/high_priority",  # Specific topic
    data=notification_data
)

@workflow(
    id="high-priority-handler",
    trigger_on_event="notifications/high_priority"  # Only high priority
)
async def high_priority_handler(ctx: WorkflowContext, payload: EventPayload):
    # All events on this topic are high priority
    await ctx.step.run("alert", send_alert, payload.data)

Event topic patterns

Use topic hierarchies for organization:
# User events
"user/signup"
"user/login"
"user/deleted"

# Order events
"order/created"
"order/completed"
"order/cancelled"

# System events
"system/error"
"system/warning"
"system/maintenance"

Key takeaways

  • Event-triggered workflows execute automatically when events are published
  • Use trigger_on_event to specify the topic
  • Publish events from workflows or external systems via API
  • Batch events for efficiency with batch_size and batch_timeout_seconds
  • Multiple handlers can listen to the same topic