class ParentState(WorkflowState):
    """State schema bound to ``parent_workflow`` via ``@workflow(state_schema=...)``."""

    # Counter that parent_workflow increments by one before it invokes the child.
    tasks_started: int = 0
class ChildState(WorkflowState):
    """State schema bound to ``child_workflow``.

    ``parent_workflow`` builds an instance of this class and passes it as
    ``initial_state`` when it invokes the child.
    """

    # Filled by the parent with its own ``ctx.execution_id``; empty by default.
    parent_id: str = ""

    # Free-form settings; the parent passes {"mode": "production"}.
    # NOTE(review): a literal {} default is only safe if WorkflowState copies
    # defaults per instance (as pydantic models do) -- confirm against the base class.
    config: dict = {}
@workflow(state_schema=ParentState)
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
    """Record that a task started, then delegate the work to ``child_workflow``.

    Args:
        ctx: Workflow context; this function reads ``ctx.execution_id`` and
            updates ``ctx.state.tasks_started``.
        input: Parent input; only ``input.data`` is read and forwarded.

    Returns:
        Whatever ``ctx.step.invoke_and_wait`` yields for the "call_child" step
        (presumably the child's return value -- confirm against the framework).
    """
    ctx.state.tasks_started += 1

    # Assemble what the child receives: its payload and the seed values for
    # its own state, tagged with this execution's id.
    child_payload = {"data": input.data}
    child_state = ChildState(
        parent_id=ctx.execution_id,
        config={"mode": "production"},
    )

    return await ctx.step.invoke_and_wait(
        "call_child",
        child_workflow,
        payload=child_payload,
        initial_state=child_state,
    )
@workflow(state_schema=ChildState)
async def child_workflow(ctx: WorkflowContext, input: ChildInput):
    """Print the state values it started with, run the "process" step, and report success.

    Args:
        ctx: Workflow context; ``ctx.state.parent_id`` and ``ctx.state.config``
            are read and printed.
        input: Child input; ``input.data`` is handed to ``process_data``.

    Returns:
        The fixed dict ``{"processed": True}``.
    """
    # Show the state values this execution sees (seeded by the caller when it
    # supplies ``initial_state``).
    print(f"Parent ID: {ctx.state.parent_id}")
    print(f"Config: {ctx.state.config}")

    # The step's own return value is not captured; only its completion matters here.
    await ctx.step.run("process", process_data, input.data)

    return {"processed": True}