Skip to main content
Scheduled workflows run automatically at specified times using cron expressions. Perfect for recurring tasks like daily reports, data syncs, or cleanup jobs.

Defining scheduled workflows

There are two ways to schedule workflows:

1. Inline cron string

Use a cron expression directly in the decorator:
from polos import workflow, WorkflowContext

@workflow(
    id="daily-report",
    schedule="0 9 * * *"  # Every day at 9:00 AM UTC
)
async def daily_report(ctx: WorkflowContext, payload: SchedulePayload):
    print(f"Report generated at: {payload.timestamp}")
    
    # Generate and send report
    data = await ctx.step.run("fetch_data", fetch_daily_data)
    report = await ctx.step.run("generate", create_report, data)
    await ctx.step.run("send", email_report, report)
    
    return {"status": "completed"}
You can also specify timezone in the cron schedule:
@workflow(
    id="end-of-day-sync",
    schedule={
        "cron": "0 17 * * *",  # 5:00 PM
        "timezone": "America/New_York"
    }
)
async def end_of_day_sync(ctx: WorkflowContext, payload: SchedulePayload):
    print(f"Syncing at {payload.timestamp} Eastern Time")
    
    await ctx.step.run("sync_data", sync_to_warehouse)
    return {"status": "synced"}

2. Create schedule via API

Mark a workflow as schedulable, then create one or more schedules for it via API: This is useful for creating separate schedules per user or entity.
# Define workflow as schedulable
@workflow(
    id="user-reminder",
    schedule=True  # Can be scheduled via API
)
async def user_reminder(ctx: WorkflowContext, payload: SchedulePayload):
    user_id = payload.key  # Schedule key (e.g., user ID)
    print(f"Reminder for user {user_id} at {payload.timestamp}")
    
    await ctx.step.run("send_reminder", send_user_reminder, user_id)
    return {"status": "sent"}
Create schedule via API:
import asyncio
from polos import schedules, PolosClient

async def schedule_user_reminders(client):
    # Schedule per-user reminder
    schedule_id = await schedules.create(
        client,
        workflow="user-reminder",
        cron="0 8 * * *",  # 8:00 AM daily
        timezone="America/Los_Angeles",
        key="user_123"  # Per-user schedule
    )
    
    print(f"Created schedule: {schedule_id}")

client = PolosClient()
asyncio.run(schedule_user_reminders(client))

Schedule payload

Scheduled workflows receive a special SchedulePayload with timing information:
from datetime import datetime
from pydantic import BaseModel

class SchedulePayload(BaseModel):
    timestamp: datetime          # When this workflow was scheduled to run
    last_timestamp: datetime | None  # When this schedule last ran (None if first run)
    timezone: str                # Timezone of the schedule
    schedule_id: str             # Unique identifier for this schedule
    key: str                     # User ID or custom identifier
    upcoming: datetime           # Next scheduled run time

@workflow(id="scheduled-task", schedule="0 */6 * * *")
async def scheduled_task(ctx: WorkflowContext, payload: SchedulePayload):
    # Access schedule information
    print(f"Current run: {payload.timestamp}")
    print(f"Last run: {payload.last_timestamp}")
    print(f"Next run: {payload.upcoming}")
    print(f"Schedule ID: {payload.schedule_id}")
    
    return {"executed_at": payload.timestamp}

Cron expressions

Cron format: minute hour day month day_of_week Common patterns:
# Every hour
schedule="0 * * * *"

# Every day at 9 AM
schedule="0 9 * * *"

# Every Monday at 8 AM
schedule="0 8 * * 1"

# Every 15 minutes
schedule="*/15 * * * *"

# First day of every month at midnight
schedule="0 0 1 * *"

# Weekdays at 5 PM
schedule="0 17 * * 1-5"

# Every 6 hours
schedule="0 */6 * * *"
Cron syntax:
  • * - Any value
  • */n - Every n units
  • n-m - Range from n to m
  • n,m - List of values
Field ranges:
  • Minute: 0-59
  • Hour: 0-23
  • Day of month: 1-31
  • Month: 1-12
  • Day of week: 0-6 (Sunday = 0)

Use appropriate cron intervals

# ✅ GOOD: Reasonable intervals
schedule="0 */6 * * *"  # Every 6 hours
schedule="0 9 * * *"    # Daily at 9 AM

# ❌ BAD: Too frequent (use events or queues instead)
schedule="* * * * *"    # Every minute

Per-user schedules

Use the key parameter to create separate schedules for each user or entity:
from polos import schedules, PolosClient

async def setup_user_schedule(client, user_id: str, preferences: dict):
    # Create/update per-user schedule
    schedule_id = await schedules.create(
        client,
        workflow="daily-digest",
        cron="0 8 * * *",  # 8 AM
        timezone=preferences["timezone"],
        key=user_id  # Unique per user
    )
    
    return schedule_id

client = PolosClient()

# Each user gets their own schedule
await setup_user_schedule(client, "user_123", {"timezone": "America/New_York"})
await setup_user_schedule(client, "user_456", {"timezone": "Europe/London"})
In the workflow:
@workflow(id="daily-digest", schedule=True)
async def daily_digest(ctx: WorkflowContext, payload: SchedulePayload):
    user_id = payload.key  # Get user from schedule key
    
    # Fetch user-specific data
    user = await ctx.step.run("get_user", get_user, user_id)
    digest = await ctx.step.run("generate", create_digest, user)
    await ctx.step.run("send", send_email, user.email, digest)
    
    return {"user_id": user_id, "sent": True}
Key behavior:
  • If a schedule with the same workflow and key exists, it’s updated (not duplicated)
  • Use key="global" (default) for system-wide schedules
  • Use key=user_id for per-user schedules

Timezones

Always specify timezones for scheduled workflows to avoid confusion:
@workflow(
    id="morning-report",
    schedule={
        "cron": "0 9 * * *",
        "timezone": "America/New_York"  # Runs at 9 AM Eastern
    }
)
async def morning_report(ctx: WorkflowContext, payload: SchedulePayload):
    # Runs at 9 AM Eastern time, adjusts for DST automatically
    return {"executed_at": payload.timestamp}
Common timezones:
  • "UTC" - Coordinated Universal Time
  • "America/New_York" - US Eastern
  • "America/Los_Angeles" - US Pacific
  • "Europe/London" - UK
  • "Asia/Tokyo" - Japan
  • "Australia/Sydney" - Australia

Managing schedules via API

Create or update a schedule

from polos import schedules, PolosClient

client = PolosClient()

# Global schedule (one per workflow)
schedule_id = await schedules.create(
    client,
    workflow="cleanup-task",
    cron="0 3 * * *",  # 3 AM daily
    timezone="UTC",
    key="global"
)

# Per-user schedule
schedule_id = await schedules.create(
    client,
    workflow="user-notification",
    cron="0 12 * * *",  # Noon daily
    timezone="America/Chicago",
    key="user_789"
)
Update existing schedule:
# Calling create() with the same workflow and key updates it
schedule_id = await schedules.create(
    client,
    workflow="user-notification",
    cron="0 15 * * *",  # Change to 3 PM
    timezone="America/Chicago",
    key="user_789"  # Same key = update
)