让你的 AI 应用从"等半天蹦出一坨"变成"像打字一样一个字一个字出来"。
如果你用过 ChatGPT,就知道它的回答是一个字一个字打出来的,而不是等半天一次性蹦出来。这就是流式输出。
好处很明显:
import asyncio
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o')
async def main():
async with agent.run_stream('给我讲一个关于程序员的笑话') as response:
async for text in response.stream_text():
print(text)
# 第 1 次:从前
# 第 2 次:从前有一个程序员
# 第 3 次:从前有一个程序员,他写了
# ...每次都是到目前为止的完整文本
asyncio.run(main())
text 是到目前为止的完整文本,不是只拿到新增的部分。就像看人打字,你看到的永远是完整的一句话,只是越来越长。
如果你只想拿到每次新增的几个字(比如用来拼接或统计),加一个 delta=True:
async with agent.run_stream('你好') as response:
async for chunk in response.stream_text(delta=True):
print(chunk, end='', flush=True)
# 你
# 好
# !
# 很
# 高兴
# 认识你
这样每次拿到的就是新增的那几个字,适合在终端里模拟打字效果。
如果你的代码不是异步的,有同步版本可以用:
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o')
with agent.run_stream_sync('你好') as response:
for text in response.stream_text():
print(text)
AI 返回的内容经常包含 Markdown(标题、代码块、列表等)。用 Rich 库可以实时渲染:
import asyncio
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o')
console = Console()
async def main():
with Live('', console=console, vertical_overflow='visible') as live:
async with agent.run_stream('用 Python 写一个快速排序,加上详细注释') as result:
async for text in result.stream_output():
live.update(Markdown(text))
asyncio.run(main())
效果:终端里会像 ChatGPT 一样,实时显示格式化的 Markdown 内容,代码块有高亮,标题有格式。
| 方法 | 返回什么 | 适用场景 |
|---|---|---|
stream_text() | 字符串 | 纯文本聊天、Markdown 渲染 |
stream_output() | output_type 对应的对象 | 结构化数据、TypedDict、Pydantic 模型 |
当 output_type 是 str(默认)时,两者效果一样。当你定义了结构化输出类型时,stream_output() 会逐步构建出部分验证的对象。
想象你让 AI 生成一个表格数据,但不想等它全部生成完再显示——可以边生成边显示:
import asyncio
from typing import TypedDict, NotRequired
from pydantic_ai import Agent
class City(TypedDict):
name: str
country: str
population: NotRequired[int]
description: NotRequired[str]
agent = Agent('openai:gpt-4o', output_type=list[City])
async def main():
async with agent.run_stream('列出中国 5 个最大的城市') as result:
async for cities in result.stream_output(debounce_by=0.01):
# cities 是一个逐步填充的 list[City]
print(f'已收到 {len(cities)} 个城市')
for city in cities:
name = city.get('name', '...')
pop = city.get('population', '加载中')
print(f' - {name}: {pop}')
print('---')
asyncio.run(main())
输出会是这样的:
已收到 1 个城市
- 上海: 加载中
---
已收到 1 个城市
- 上海: 24870000
---
已收到 2 个城市
- 上海: 24870000
- 北京: 加载中
---
...逐步填充,直到 5 个城市全部完成
NotRequired 标记可以还没收到的字段debounce_by=0.01 控制更新频率(秒),防止刷新太快import asyncio
from typing import TypedDict, NotRequired
from rich.console import Console
from rich.live import Live
from rich.table import Table
from pydantic_ai import Agent
class Whale(TypedDict):
name: str
length: float
weight: NotRequired[float]
ocean: NotRequired[str]
agent = Agent('openai:gpt-4o', output_type=list[Whale])
console = Console()
async def main():
with Live(console=console) as live:
async with agent.run_stream('生成 5 种鲸鱼的信息') as result:
async for whales in result.stream_output(debounce_by=0.01):
table = Table(title='鲸鱼百科')
table.add_column('名称')
table.add_column('长度(米)')
table.add_column('体重(吨)')
table.add_column('栖息海洋')
for whale in whales:
table.add_row(
whale['name'],
f'{whale["length"]:.0f}',
f'{w:.0f}' if (w := whale.get('weight')) else '...',
whale.get('ocean', '...'),
)
live.update(table)
asyncio.run(main())
效果:终端里会出现一个动态更新的表格,新行逐步出现,每行的字段也是逐步填充。
Pydantic AI 内置了一个一行搞定的 Web UI:
from pydantic_ai import Agent
agent = Agent(
'openai:gpt-4o',
instructions='你是一个乐于助人的中文助手。'
)
app = agent.to_web()
# 运行: uvicorn my_app:app --host 127.0.0.1 --port 7932
# 然后浏览器打开 http://127.0.0.1:7932
这样就有了一个带聊天 UI 的 Web 应用。但如果你想自己控制前端和后端逻辑,就需要自己写。
下面是一个完整的例子:用 FastAPI 做后端,实现流式聊天 + 消息持久化。
import json
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent
from pydantic_ai.messages import (
ModelMessage,
ModelMessagesTypeAdapter,
)
app = FastAPI()
agent = Agent('openai:gpt-4o', instructions='你是一个友好的中文助手。')
# 简单的内存存储(生产环境换成数据库)
chat_messages: list[ModelMessage] = []
@app.post('/chat')
async def chat(prompt: str):
async def stream():
nonlocal chat_messages
# 带着历史消息一起发给 AI
async with agent.run_stream(
prompt,
message_history=chat_messages
) as result:
# 流式返回文本
async for text in result.stream_text(delta=True):
yield json.dumps({'text': text}) + '\n'
# 把新消息存起来
chat_messages.extend(result.new_messages())
return StreamingResponse(stream(), media_type='text/plain')
<script>
async function sendMessage(prompt) {
const response = await fetch('/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const lines = decode(value).split('\n');
for (const line of lines) {
if (line.trim()) {
const data = JSON.parse(line);
// 把 data.text 追加到聊天界面
appendToChat(data.text);
}
}
}
}
</script>
聊天记录不能只在内存里,服务器重启就没了。下面用 SQLite 做持久化:
import sqlite3
from pydantic_ai.messages import ModelMessage, ModelMessagesTypeAdapter
from pydantic_core import to_json
# 初始化数据库
def init_db():
conn = sqlite3.connect('chat.db')
conn.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id TEXT NOT NULL,
message_list BLOB NOT NULL
)
''')
conn.commit()
return conn
db = init_db()
# 保存消息
def save_messages(conversation_id: str, messages: list[ModelMessage]):
json_bytes = to_json(messages)
db.execute(
'INSERT INTO messages (conversation_id, message_list) VALUES (?, ?)',
(conversation_id, json_bytes),
)
db.commit()
# 读取消息
def load_messages(conversation_id: str) -> list[ModelMessage]:
cursor = db.execute(
'SELECT message_list FROM messages WHERE conversation_id = ? ORDER BY id',
(conversation_id,),
)
messages: list[ModelMessage] = []
for (blob,) in cursor.fetchall():
messages.extend(ModelMessagesTypeAdapter.validate_json(blob))
return messages
在聊天接口里使用:
@app.post('/chat')
async def chat(conversation_id: str, prompt: str):
async def stream():
# 从数据库加载历史
history = load_messages(conversation_id)
async with agent.run_stream(prompt, message_history=history) as result:
async for text in result.stream_text(delta=True):
yield json.dumps({'text': text}) + '\n'
# 保存本轮新消息
save_messages(conversation_id, result.new_messages())
return StreamingResponse(stream(), media_type='text/plain')
Gradio 是一个 Python 库,几行代码就能生成一个 Web 界面。非常适合 AI 应用的演示和原型。
pip install gradio
import asyncio
import gradio as gr
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o', instructions='你是一个友好的助手,用中文回答。')
async def chat(message: str, history: list[dict]):
"""Gradio 聊天回调函数。"""
result = await agent.run(message)
return result.output
demo = gr.ChatInterface(fn=chat, title='我的 AI 助手')
demo.launch()
运行后浏览器会自动打开一个聊天界面,开箱即用。
上面那个是等 AI 说完才显示,下面加上流式输出:
import gradio as gr
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o', instructions='你是一个友好的助手,用中文回答。')
async def chat_stream(message: str, history: list[dict]):
"""流式输出版本。"""
async with agent.run_stream(message) as result:
async for text in result.stream_text():
yield text # Gradio 会自动处理流式显示
demo = gr.ChatInterface(fn=chat_stream, title='我的 AI 助手(流式版)')
demo.launch()
只改了两个地方:把 run 换成 run_stream,把 return 换成 yield。
这是一个天气助手的完整例子,能在界面上展示 AI 调用了哪些工具:
import gradio as gr
from pydantic_ai import Agent
from pydantic_ai.messages import ToolCallPart, ToolReturnPart
agent = Agent(
'openai:gpt-4o',
instructions='你是一个天气助手。查询天气时请使用工具。'
)
@agent.tool_plain
def get_weather(city: str) -> str:
"""查询指定城市的天气。
Args:
city: 城市名称
"""
# 这里用假数据演示,实际可以调用天气 API
weathers = {'北京': '晴天 25°C', '上海': '多云 22°C', '广州': '小雨 28°C'}
return weathers.get(city, f'{city}: 暂无数据')
# 存储消息历史
past_messages = []
async def stream_from_agent(prompt: str, chatbot: list[dict]):
global past_messages
# 1. 添加用户消息
chatbot.append({'role': 'user', 'content': prompt})
yield gr.Textbox(interactive=False, value=''), chatbot
# 2. 流式运行 Agent
async with agent.run_stream(prompt, message_history=past_messages) as result:
# 3. 展示工具调用(在流式文本之前)
for message in result.new_messages():
for part in message.parts:
if isinstance(part, ToolCallPart):
chatbot.append({
'role': 'assistant',
'content': f'参数: {part.args_as_json_str()}',
'metadata': {'title': f'🧰 调用工具: {part.tool_name}'}
})
yield gr.skip(), chatbot
# 4. 流式输出最终回答
chatbot.append({'role': 'assistant', 'content': ''})
async for text in result.stream_text():
chatbot[-1]['content'] = text
yield gr.skip(), chatbot
# 5. 保存消息历史
past_messages = result.all_messages()
yield gr.Textbox(interactive=True), gr.skip()
# 构建界面
with gr.Blocks() as demo:
gr.Markdown('# 🌤️ 天气助手')
chatbot = gr.Chatbot(type='messages')
text_input = gr.Textbox(placeholder='问我天气,比如:北京今天天气怎么样?')
text_input.submit(stream_from_agent, [text_input, chatbot], [text_input, chatbot])
demo.launch()
恭喜!你已经掌握了 Pydantic AI 的流式输出和 UI 集成:
| 功能 | 方法 | 适用场景 |
|---|---|---|
| 流式文本 | run_stream() + stream_text() | 聊天、Markdown 渲染 |
| 增量文本 | stream_text(delta=True) | 打字机效果、拼接 |
| 流式结构化数据 | stream_output() | 动态表格、TypedDict |
| 同步流式 | run_stream_sync() | 脚本、非异步环境 |
| 一键 Web UI | agent.to_web() | 快速演示 |
| FastAPI + SSE | StreamingResponse | 生产级 Web 应用 |
| Gradio 聊天 | gr.ChatInterface + yield | 原型、演示 |
output_type 是 str(默认)?stream_text()output_type 是 Pydantic 模型 / TypedDict?stream_output()stream_text(delta=True)stream_text(delta=True) + print(end='', flush=True) 做一个终端聊天机器人list[Movie] 的 TypedDict,让 AI 生成电影推荐,用 Rich 实时渲染表格