TriggerFlow 编排 Playbook
这篇 playbook 讲的不是“怎么把几个 chunk 串起来”,而是: 真实工程里,TriggerFlow 应该默认按 Async First 去组织。
前置知识
场景
多步流程需要条件分支、并发、异步状态信号,以及可以被 UI 或服务观察的中间态。
为什么这里默认 Async First
- chunk 本身就是很自然的异步边界
runtime_stream和外部消费者通常是异步关系- 一旦模型结果要提前驱动后续工作,就会进入
instant + async_emit(...)组合
编排能力地图
推荐模式
- async chunk 负责显式流程步骤
- 并发用
batch(...)/for_each(...) - 中间态通过
async_put_into_stream(...)推出 - 结构化早到字段通过
instant + async_emit(...)继续派发
完整代码
python
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData
flow = TriggerFlow()
@flow.chunk("normalize")
async def normalize(data: TriggerFlowRuntimeData):
topic = str(data.value).strip()
data.state.set("topic", topic)
await data.async_put_into_stream({"stage": "normalized", "topic": topic})
return topic
@flow.chunk("fetch_facts")
async def fetch_facts(data: TriggerFlowRuntimeData):
await asyncio.sleep(0.05)
await data.async_put_into_stream({"stage": "facts_ready", "topic": data.value})
return f"facts({data.value})"
@flow.chunk("fetch_risks")
async def fetch_risks(data: TriggerFlowRuntimeData):
await asyncio.sleep(0.03)
await data.async_put_into_stream({"stage": "risks_ready", "topic": data.value})
return f"risks({data.value})"
@flow.chunk("compile_report")
async def compile_report(data: TriggerFlowRuntimeData):
topic = data.state.get("topic")
report = {
"topic": topic,
"facts": data.value.get("fetch_facts"),
"risks": data.value.get("fetch_risks"),
}
await data.async_put_into_stream({"stage": "compiled", "report": report})
await data.async_stop_stream()
return report
flow.to(normalize)
flow.when({"runtime_data": "topic"}).batch(fetch_facts, fetch_risks, concurrency=2).to(compile_report).end()消费侧:
python
async def main():
execution = flow.create_execution(concurrency=2)
async def watch_stream():
async for item in execution.get_async_runtime_stream("Agently TriggerFlow", timeout=5):
print("STREAM:", item)
stream_task = asyncio.create_task(watch_stream())
result = await execution.async_start("Agently TriggerFlow")
await stream_task
print("RESULT:", result)
asyncio.run(main())验证点
- 并发任务都在 async chunk 中显式执行
runtime_stream可持续输出阶段性事件- 最终结果和中间态观察链路分离
继续往前走的升级点
如果你的并发步骤里还要嵌入模型流式字段,不要停留在当前模式。继续组合:
response.get_async_generator(type="instant")await data.async_emit(...)await data.async_put_into_stream(...)
这就是当前 TriggerFlow 最值得推荐的实践方向。
Related Skills(可选)
agently-triggerflowagently-triggerflow-model-integration