Skip to content

TriggerFlow 编排 Playbook

这篇 playbook 讲的不是“怎么把几个 chunk 串起来”,而是: 真实工程里,TriggerFlow 应该默认按 Async First 去组织。

前置知识

场景

多步流程需要条件分支、并发、异步状态信号,以及可以被 UI 或服务观察的中间态。

为什么这里默认 Async First

  • chunk 本身就是很自然的异步边界
  • runtime_stream 和外部消费者通常是异步关系
  • 一旦模型结果要提前驱动后续工作,就会进入 instant + async_emit(...) 组合

编排能力地图

推荐模式

  1. async chunk 负责显式流程步骤
  2. 并发用 batch(...) / for_each(...)
  3. 中间态通过 async_put_into_stream(...) 推出
  4. 结构化早到字段通过 instant + async_emit(...) 继续派发

完整代码

python
import asyncio
from agently import TriggerFlow, TriggerFlowRuntimeData

flow = TriggerFlow()


@flow.chunk("normalize")
async def normalize(data: TriggerFlowRuntimeData):
    topic = str(data.value).strip()
    data.state.set("topic", topic)
    await data.async_put_into_stream({"stage": "normalized", "topic": topic})
    return topic


@flow.chunk("fetch_facts")
async def fetch_facts(data: TriggerFlowRuntimeData):
    await asyncio.sleep(0.05)
    await data.async_put_into_stream({"stage": "facts_ready", "topic": data.value})
    return f"facts({data.value})"


@flow.chunk("fetch_risks")
async def fetch_risks(data: TriggerFlowRuntimeData):
    await asyncio.sleep(0.03)
    await data.async_put_into_stream({"stage": "risks_ready", "topic": data.value})
    return f"risks({data.value})"


@flow.chunk("compile_report")
async def compile_report(data: TriggerFlowRuntimeData):
    topic = data.state.get("topic")
    report = {
        "topic": topic,
        "facts": data.value.get("fetch_facts"),
        "risks": data.value.get("fetch_risks"),
    }
    await data.async_put_into_stream({"stage": "compiled", "report": report})
    await data.async_stop_stream()
    return report


flow.to(normalize)
flow.when({"runtime_data": "topic"}).batch(fetch_facts, fetch_risks, concurrency=2).to(compile_report).end()

消费侧:

python
async def main():
    execution = flow.create_execution(concurrency=2)

    async def watch_stream():
        async for item in execution.get_async_runtime_stream("Agently TriggerFlow", timeout=5):
            print("STREAM:", item)

    stream_task = asyncio.create_task(watch_stream())
    result = await execution.async_start("Agently TriggerFlow")
    await stream_task
    print("RESULT:", result)


asyncio.run(main())

验证点

  • 并发任务都在 async chunk 中显式执行
  • runtime_stream 可持续输出阶段性事件
  • 最终结果和中间态观察链路分离

继续往前走的升级点

如果你的并发步骤里还要嵌入模型流式字段,不要停留在当前模式。继续组合:

  • response.get_async_generator(type="instant")
  • await data.async_emit(...)
  • await data.async_put_into_stream(...)

这就是当前 TriggerFlow 最值得推荐的实践方向。

  • agently-triggerflow
  • agently-triggerflow-model-integration