import asyncio
from polos import workflow, WorkflowContext, events, PolosClient
from datetime import datetime
from pydantic import BaseModel
class ChargeRequest(BaseModel):
customer_id: str
amount: float
description: str
class ChargeResponse(BaseModel):
status: str
reason: Optional[str] = None # Used for rejections
charge_id: Optional[str] = None
amount: Optional[float] = 0.0
@workflow
async def charge_customer_workflow(ctx: WorkflowContext, input: ChargeRequest):
# Validate the charge
validation = await ctx.step.run(
"validate_charge",
validate_charge_request,
input
)
if not validation.is_valid:
return ChargeResponse(status=status, reason=validation.error)
# For charges over $1000, require approval
if input.amount > 1000:
approval = await ctx.step.suspend(
step_key="high_value_approval",
data={
"customer_id": input.customer_id,
"amount": input.amount,
"description": input.description,
"threshold_exceeded": True
},
timeout=7200 # 2 hours
)
if not approval.data.get("approved"):
return ChargeResponse(
status=status,
reason=approval.get("reason", "Not approved")
)
# Execute the charge
charge_result = await ctx.step.run(
"execute_charge",
charge_stripe,
input
)
# Send confirmation
await ctx.step.run(
"send_confirmation",
send_email,
customer_id=input.customer_id,
charge_id=charge_result.charge_id
)
return ChargeResponse(
status="completed",
charge_id=charge_result.charge_id,
amount=input.amount
)
# Start workflow
async def main():
client = PolosClient()
handle = await charge_customer_workflow.invoke(
client,
ChargeRequest(
customer_id="cust_123",
amount=5000.00,
description="Enterprise annual subscription"
))
print(f"Charge workflow started: {handle.id}")
# Listen for approval request
suspend_topic = f"high_value_approval/{handle.id}"
async for event in events.stream_topic(suspend_topic):
if event.event_type == "suspend":
print("\n⚠️ High-value charge requires approval:")
print(f" Customer: {event.data['customer_id']}")
print(f" Amount: ${event.data['amount']}")
print(f" Description: {event.data['description']}")
# Send Slack notification with approval URL
approval_url = f"https://admin.example.com/approve/{handle.id}"
await send_slack_message(
channel="#financial-approvals",
message=f"High-value charge requires approval: ${event.data['amount']}",
blocks=[{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Charge Approval Required*\nAmount: ${event.data['amount']}\nCustomer: {event.data['customer_id']}\n<{approval_url}|Approve or Reject>"
}
}]
)
break
# Wait for workflow completion
result = await handle.result()
print(f"\n✅ Charge completed: {result.charge_id}")
if __name__ == "__main__":
asyncio.run(main())