Pydantic AI 教程 · 阶段三

流式响应与 UI

让你的 AI 应用从"等半天蹦出一坨"变成"像打字一样一个字一个字出来"。

1流式文本输出:打字机效果

为什么需要流式?

如果你用过 ChatGPT,就知道它的回答是一个字一个字打出来的,而不是等半天一次性蹦出来。这就是流式输出

好处很明显:

最简单的流式输出

import asyncio
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.run_stream('给我讲一个关于程序员的笑话') as response:
        async for text in response.stream_text():
            print(text)
            # 第 1 次:从前
            # 第 2 次:从前有一个程序员
            # 第 3 次:从前有一个程序员,他写了
            # ...每次都是到目前为止的完整文本

asyncio.run(main())
注意看输出: 每次拿到的 text到目前为止的完整文本,不是只拿到新增的部分。就像看人打字,你看到的永远是完整的一句话,只是越来越长。

只拿新增的部分:delta 模式

如果你只想拿到每次新增的几个字(比如用来拼接或统计),加一个 delta=True

async with agent.run_stream('你好') as response:
    async for chunk in response.stream_text(delta=True):
        print(chunk, end='', flush=True)
        # 你
        # 好
        # !
        # 很
        # 高兴
        # 认识你

这样每次拿到的就是新增的那几个字,适合在终端里模拟打字效果。

同步版本:不想写 async

如果你的代码不是异步的,有同步版本可以用:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

with agent.run_stream_sync('你好') as response:
    for text in response.stream_text():
        print(text)

用 Rich 实时渲染 Markdown

AI 返回的内容经常包含 Markdown(标题、代码块、列表等)。用 Rich 库可以实时渲染:

import asyncio
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')
console = Console()

async def main():
    with Live('', console=console, vertical_overflow='visible') as live:
        async with agent.run_stream('用 Python 写一个快速排序,加上详细注释') as result:
            async for text in result.stream_output():
                live.update(Markdown(text))

asyncio.run(main())

效果:终端里会像 ChatGPT 一样,实时显示格式化的 Markdown 内容,代码块有高亮,标题有格式。


2流式结构化输出

stream_text() vs stream_output()

方法返回什么适用场景
stream_text()字符串纯文本聊天、Markdown 渲染
stream_output()output_type 对应的对象结构化数据、TypedDict、Pydantic 模型

output_typestr(默认)时,两者效果一样。当你定义了结构化输出类型时,stream_output() 会逐步构建出部分验证的对象。

流式返回 TypedDict

想象你让 AI 生成一个表格数据,但不想等它全部生成完再显示——可以边生成边显示:

import asyncio
from typing import TypedDict, NotRequired
from pydantic_ai import Agent

class City(TypedDict):
    name: str
    country: str
    population: NotRequired[int]
    description: NotRequired[str]

agent = Agent('openai:gpt-4o', output_type=list[City])

async def main():
    async with agent.run_stream('列出中国 5 个最大的城市') as result:
        async for cities in result.stream_output(debounce_by=0.01):
            # cities 是一个逐步填充的 list[City]
            print(f'已收到 {len(cities)} 个城市')
            for city in cities:
                name = city.get('name', '...')
                pop = city.get('population', '加载中')
                print(f'  - {name}: {pop}')
            print('---')

asyncio.run(main())

输出会是这样的:

已收到 1 个城市
  - 上海: 加载中
---
已收到 1 个城市
  - 上海: 24870000
---
已收到 2 个城市
  - 上海: 24870000
  - 北京: 加载中
---
...逐步填充,直到 5 个城市全部完成
关键点:
  • NotRequired 标记可以还没收到的字段
  • debounce_by=0.01 控制更新频率(秒),防止刷新太快
  • 每次拿到的都是到目前为止的完整对象,只是有些字段可能还没填充

实时表格:用 Rich 渲染流式数据

import asyncio
from typing import TypedDict, NotRequired
from rich.console import Console
from rich.live import Live
from rich.table import Table
from pydantic_ai import Agent

class Whale(TypedDict):
    name: str
    length: float
    weight: NotRequired[float]
    ocean: NotRequired[str]

agent = Agent('openai:gpt-4o', output_type=list[Whale])
console = Console()

async def main():
    with Live(console=console) as live:
        async with agent.run_stream('生成 5 种鲸鱼的信息') as result:
            async for whales in result.stream_output(debounce_by=0.01):
                table = Table(title='鲸鱼百科')
                table.add_column('名称')
                table.add_column('长度(米)')
                table.add_column('体重(吨)')
                table.add_column('栖息海洋')
                for whale in whales:
                    table.add_row(
                        whale['name'],
                        f'{whale["length"]:.0f}',
                        f'{w:.0f}' if (w := whale.get('weight')) else '...',
                        whale.get('ocean', '...'),
                    )
                live.update(table)

asyncio.run(main())

效果:终端里会出现一个动态更新的表格,新行逐步出现,每行的字段也是逐步填充。


3Web 应用集成:FastAPI + SSE

最简单的方式:to_web()

Pydantic AI 内置了一个一行搞定的 Web UI:

from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o',
    instructions='你是一个乐于助人的中文助手。'
)

app = agent.to_web()

# 运行: uvicorn my_app:app --host 127.0.0.1 --port 7932
# 然后浏览器打开 http://127.0.0.1:7932

这样就有了一个带聊天 UI 的 Web 应用。但如果你想自己控制前端和后端逻辑,就需要自己写。

自己用 FastAPI 构建聊天 API

下面是一个完整的例子:用 FastAPI 做后端,实现流式聊天 + 消息持久化。

后端:流式 SSE 接口

import json
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic_ai import Agent
from pydantic_ai.messages import (
    ModelMessage,
    ModelMessagesTypeAdapter,
)

app = FastAPI()
agent = Agent('openai:gpt-4o', instructions='你是一个友好的中文助手。')

# 简单的内存存储(生产环境换成数据库)
chat_messages: list[ModelMessage] = []

@app.post('/chat')
async def chat(prompt: str):
    async def stream():
        nonlocal chat_messages

        # 带着历史消息一起发给 AI
        async with agent.run_stream(
            prompt,
            message_history=chat_messages
        ) as result:
            # 流式返回文本
            async for text in result.stream_text(delta=True):
                yield json.dumps({'text': text}) + '\n'

        # 把新消息存起来
        chat_messages.extend(result.new_messages())

    return StreamingResponse(stream(), media_type='text/plain')

前端:读取 SSE 流

<script>
async function sendMessage(prompt) {
    const response = await fetch('/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const lines = decode(value).split('\n');
        for (const line of lines) {
            if (line.trim()) {
                const data = JSON.parse(line);
                // 把 data.text 追加到聊天界面
                appendToChat(data.text);
            }
        }
    }
}
</script>

消息持久化:保存到数据库

聊天记录不能只在内存里,服务器重启就没了。下面用 SQLite 做持久化:

import sqlite3
from pydantic_ai.messages import ModelMessage, ModelMessagesTypeAdapter
from pydantic_core import to_json

# 初始化数据库
def init_db():
    conn = sqlite3.connect('chat.db')
    conn.execute('''
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            conversation_id TEXT NOT NULL,
            message_list BLOB NOT NULL
        )
    ''')
    conn.commit()
    return conn

db = init_db()

# 保存消息
def save_messages(conversation_id: str, messages: list[ModelMessage]):
    json_bytes = to_json(messages)
    db.execute(
        'INSERT INTO messages (conversation_id, message_list) VALUES (?, ?)',
        (conversation_id, json_bytes),
    )
    db.commit()

# 读取消息
def load_messages(conversation_id: str) -> list[ModelMessage]:
    cursor = db.execute(
        'SELECT message_list FROM messages WHERE conversation_id = ? ORDER BY id',
        (conversation_id,),
    )
    messages: list[ModelMessage] = []
    for (blob,) in cursor.fetchall():
        messages.extend(ModelMessagesTypeAdapter.validate_json(blob))
    return messages

在聊天接口里使用:

@app.post('/chat')
async def chat(conversation_id: str, prompt: str):
    async def stream():
        # 从数据库加载历史
        history = load_messages(conversation_id)

        async with agent.run_stream(prompt, message_history=history) as result:
            async for text in result.stream_text(delta=True):
                yield json.dumps({'text': text}) + '\n'

        # 保存本轮新消息
        save_messages(conversation_id, result.new_messages())

    return StreamingResponse(stream(), media_type='text/plain')

4Gradio UI:几行代码搞定聊天界面

Gradio 是什么?

Gradio 是一个 Python 库,几行代码就能生成一个 Web 界面。非常适合 AI 应用的演示和原型。

最简单的 Gradio 聊天

pip install gradio
import asyncio
import gradio as gr
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', instructions='你是一个友好的助手,用中文回答。')

async def chat(message: str, history: list[dict]):
    """Gradio 聊天回调函数。"""
    result = await agent.run(message)
    return result.output

demo = gr.ChatInterface(fn=chat, title='我的 AI 助手')
demo.launch()

运行后浏览器会自动打开一个聊天界面,开箱即用。

带流式输出的 Gradio 聊天

上面那个是等 AI 说完才显示,下面加上流式输出:

import gradio as gr
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', instructions='你是一个友好的助手,用中文回答。')

async def chat_stream(message: str, history: list[dict]):
    """流式输出版本。"""
    async with agent.run_stream(message) as result:
        async for text in result.stream_text():
            yield text  # Gradio 会自动处理流式显示

demo = gr.ChatInterface(fn=chat_stream, title='我的 AI 助手(流式版)')
demo.launch()

只改了两个地方:把 run 换成 run_stream,把 return 换成 yield

完整版:带工具调用可视化

这是一个天气助手的完整例子,能在界面上展示 AI 调用了哪些工具:

import gradio as gr
from pydantic_ai import Agent
from pydantic_ai.messages import ToolCallPart, ToolReturnPart

agent = Agent(
    'openai:gpt-4o',
    instructions='你是一个天气助手。查询天气时请使用工具。'
)

@agent.tool_plain
def get_weather(city: str) -> str:
    """查询指定城市的天气。

    Args:
        city: 城市名称
    """
    # 这里用假数据演示,实际可以调用天气 API
    weathers = {'北京': '晴天 25°C', '上海': '多云 22°C', '广州': '小雨 28°C'}
    return weathers.get(city, f'{city}: 暂无数据')

# 存储消息历史
past_messages = []

async def stream_from_agent(prompt: str, chatbot: list[dict]):
    global past_messages

    # 1. 添加用户消息
    chatbot.append({'role': 'user', 'content': prompt})
    yield gr.Textbox(interactive=False, value=''), chatbot

    # 2. 流式运行 Agent
    async with agent.run_stream(prompt, message_history=past_messages) as result:
        # 3. 展示工具调用(在流式文本之前)
        for message in result.new_messages():
            for part in message.parts:
                if isinstance(part, ToolCallPart):
                    chatbot.append({
                        'role': 'assistant',
                        'content': f'参数: {part.args_as_json_str()}',
                        'metadata': {'title': f'🧰 调用工具: {part.tool_name}'}
                    })
                    yield gr.skip(), chatbot

        # 4. 流式输出最终回答
        chatbot.append({'role': 'assistant', 'content': ''})
        async for text in result.stream_text():
            chatbot[-1]['content'] = text
            yield gr.skip(), chatbot

    # 5. 保存消息历史
    past_messages = result.all_messages()
    yield gr.Textbox(interactive=True), gr.skip()

# 构建界面
with gr.Blocks() as demo:
    gr.Markdown('# 🌤️ 天气助手')
    chatbot = gr.Chatbot(type='messages')
    text_input = gr.Textbox(placeholder='问我天气,比如:北京今天天气怎么样?')
    text_input.submit(stream_from_agent, [text_input, chatbot], [text_input, chatbot])

demo.launch()
运行后你会看到:
  1. 输入"北京天气怎么样"
  2. 界面显示一个折叠的工具调用卡片(🧰 调用工具: get_weather)
  3. 然后 AI 的回答像打字一样一个字一个字出来

🎉 阶段三总结

恭喜!你已经掌握了 Pydantic AI 的流式输出和 UI 集成:

功能方法适用场景
流式文本run_stream() + stream_text()聊天、Markdown 渲染
增量文本stream_text(delta=True)打字机效果、拼接
流式结构化数据stream_output()动态表格、TypedDict
同步流式run_stream_sync()脚本、非异步环境
一键 Web UIagent.to_web()快速演示
FastAPI + SSEStreamingResponse生产级 Web 应用
Gradio 聊天gr.ChatInterface + yield原型、演示

stream_text() vs stream_output() 怎么选?

判断 output_typestr(默认)?
stream_text()
判断 output_type 是 Pydantic 模型 / TypedDict?
stream_output()
判断 只想拿新增的文字?
stream_text(delta=True)

✍️ 练习建议

  1. 终端打字机:用 stream_text(delta=True) + print(end='', flush=True) 做一个终端聊天机器人
  2. Rich 表格:定义一个 list[Movie] 的 TypedDict,让 AI 生成电影推荐,用 Rich 实时渲染表格
  3. FastAPI 聊天:用 FastAPI + SQLite 做一个带消息持久化的聊天 API
  4. Gradio 原型:给你之前写的任何一个 Agent 加上 Gradio 界面

🚀 下一步:阶段四 · 数据与检索

深入讲解 SQL 生成、数据分析和 RAG,
让 AI 能操作你的真实数据。