流式 + 结构化混合 Playbook
这篇 playbook 的重点不是“同时开两个流”,而是说明为什么工程落地时推荐把 async_generator + instant 当成默认实践。
前置知识
场景
既要实时给 UI 或日志推送内容,又要把结构化字段交给下游系统使用。
什么时候该用这个模式
- 前端不能长时间空白等待
- 一个请求要同时服务“用户可见内容”和“程序可消费字段”
- 你已经在异步服务或事件循环里工作
推荐模式
- 文本预览:
response.get_async_generator(type="delta") - 结构化字段:
response.get_async_generator(type="instant") - 单次请求复用:
response = ...get_response()
这里特意推荐 async 版本,而不是把同步 generator 当默认。原因是这类场景几乎都发生在:
- SSE / WebSocket
- FastAPI / worker
- 可能同时处理多个请求的服务端
数据流
完整代码
python
import asyncio
from agently import Agently
Agently.set_settings(
"OpenAICompatible",
{
"base_url": "http://localhost:11434/v1",
"model": "qwen2.5:7b",
"model_type": "chat",
},
).set_settings("request_options", {"temperature": 0.2}).set_settings("debug", False)
agent = Agently.create_agent()
response = (
agent.system("你是运营日报助手,输出结构化结果。")
.input("给出一个 3 步的增长实验建议,并提供一句话总结。")
.output(
{
"title": (str, "标题"),
"summary": (str, "一句话总结"),
"steps": [
{
"step": (str, "步骤"),
"owner": (str, "负责人角色"),
}
],
}
)
.get_response()
)
async def stream_text():
chunks = []
async for chunk in response.get_async_generator(type="delta"):
chunks.append(chunk)
return "".join(chunks)
async def stream_structured():
events = []
async for item in response.get_async_generator(type="instant"):
if item.is_complete:
events.append(f"{item.path} => {item.value}")
return events
async def main():
text_stream, structured_events = await asyncio.gather(
stream_text(),
stream_structured(),
)
final_data = await response.async_get_data()
print("TEXT_STREAM:")
print(text_stream)
print("\nSTRUCTURED_STREAM:")
for event in structured_events:
print(event)
print("\nFINAL_DATA:")
print(final_data)
if __name__ == "__main__":
asyncio.run(main())为什么这是推荐实践
- 一个 response 同时支撑多种消费方式,不会重复发请求
instant让字段级更新和下游逻辑可以提前开始- async 版本更适合接到真实服务里,而不是停留在 demo
如果你还要继续升级
当结构化字段一完成就要继续触发工作流时,不要在这一页停下。直接进入:
Related Skills(可选)
agently-model-responseagently-output-controlagently-triggerflow-model-integration