Skip to content

流式 + 结构化混合 Playbook

这篇 playbook 的重点不是“同时开两个流”,而是说明为什么工程落地时推荐把 async_generator + instant 当成默认实践。

前置知识

场景

既要实时给 UI 或日志推送内容,又要把结构化字段交给下游系统使用。

什么时候该用这个模式

  • 前端不能长时间空白等待
  • 一个请求要同时服务“用户可见内容”和“程序可消费字段”
  • 你已经在异步服务或事件循环里工作

推荐模式

  • 文本预览: response.get_async_generator(type="delta")
  • 结构化字段: response.get_async_generator(type="instant")
  • 单次请求复用: response = ...get_response()

这里特意推荐 async 版本,而不是把同步 generator 当默认。原因是这类场景几乎都发生在:

  • SSE / WebSocket
  • FastAPI / worker
  • 可能同时处理多个请求的服务端

数据流

完整代码

python
import asyncio
from agently import Agently

Agently.set_settings(
    "OpenAICompatible",
    {
        "base_url": "http://localhost:11434/v1",
        "model": "qwen2.5:7b",
        "model_type": "chat",
    },
).set_settings("request_options", {"temperature": 0.2}).set_settings("debug", False)

agent = Agently.create_agent()

response = (
    agent.system("你是运营日报助手,输出结构化结果。")
    .input("给出一个 3 步的增长实验建议,并提供一句话总结。")
    .output(
        {
            "title": (str, "标题"),
            "summary": (str, "一句话总结"),
            "steps": [
                {
                    "step": (str, "步骤"),
                    "owner": (str, "负责人角色"),
                }
            ],
        }
    )
    .get_response()
)


async def stream_text():
    chunks = []
    async for chunk in response.get_async_generator(type="delta"):
        chunks.append(chunk)
    return "".join(chunks)


async def stream_structured():
    events = []
    async for item in response.get_async_generator(type="instant"):
        if item.is_complete:
            events.append(f"{item.path} => {item.value}")
    return events


async def main():
    text_stream, structured_events = await asyncio.gather(
        stream_text(),
        stream_structured(),
    )
    final_data = await response.async_get_data()

    print("TEXT_STREAM:")
    print(text_stream)
    print("\nSTRUCTURED_STREAM:")
    for event in structured_events:
        print(event)
    print("\nFINAL_DATA:")
    print(final_data)


if __name__ == "__main__":
    asyncio.run(main())

为什么这是推荐实践

  • 一个 response 同时支撑多种消费方式,不会重复发请求
  • instant 让字段级更新和下游逻辑可以提前开始
  • async 版本更适合接到真实服务里,而不是停留在 demo

如果你还要继续升级

当结构化字段一完成就要继续触发工作流时,不要在这一页停下。直接进入:

  • agently-model-response
  • agently-output-control
  • agently-triggerflow-model-integration