完整业务应用、UI 集成、协议对接
到了最后一个阶段!前面七个阶段你已经掌握了 Pydantic AI 的所有核心技能,现在要把它们串起来,做真正的业务应用。
这个阶段我们学四件大事:
这是一个真实的生产级项目,展示如何把 Pydantic AI 用到正式业务中
想象一下这个场景:你们公司有个公开的 Slack 社区,每天都有新人加入。你想知道这些新人是谁、在哪家公司工作、有没有可能成为你的客户。
以前要人工一个个查,现在用 AI 自动搞定:
slack_lead_qualifier/
├── models.py # 数据模型:Profile、Analysis
├── agent.py # AI 代理:搜索+分析
├── app.py # FastAPI 应用:接收 webhook
├── functions.py # 业务逻辑:处理+通知
├── slack.py # Slack API:发消息
├── store.py # 数据存储:Modal Dict
└── modal.py # 部署配置:无服务器
先想清楚要处理什么数据:
from typing import Annotated, Any
from annotated_types import Ge, Le
from pydantic import BaseModel
from pydantic_ai import format_as_xml
class Profile(BaseModel):
"""新加入的用户信息"""
first_name: str | None = None
last_name: str | None = None
display_name: str | None = None
email: str
def as_prompt(self) -> str:
"""转成 XML 格式,方便 AI 理解"""
return format_as_xml(self, root_tag='profile')
class Analysis(BaseModel):
"""AI 生成的分析结果"""
profile: Profile
organization_name: str # 公司名
organization_domain: str # 公司网站
job_title: str # 职位
relevance: Annotated[int, Ge(1), Le(5)] # 相关度评分 1-5
summary: str # 一句话总结
def as_slack_blocks(self) -> list[dict[str, Any]]:
"""转成 Slack 消息格式"""
return [
{
'type': 'markdown',
'text': (
f'[{self.profile.display_name}]'
f'(mailto:{self.profile.email}), '
f'{self.job_title} at '
f'[**{self.organization_name}**]'
f'(https://{self.organization_domain})'
),
},
{
'type': 'markdown',
'text': self.summary,
},
]
format_as_xml 在阶段四讲过,它把 Pydantic 模型转成 XML,让 AI 能更准确地理解结构化数据。Annotated[int, Ge(1), Le(5)] 限制评分只能是 1 到 5 分。
这是核心!代理拿到用户信息后,用 DuckDuckGo 去搜索,然后生成分析报告:
from types import NoneType
from pydantic_ai import Agent, NativeOutput
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool
from .models import Analysis, Profile
agent = Agent(
'openai:gpt-4o',
instructions="""
当有新人加入 Slack 社区时,请做一个简短的背景调查:
1. 他是谁:职业角色、项目经验(LinkedIn、GitHub等)
2. 在哪工作:公司名和业务领域
3. 相关度评分(1-5):评估他成为客户的可能性
1 = 基本不相关,5 = 非常匹配
用 DuckDuckGo 搜索工具去调查。
如果信息不足以做出判断,返回 None。
""",
tools=[duckduckgo_search_tool()],
# 两种可能的输出:分析结果 或 None
output_type=NativeOutput([Analysis, NoneType]),
)
async def analyze_profile(profile: Profile) -> Analysis | None:
"""分析一个用户的资料"""
result = await agent.run(profile.as_prompt())
return result.output
NativeOutput([Analysis, NoneType]) —— 联合类型输出,搜不到足够信息时返回 Noneduckduckgo_search_tool() —— 内置的搜索工具,不需要 API KeySlack 会通过 webhook 通知我们有新人加入:
from typing import Any
from fastapi import FastAPI, HTTPException, status
from .models import Profile
app = FastAPI()
@app.post('/')
async def process_webhook(payload: dict[str, Any]) -> dict[str, Any]:
# Slack 的 URL 验证(首次配置时会调用)
if payload['type'] == 'url_verification':
return {'challenge': payload['challenge']}
# 有新成员加入
if (
payload['type'] == 'event_callback'
and payload['event']['type'] == 'team_join'
):
profile = Profile.model_validate(
payload['event']['user']['profile']
)
# 异步处理,不阻塞 webhook 响应
process_slack_member(profile)
return {'status': 'OK'}
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY
)
把分析和通知串起来:
from .agent import analyze_profile
from .models import Profile
from .slack import send_slack_message
from .store import AnalysisStore
NEW_LEAD_CHANNEL = '#new-slack-leads'
async def process_slack_member(profile: Profile):
"""处理新成员:分析 -> 存储 -> 通知"""
# 1. AI 分析
analysis = await analyze_profile(profile)
if analysis is None:
return # 信息不足,跳过
# 2. 存储结果
await AnalysisStore.add(analysis)
# 3. 发送 Slack 通知
await send_slack_message(
NEW_LEAD_CHANNEL,
[
{
'type': 'header',
'text': {
'type': 'plain_text',
'text': f'新成员评分 {analysis.relevance}/5',
},
},
{'type': 'divider'},
*analysis.as_slack_blocks(),
],
)
用 Modal 做无服务器部署,自带定时任务:
import modal
image = modal.Image.debian_slim(python_version='3.13').pip_install(
'pydantic',
'pydantic_ai_slim[openai,duckduckgo]',
'logfire[httpx,fastapi]',
'fastapi[standard]',
)
app = modal.App(
name='slack-lead-qualifier',
image=image,
secrets=[
modal.Secret.from_name('openai'),
modal.Secret.from_name('slack'),
],
)
@app.function(min_containers=1) # 至少保持 1 个容器热启动
@modal.asgi_app()
def web_app():
from .app import app as _app
return _app
# 每天早上 8 点自动发送汇总
@app.function(schedule=modal.Cron('0 8 * * *'))
async def send_daily_summary():
from .functions import send_daily_summary as _send_daily_summary
await _send_daily_summary()
NativeOutput 处理"可能没结果"的场景format_as_xml 把结构化数据传给 AIAG-UI(Agent-User Interaction Protocol)是由 CopilotKit 团队推出的开放标准,它解决了一个大问题:
AI 代理怎么和前端 UI 实时通信?
以前你可能用 WebSocket 或者 SSE 自己搞,每个项目搞一套,前端和后端的约定各不相同。AG-UI 把这事标准化了 —— 流式传输、共享状态、前端工具调用、自定义事件,全都有标准。
pip install 'pydantic-ai-slim[ag-ui]'
pip install uvicorn
AG-UI 提供了从简单到复杂的三种用法:
最简单!直接创建一个 ASGI 应用:
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp
agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')
# 一行代码,搞定!
app = AGUIApp(agent)
启动服务:
uvicorn my_app:app --port 9000
如果你已经有 FastAPI 项目,用这种方式嵌入:
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui import AGUIAdapter
agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')
app = FastAPI()
@app.post('/chat')
async def run_agent(request: Request) -> Response:
return await AGUIAdapter.dispatch_request(
request, agent=agent
)
需要更多控制时,手动处理流:
import json
from http import HTTPStatus
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import Response, StreamingResponse
from pydantic import ValidationError
from pydantic_ai import Agent
from pydantic_ai.ui import SSE_CONTENT_TYPE
from pydantic_ai.ui.ag_ui import AGUIAdapter
agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')
app = FastAPI()
@app.post('/chat')
async def run_agent(request: Request) -> Response:
accept = request.headers.get('accept', SSE_CONTENT_TYPE)
try:
# 1. 解析请求
run_input = AGUIAdapter.build_run_input(
await request.body()
)
except ValidationError as e:
return Response(
content=json.dumps(e.json()),
media_type='application/json',
status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
)
# 2. 创建适配器并运行
adapter = AGUIAdapter(
agent=agent,
run_input=run_input,
accept=accept,
)
event_stream = adapter.run_stream()
# 3. 编码为 SSE 流并返回
sse_stream = adapter.encode_stream(event_stream)
return StreamingResponse(sse_stream, media_type=accept)
AG-UI 不只是简单的聊天,它支持六种强大的交互模式:
最基本的用法,代理可以调用工具并流式回复:
from datetime import datetime
from zoneinfo import ZoneInfo
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp
agent = Agent('openai:gpt-4o-mini')
@agent.tool_plain
async def current_time(timezone: str = 'UTC') -> str:
"""获取当前时间"""
tz = ZoneInfo(timezone)
return datetime.now(tz=tz).isoformat()
app = AGUIApp(agent)
这个厉害了!AI 不只是回复文字,还能直接操控前端 UI。
比如用户说"帮我规划一个学习计划",AI 可以在前端动态创建任务列表,并逐步更新状态:
from typing import Any, Literal
from pydantic import BaseModel, Field
from ag_ui.core import EventType, StateDeltaEvent, StateSnapshotEvent
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp
StepStatus = Literal['pending', 'completed']
class Step(BaseModel):
description: str
status: StepStatus = 'pending'
class Plan(BaseModel):
steps: list[Step] = Field(default_factory=list)
# JSON Patch 操作(RFC 6902 标准)
class JSONPatchOp(BaseModel):
op: Literal['add', 'remove', 'replace']
path: str
value: Any = None
agent = Agent(
'openai:gpt-4o-mini',
instructions="""
规划任务时只用工具,不要发送多余的消息。
- 用 create_plan 创建计划
- 用 update_plan_step 更新步骤状态
- 不要重复描述计划内容
""",
)
@agent.tool_plain
async def create_plan(steps: list[str]) -> StateSnapshotEvent:
"""创建一个多步骤计划"""
plan = Plan(
steps=[Step(description=step) for step in steps]
)
# 返回完整状态快照 -> 前端渲染整个计划
return StateSnapshotEvent(
type=EventType.STATE_SNAPSHOT,
snapshot=plan.model_dump(),
)
@agent.tool_plain
async def update_plan_step(
index: int,
status: StepStatus | None = None,
) -> StateDeltaEvent:
"""更新某一步的状态"""
changes = []
if status is not None:
changes.append(
JSONPatchOp(
op='replace',
path=f'/steps/{index}/status',
value=status,
)
)
# 返回增量更新 -> 前端只更新变化的部分
return StateDeltaEvent(
type=EventType.STATE_DELTA,
delta=changes,
)
app = AGUIApp(agent)
StateSnapshotEvent —— 发送完整状态,前端整体替换(适合初始化)StateDeltaEvent —— 发送 JSON Patch,前端增量更新(适合小改动)
前端有状态,AI 也能读取和修改这个状态:
from pydantic import BaseModel, Field
from ag_ui.core import EventType, StateSnapshotEvent
from pydantic_ai import Agent, RunContext
from pydantic_ai.ui import StateDeps
from pydantic_ai.ui.ag_ui.app import AGUIApp
class Ingredient(BaseModel):
icon: str = '🥕'
name: str
amount: str
class Recipe(BaseModel):
ingredients: list[Ingredient] = Field(default_factory=list)
instructions: list[str] = Field(default_factory=list)
class RecipeSnapshot(BaseModel):
recipe: Recipe = Field(default_factory=Recipe)
# 关键:用 StateDeps 包装状态
agent = Agent(
'openai:gpt-4o-mini',
deps_type=StateDeps[RecipeSnapshot],
)
@agent.tool_plain
async def display_recipe(recipe: Recipe) -> StateSnapshotEvent:
"""把食谱展示给用户"""
return StateSnapshotEvent(
type=EventType.STATE_SNAPSHOT,
snapshot={'recipe': recipe},
)
@agent.instructions
async def recipe_instructions(
ctx: RunContext[StateDeps[RecipeSnapshot]],
) -> str:
"""动态指令:包含当前食谱状态"""
return f"""
你是一个食谱助手。
用 display_recipe 工具展示食谱。
当前食谱状态:
{ctx.deps.state.recipe.model_dump_json(indent=2)}
"""
# 把初始状态传给 AGUIApp
app = AGUIApp(
agent,
deps=StateDeps(RecipeSnapshot()),
)
StateDeps 是什么?有些操作需要人来确认,AG-UI 天然支持"人在回路中"的模式:
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp
agent = Agent(
'openai:gpt-4o-mini',
instructions="""
规划任务时:
- 用 generate_task_steps 展示建议步骤
- 等待用户确认
- 如果用户同意,确认计划
- 如果用户不同意,询问更多信息
""",
)
# 不需要特殊处理!AG-UI 协议本身就支持
# 前端会展示确认界面,用户可以接受或拒绝
app = AGUIApp(agent)
除了标准的文本和状态更新,你还可以发送完全自定义的事件:
from ag_ui.core import CustomEvent, EventType
from pydantic_ai import Agent, ToolReturn
from pydantic_ai.ui.ag_ui.app import AGUIApp
agent = Agent('openai:gpt-4o-mini')
@agent.tool_plain
async def count_things() -> ToolReturn:
"""用 ToolReturn 附带自定义事件"""
return ToolReturn(
return_value='计数事件已发送',
metadata=[
CustomEvent(
type=EventType.CUSTOM,
name='count',
value=1,
),
CustomEvent(
type=EventType.CUSTOM,
name='count',
value=2,
),
],
)
app = AGUIApp(agent)
metadata 字段携带事件、状态更新等信息,直接传递给前端。
这是最高级的模式 —— AI 的输出直接实时映射到 UI 状态,用户看到的不是文字在打,而是界面在实时变化:
from pydantic import BaseModel
from ag_ui.core import CustomEvent, EventType
from pydantic_ai import Agent, RunContext
from pydantic_ai.ui import StateDeps
from pydantic_ai.ui.ag_ui.app import AGUIApp
class DocumentState(BaseModel):
document: str = ''
agent = Agent(
'openai:gpt-4o-mini',
deps_type=StateDeps[DocumentState],
)
@agent.tool_plain
async def document_predict_state() -> list[CustomEvent]:
"""启用文档状态预测"""
return [
CustomEvent(
type=EventType.CUSTOM,
name='PredictState',
value=[{
'state_key': 'document',
'tool': 'write_document',
'tool_argument': 'document',
}],
),
]
@agent.instructions()
async def instructions(
ctx: RunContext[StateDeps[DocumentState]],
) -> str:
return f"""
你是一个文档写作助手。
写文档前必须先调用 document_predict_state 启用预测。
用 write_document 工具展示文档。
当前文档内容:
{ctx.deps.state.document}
"""
app = AGUIApp(
agent,
deps=StateDeps(DocumentState()),
)
| 事件类型 | 作用 | 适用场景 |
|---|---|---|
StateSnapshotEvent | 发送完整状态 | 初始化、大幅更新 |
StateDeltaEvent | JSON Patch 增量更新 | 局部修改 |
CustomEvent | 自定义事件 | 计数器、通知、预测配置 |
ToolReturn + metadata | 工具返回附带事件 | 任何工具内发送 UI 事件 |
启动你的 AG-UI 服务:
uvicorn my_app:app --port 9000
然后用 AG-UI 官方的调试工具测试:访问 AG-UI Dojo(https://docs.ag-ui.com/tutorials/debugging#the-ag-ui-dojo),连接到 http://localhost:9000,就能看到效果了。
如果你用 React/Next.js 做前端,Vercel AI SDK 是最流行的选择之一。Pydantic AI 原生支持 Vercel AI Data Stream Protocol,后端写 Python,前端用 React。
跟 AG-UI 一样简洁:
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response
from pydantic_ai import Agent
from pydantic_ai.ui.vercel_ai import VercelAIAdapter
agent = Agent('openai:gpt-4o')
app = FastAPI()
@app.post('/chat')
async def chat(request: Request) -> Response:
return await VercelAIAdapter.dispatch_request(
request, agent=agent
)
前端(React)使用 Vercel AI SDK 的 useChat hook 连接即可。
跟 AG-UI 的三种方式类似,你也可以手动处理流:
import json
from http import HTTPStatus
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import Response, StreamingResponse
from pydantic import ValidationError
from pydantic_ai import Agent
from pydantic_ai.ui import SSE_CONTENT_TYPE
from pydantic_ai.ui.vercel_ai import VercelAIAdapter
agent = Agent('openai:gpt-4o')
app = FastAPI()
@app.post('/chat')
async def chat(request: Request) -> Response:
accept = request.headers.get('accept', SSE_CONTENT_TYPE)
try:
run_input = VercelAIAdapter.build_run_input(
await request.body()
)
except ValidationError as e:
return Response(
content=json.dumps(e.json()),
media_type='application/json',
status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
)
adapter = VercelAIAdapter(
agent=agent,
run_input=run_input,
accept=accept,
)
event_stream = adapter.run_stream()
sse_stream = adapter.encode_stream(event_stream)
return StreamingResponse(sse_stream, media_type=accept)
Vercel AI 支持在流中发送额外的数据块,比如搜索结果的来源链接:
from pydantic_ai import Agent, ToolReturn
from pydantic_ai.ui.vercel_ai.response_types import (
DataChunk,
SourceUrlChunk,
)
agent = Agent('openai:gpt-4o')
@agent.tool_plain
async def search_docs(query: str) -> ToolReturn:
"""搜索文档,返回结果+来源链接"""
return ToolReturn(
return_value=f'找到 2 条关于 "{query}" 的结果',
metadata=[
# 来源 URL -- 前端可以展示为引用链接
SourceUrlChunk(
source_id='doc-1',
url='https://example.com/docs/intro',
title='入门文档',
),
# 自定义数据块 -- 前端可以自由使用
DataChunk(
type='data-search-results',
data={'query': query, 'count': 2},
),
],
)
默认支持 v5,如果你用的是 v6:
adapter = VercelAIAdapter(
agent=agent,
run_input=run_input,
accept=accept,
sdk_version=6, # 指定 v6
)
| 对比维度 | AG-UI | Vercel AI |
|---|---|---|
| 来源 | CopilotKit 团队 | Vercel 团队 |
| 前端框架 | 框架无关 | React/Next.js 为主 |
| 状态管理 | 内置共享状态 | 需自行处理 |
| 生成式 UI | 原生支持 | 有限支持 |
| 数据块 | 自定义事件 | SourceUrl/DataChunk |
| 适用场景 | 复杂交互、协作应用 | 标准聊天、内容生成 |
| 成熟度 | 较新 | 更成熟 |
A2A(Agent2Agent Protocol)是 Google 推出的开放标准,解决的是另一个问题:
不同的 AI 代理之间怎么通信?
想象一下:你有一个用 Pydantic AI 写的代理,你的合作伙伴有一个用 LangChain 写的代理,客户有一个用自研框架的代理。A2A 让它们能互相对话,不管底层用什么框架。
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')
# 直接把 Agent 变成 A2A 服务!
app = agent.to_a2a()
启动服务:
uvicorn my_a2a:app --host 0.0.0.0 --port 8000
就这么简单,你的 Agent 现在已经是一个标准的 A2A 服务了,任何支持 A2A 协议的客户端都能和它通信。
A2A 协议围绕三个核心概念:
| 组件 | 作用 |
|---|---|
| Storage | 保存和加载任务,存储对话上下文 |
| Broker | 任务调度 —— 决定把任务交给谁处理 |
| Worker | 实际执行任务 —— 就是你的 Agent |
A2A 里有两个重要概念要区分:
context_id 标识,同一个 context_id 下的 Task 共享对话历史。# 第一个请求 - 创建新的 context
# POST /tasks/send
{
"message": {"text": "你好!我叫小明"},
"context_id": "conv-123" # 指定上下文 ID
}
# 第二个请求 - 同一个 context,Agent 记得之前的对话
# POST /tasks/send
{
"message": {"text": "我叫什么名字?"},
"context_id": "conv-123" # 同一个上下文
}
# Agent 会回答 "你叫小明"
当你用 agent.to_a2a() 时,Pydantic AI 自动帮你处理了很多事情:
context_id 的后续请求自动加载历史TextPart 工件DataPart 工件(附带 JSON Schema)当然也可以让你的 A2A 服务具备工具能力:
from pydantic_ai import Agent
agent = Agent(
'openai:gpt-4o',
instructions='你是一个天气查询助手',
)
@agent.tool_plain
async def get_weather(city: str) -> str:
"""查询城市天气"""
# 实际项目中调用天气 API
return f'{city}今天晴,25°C'
app = agent.to_a2a()
如果你的 Agent 返回结构化数据,A2A 会自动把它转成 DataPart:
from pydantic import BaseModel
from pydantic_ai import Agent
class WeatherReport(BaseModel):
city: str
temperature: float
description: str
agent = Agent(
'openai:gpt-4o',
output_type=WeatherReport,
)
app = agent.to_a2a()
# 客户端收到的是 DataPart,包含 JSON 数据和 Schema
| 特性 | 业务应用(直接集成) | AG-UI | Vercel AI | A2A |
|---|---|---|---|---|
| 对接对象 | 外部 API/服务 | 前端 UI | React 前端 | 其他 Agent |
| 通信方式 | HTTP/webhook | SSE 事件流 | SSE 数据流 | HTTP/SSE |
| 状态管理 | 自行处理 | StateDeps 共享 | 自行处理 | 自动存储 |
| 典型场景 | Bot、自动化 | 协作应用 | 聊天应用 | 多代理系统 |
| 快速接入 | 需要完整编写 | AGUIApp(agent) | dispatch_request | agent.to_a2a() |
回顾一下你掌握的技能树:
阶段一:入门基础 ✓ Agent、Tool、结构化输出
阶段二:核心概念 ✓ 依赖注入、消息历史、重试
阶段三:流式响应与UI ✓ run_stream、Web应用、Gradio
阶段四:数据与检索 ✓ SQL生成、RAG、Embedding
阶段五:多代理与工作流 ✓ 委托、Toolset、Graph
阶段六:高级特性 ✓ 思考模式、MCP、多模态
阶段七:生产化 ✓ 测试、评估、可观测性
阶段八:集成与实战 ✓ AG-UI、Vercel AI、A2A
你现在可以用 Pydantic AI 构建从简单到复杂的任何 AI 应用了。去创造吧!
StateDeps 做一个待办事项应用,AI 可以帮你管理任务列表SourceUrlChunk 做一个带引用来源的问答助手