Queues control how many workflow instances can run simultaneously. Use queues to rate-limit execution, manage resource contention, and prevent overload.

Why use queues?

Queues solve common concurrency challenges:
Rate limiting:
  • Prevent overwhelming external APIs (OpenAI, Stripe, etc.)
  • Control database connection usage
  • Manage resource consumption
Ordered execution:
  • Process per-user tasks sequentially
  • Maintain ordering for critical operations
  • Prevent race conditions
Resource protection:
  • Limit concurrent access to shared resources
  • Prevent thundering herd problems
  • Control system load

Default queue behavior

Every workflow automatically gets its own queue:
from polos import workflow, WorkflowContext

@workflow
async def process_data(ctx: WorkflowContext, input: dict):
    # Automatically assigned to queue named "process-data"
    # Default concurrency limit from environment
    result = await ctx.step.run("process", process, input)
    return result
Default queue properties:
  • Queue name: Same as workflow ID
  • Concurrency limit: From the environment variable POLOS_DEFAULT_CONCURRENCY_LIMIT (see the sketch below)
  • Behavior: Any number of invocations can queue up, but only N run simultaneously
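As an illustration, the sketch below sets the variable from Python before any workflows run. This assumes the Polos worker reads POLOS_DEFAULT_CONCURRENCY_LIMIT at startup; in practice you would normally set it in your deployment environment rather than in code.
import os

# Assumption: the worker process reads POLOS_DEFAULT_CONCURRENCY_LIMIT at startup,
# so the variable must be set before the worker launches (typically via your
# deployment environment; shown in Python here only for illustration)
os.environ["POLOS_DEFAULT_CONCURRENCY_LIMIT"] = "10"

# With this in place, any workflow without an explicit queue configuration
# allows up to 10 simultaneous executions on its default queue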

Custom queue limits

Set a specific concurrency limit for a workflow:
@workflow(queue={"concurrency_limit": 3})
async def rate_limited_workflow(ctx: WorkflowContext, input: dict):
    # Only 3 instances run at once
    # Additional invocations queue until a slot opens
    await ctx.step.run("call_api", call_external_api, input)
    return {"status": "completed"}

Shared queues

Multiple workflows can share the same queue:
from polos import queue

# Create a shared queue
openai_queue = queue("openai-api", concurrency_limit=5)

@workflow(queue=openai_queue)
async def gpt_summarizer(ctx: WorkflowContext, input: SummarizerInput):
    summary = await ctx.step.run("summarize", call_openai_gpt, input.text)
    return summary

@workflow(queue=openai_queue)
async def gpt_analyzer(ctx: WorkflowContext, input: AnalyzerInput):
    analysis = await ctx.step.run("analyze", call_openai_gpt, input.data)
    return analysis

# Both workflows share the "openai-api" queue
# Combined max 5 concurrent executions across both workflows
Use cases for shared queues:
  • Rate-limit API calls across multiple workflows
  • Control database connection pooling (see the sketch below)
  • Manage resource contention
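For the database-pooling case, a minimal sketch might mirror the OpenAI example above. The queue size of 10 and the run_query helper are assumptions for illustration, not part of the Polos API:
from polos import queue, workflow, WorkflowContext

# Shared queue sized to match an assumed connection pool of 10
db_queue = queue("postgres-pool", concurrency_limit=10)

@workflow(queue=db_queue)
async def sync_orders(ctx: WorkflowContext, input: dict):
    # run_query is a hypothetical helper that runs a query against the shared database
    return await ctx.step.run("sync_orders", run_query, input["order_ids"])

@workflow(queue=db_queue)
async def sync_customers(ctx: WorkflowContext, input: dict):
    return await ctx.step.run("sync_customers", run_query, input["customer_ids"])

# Across both workflows, at most 10 steps touch the database at any moment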

Queue configuration options

1. Inline configuration

@workflow(queue={"concurrency_limit": 10})
async def inline_config(ctx: WorkflowContext, input: dict):
    # Queue name = "inline-config" (workflow ID)
    # Concurrency limit = 10
    pass

2. Queue name only

@workflow(queue="processing-queue")
async def named_queue(ctx: WorkflowContext, input: dict):
    # Queue name = "processing-queue"
    # Concurrency limit = default from environment
    pass

3. Queue object

from polos import queue

priority_queue = queue("priority", concurrency_limit=3)

@workflow(queue=priority_queue)
async def priority_workflow(ctx: WorkflowContext, input: dict):
    # Queue name = "priority"
    # Concurrency limit = 3
    pass

Per-entity queuing (concurrency keys)

Use concurrency keys to create separate queues per user, tenant, or entity:
@workflow(queue="user-tasks")
async def user_task(ctx: WorkflowContext, input: dict):
    # Process user-specific task
    await ctx.step.run("process", process_user_task, input)
    return {"status": "completed"}

# Invoke with concurrency_key
client = PolosClient()
await user_task.invoke(
    client,
    payload={"task": "data"},
    concurrency_key=f"user:{user_id}"  # Separate queue per user
)
How it works:
  • Each unique concurrency_key gets its own virtual queue
  • Concurrency limit applies per key (not globally)
  • Perfect for multi-tenant systems
Example: Per-user rate limiting (assuming the user-tasks queue has a concurrency limit of 5)
# User A can run up to 5 concurrent tasks
await user_task.invoke(client, payload={...}, concurrency_key="user:alice")
await user_task.invoke(client, payload={...}, concurrency_key="user:alice")
await user_task.invoke(client, payload={...}, concurrency_key="user:alice")

# User B also gets 5 concurrent slots (independent from User A)
await user_task.invoke(client, payload={...}, concurrency_key="user:bob")
await user_task.invoke(client, payload={...}, concurrency_key="user:bob")

Serial execution

Force workflows to run one at a time:
@workflow(queue={"concurrency_limit": 1})
async def serial_workflow(ctx: WorkflowContext, input: dict):
    # Only 1 instance runs at a time
    # Perfect for operations that must not overlap
    await ctx.step.run("critical_section", critical_operation, input)
    return {"status": "completed"}
Use cases:
  • Database migrations (see the sketch below)
  • File system operations
  • Operations with global side effects
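For example, a migration workflow might look like the following sketch; apply_migration is a hypothetical helper, not part of the Polos API:
@workflow(queue={"concurrency_limit": 1})
async def run_migration(ctx: WorkflowContext, input: dict):
    # apply_migration is a hypothetical helper that applies one schema version.
    # With a concurrency limit of 1, overlapping invocations wait in FIFO order
    # instead of racing against each other.
    await ctx.step.run("apply", apply_migration, input["version"])
    return {"migrated": input["version"]}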

Queue behavior

Queuing and execution

@workflow(queue={"concurrency_limit": 2})
async def limited_workflow(ctx: WorkflowContext, input: dict):
    await ctx.step.run("work", do_work, input)
    return {"done": True}

# Invoke 5 times
client = PolosClient()
for i in range(5):
    await limited_workflow.invoke(client, {"id": i})

# Execution timeline:
# Time 0: Workflow 1 starts, Workflow 2 starts (limit = 2)
# Time 0: Workflow 3, 4, 5 queued
# Time 5: Workflow 1 completes → Workflow 3 starts
# Time 7: Workflow 2 completes → Workflow 4 starts
# Time 10: Workflow 3 completes → Workflow 5 starts
Key points:
  • Queued workflows wait until a slot opens
  • FIFO order (first in, first out)
  • No compute consumed while queued

Dynamic queue assignment

Override queue at invocation time:
@workflow(queue="default-queue")
async def flexible_workflow(ctx: WorkflowContext, input: dict):
    await ctx.step.run("process", process_data, input)
    return {"status": "completed"}

# Use default queue
client = PolosClient()
await flexible_workflow.invoke(client, {"data": "..."})

# Override with different queue
await flexible_workflow.invoke(
    client,
    {"data": "..."},
    queue="priority-queue"
)

Queue restriction

Scheduled workflows cannot specify a custom queue.

Key takeaways

  • Default queue per workflow: named after the workflow ID
  • Concurrency limits control max simultaneous executions
  • Shared queues rate-limit across multiple workflows
  • Concurrency keys create per-entity virtual queues
  • Scheduled workflows cannot customize queues