一个 Agent 不够用?那就多来几个,让它们分工合作。
想象你是个项目经理(老板 Agent),你不需要所有事都自己干。写文案让文案 Agent 干,翻译让翻译 Agent 干,你只负责协调和拍板。
这就是代理委托(Agent Delegation)—— 一个 Agent 在工具调用里启动另一个 Agent。
from pydantic_ai import Agent, RunContext, UsageLimits
# 小弟:负责生笑话(用便宜的模型)
joke_generator = Agent(
'google-gla:gemini-2.0-flash',
output_type=list[str],
instructions='根据要求生成笑话列表。',
)
# 老板:负责选最好的笑话(用聪明的模型)
joke_selector = Agent(
'openai:gpt-4o',
instructions='用 joke_factory 工具生成笑话,然后选出最好笑的一个。',
)
@joke_selector.tool
async def joke_factory(
ctx: RunContext[None], count: int
) -> list[str]:
"""生成指定数量的笑话。
Args:
ctx: 上下文
count: 要生成多少个笑话
"""
result = await joke_generator.run(
f'请生成 {count} 个笑话',
usage=ctx.usage, # 关键!把 token 用量传给小弟
)
return result.output
# 运行
result = joke_selector.run_sync(
'给我讲个笑话',
usage_limits=UsageLimits(request_limit=5),
)
print(result.output)
usage=ctx.usage —— 把老板的 token 用量传给小弟,这样 UsageLimits 能统一计算所有 Agent 的总消耗,不会超标如果老板和小弟都需要用同一个数据库连接、HTTP 客户端等资源,用 deps 传递:
from dataclasses import dataclass
import httpx
@dataclass
class SharedDeps:
http_client: httpx.AsyncClient
api_key: str
# 小弟:需要用 HTTP 客户端获取数据
data_fetcher = Agent(
'google-gla:gemini-2.0-flash',
deps_type=SharedDeps,
output_type=list[dict],
)
@data_fetcher.tool
async def fetch_data(
ctx: RunContext[SharedDeps], url: str
) -> str:
"""从指定 URL 获取数据。"""
response = await ctx.deps.http_client.get(
url, headers={'Authorization': f'Bearer {ctx.deps.api_key}'}
)
return response.text
# 老板:也需要这些依赖
analyst = Agent(
'openai:gpt-4o',
deps_type=SharedDeps,
instructions='使用 analyze_url 工具获取并分析数据。',
)
@analyst.tool
async def analyze_url(
ctx: RunContext[SharedDeps], url: str
) -> list[dict]:
"""获取 URL 数据并返回分析结果。"""
result = await data_fetcher.run(
f'获取并整理 {url} 的数据',
deps=ctx.deps, # 共享依赖
usage=ctx.usage, # 共享用量
)
return result.output
deps=ctx.deps 直接传过去就行。
joke_factory 工具usage=ctx.usage(共享计量)deps=ctx.deps(共享资源)代理委托是"老板叫小弟干活",小弟干完老板继续。但有时候你想要的是:先让 A 干完,再让 B 干,由你的代码来决定顺序。
这就是程序化交接(Programmatic Hand-Off)。
这是一个真实场景 —— 用户先搜航班,搜到了再选座位。两个步骤用不同的 Agent:
from typing import Literal
from pydantic import BaseModel, Field
from pydantic_ai import Agent, ModelMessage, RunUsage, UsageLimits
# 航班信息
class FlightDetails(BaseModel):
flight_number: str
origin: str
destination: str
price: float
# 搜索失败
class NotFound(BaseModel):
reason: str
# Agent 1:搜航班
flight_agent = Agent(
'openai:gpt-4o',
output_type=FlightDetails | NotFound, # type: ignore
instructions='根据用户需求搜索航班。',
)
@flight_agent.tool
async def search_flights(
ctx, origin: str, destination: str, date: str
) -> str:
"""搜索航班。"""
# 实际中这里调 API,这里用模拟数据
return f'{origin} → {destination} on {date}: Flight CA123, $500'
# 座位偏好
class SeatPreference(BaseModel):
row: int = Field(ge=1, le=30)
seat: Literal['A', 'B', 'C', 'D', 'E', 'F']
# Agent 2:选座位
seat_agent = Agent(
'openai:gpt-4o',
output_type=SeatPreference | NotFound, # type: ignore
instructions='理解用户的座位偏好。',
)
当用户和 Agent A 聊了几轮后切换到 Agent B,怎么让 B 知道之前的对话?用 message_history:
async def book_flight():
usage = RunUsage() # 统一跟踪所有 Agent 的 token
limits = UsageLimits(request_limit=15)
# === 步骤 1:搜航班 ===
message_history: list[ModelMessage] | None = None
for attempt in range(3): # 最多搜 3 次
user_input = input('你想从哪飞到哪?')
result = await flight_agent.run(
user_input,
message_history=message_history,
usage=usage,
usage_limits=limits,
)
if isinstance(result.output, FlightDetails):
flight = result.output
print(f'找到航班: {flight.flight_number}, ¥{flight.price}')
break
else:
print(f'没找到: {result.output.reason}')
# 关键!把之前的对话传给下一轮
message_history = result.all_messages(
output_tool_return_content='请换个搜索条件试试'
)
else:
print('3 次都没找到合适的航班')
return
# === 步骤 2:选座位 ===
confirm = input('要订这个航班吗?(y/n) ')
if confirm != 'y':
return
seat_history: list[ModelMessage] | None = None
while True:
seat_input = input('你想坐哪个位置?')
result = await seat_agent.run(
seat_input,
message_history=seat_history,
usage=usage,
usage_limits=limits,
)
if isinstance(result.output, SeatPreference):
seat = result.output
print(f'座位已选: {seat.row}{seat.seat}')
break
else:
print('没理解你的座位偏好,请再说一次')
seat_history = result.all_messages()
print(f'预订完成: {flight.flight_number}, 座位 {seat.row}{seat.seat}')
RunUsage() 共享 —— 多个 Agent 共用一个用量计数器,方便统一限制message_history 传递 —— 让 Agent 记住之前的对话,不用重复解释all_messages(output_tool_return_content=...) —— 失败时给 Agent 一个提示,让它知道要换个思路| 对比项 | 代理委托 | 程序化交接 |
|---|---|---|
| 谁控制流程 | 老板 Agent 决定 | 你的代码决定 |
| 依赖关系 | 通常相同 | 可以不同 |
| 适用场景 | Agent 需要实时获取信息 | 步骤明确的流水线 |
| 例子 | 边聊边查数据 | 先搜航班再选座位 |
前面我们都是用 @agent.tool 给 Agent 加工具。但真实项目中:
这就需要 Toolset(工具集)。
from pydantic_ai.toolsets import FunctionToolset
# 天气工具集
weather_tools = FunctionToolset()
@weather_tools.tool
def get_temperature(city: str) -> float:
"""获取指定城市的温度(摄氏度)。"""
temps = {'北京': 22.0, '上海': 25.0, '广州': 30.0}
return temps.get(city, 20.0)
@weather_tools.tool
def get_weather(city: str) -> str:
"""获取指定城市的天气状况。"""
return f'{city}今天晴转多云'
# 时间工具集
from datetime import datetime
time_tools = FunctionToolset()
@time_tools.tool
def get_current_time() -> str:
"""获取当前时间。"""
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 给 Agent 装上工具集
from pydantic_ai import Agent
agent = Agent(
'openai:gpt-4o',
toolsets=[weather_tools, time_tools], # 一次装两个工具集
)
from pydantic_ai.toolsets import CombinedToolset
# 把天气和时间工具集合并成一个
all_tools = CombinedToolset([weather_tools, time_tools])
agent = Agent('openai:gpt-4o', toolsets=[all_tools])
# 只保留名字里有 "temperature" 的工具
temp_only = weather_tools.filtered(
lambda ctx, tool_def: 'temperature' in tool_def.name
)
当多个工具集里有同名工具时,加前缀解决:
combined = CombinedToolset([
weather_tools.prefixed('weather'), # weather_get_temperature
time_tools.prefixed('time'), # time_get_current_time
])
renamed = weather_tools.renamed({
'get_temperature': 'temp_celsius', # 新名字 → 旧名字的映射
})
有时候不同用户要用不同的工具。可以用 @agent.toolset 动态决定:
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
@dataclass
class UserDeps:
role: str # 'admin' 或 'user'
agent = Agent('openai:gpt-4o', deps_type=UserDeps)
@agent.toolset
def dynamic_tools(ctx: RunContext[UserDeps]):
"""根据用户角色返回不同的工具集"""
if ctx.deps.role == 'admin':
return CombinedToolset([weather_tools, time_tools, admin_tools])
else:
return weather_tools # 普通用户只能查天气
有些工具执行起来有风险(比如删除文件),需要人工确认:
# 整个工具集都需要审批
safe_tools = dangerous_tools.approval_required(
lambda ctx, tool_def, tool_args: True # 所有工具都要审批
)
# 只有特定工具需要审批
safe_tools = dangerous_tools.approval_required(
lambda ctx, tool_def, tool_args: tool_def.name.startswith('delete')
)
想知道 AI 调了什么工具、传了什么参数?用 WrapperToolset:
from pydantic_ai.toolsets import WrapperToolset
class LoggingToolset(WrapperToolset):
async def call_tool(self, name, tool_args, ctx, tool):
print(f'调用工具: {name}({tool_args})')
result = await super().call_tool(name, tool_args, ctx, tool)
print(f'结果: {result}')
return result
# 给任何工具集包一层日志
logged_tools = LoggingToolset(weather_tools)
agent = Agent('openai:gpt-4o', toolsets=[logged_tools])
有些工具执行了就无法撤回 —— 比如删文件、发邮件、转账。你肯定希望 AI 先告诉你它要干什么,你确认了再执行。
这就是延迟工具(Deferred Tools)—— AI 调用工具后不立即执行,而是暂停等人审批。
from pydantic_ai import Agent, DeferredToolRequests
agent = Agent(
'openai:gpt-4o',
output_type=[str, DeferredToolRequests], # 关键!
)
@agent.tool_plain(requires_approval=True)
def delete_file(path: str) -> str:
"""删除指定文件。"""
# os.remove(path) # 实际删除操作
return f'文件 {path} 已删除'
output_type! 加了延迟工具后,Agent 可能返回正常结果(str),也可能返回"等待审批"(DeferredToolRequests)。所以要用列表形式的 output_type。
有些文件删就删了无所谓,但重要文件要审批:
from pydantic_ai import ApprovalRequired, RunContext
PROTECTED_FILES = ['config.yml', '.env', 'database.db']
@agent.tool
def delete_file(ctx: RunContext, path: str) -> str:
"""删除指定文件。"""
# 重要文件需要审批
if path in PROTECTED_FILES and not ctx.tool_call_approved:
raise ApprovalRequired(
metadata={'reason': f'{path} 是受保护文件'}
)
return f'文件 {path} 已删除'
ctx.tool_call_approved 是关键 —— 第一次调用时是 False,审批通过后框架会再次调用,这时就是 True。
from pydantic_ai import DeferredToolResults, ToolDenied
# 第一步:运行 Agent
result = agent.run_sync('删除 config.yml 和 test.txt')
messages = result.all_messages()
# 第二步:检查是否需要审批
if isinstance(result.output, DeferredToolRequests):
print('以下操作需要审批:')
for call in result.output.approvals:
print(f' - {call.tool_name}({call.args_as_dict()})')
# 第三步:逐个审批
results = DeferredToolResults()
for call in result.output.approvals:
confirm = input(f'批准 {call.tool_name}? (y/n) ')
if confirm == 'y':
results.approvals[call.tool_call_id] = True
else:
results.approvals[call.tool_call_id] = ToolDenied(
'操作被拒绝'
)
# 第四步:带着审批结果继续运行
result = agent.run_sync(
message_history=messages,
deferred_tool_results=results,
)
print(result.output)
else:
print(result.output)
delete_file("config.yml")raise ApprovalRequiredDeferredToolRequestsresults.approvals[id] = True → 再次运行 → 工具被真正执行(ctx.tool_call_approved = True)results.approvals[id] = ToolDenied → Agent 收到拒绝信息 → 告诉用户"操作已取消"除了审批,还有一种场景:工具的结果来自外部服务(比如前端浏览器、第三方 API)。
from pydantic_ai import CallDeferred
@agent.tool
async def get_user_location(ctx: RunContext) -> str:
"""获取用户当前位置(需要前端提供)。"""
# 这个信息只有浏览器知道,后端拿不到
raise CallDeferred(metadata={'type': 'geolocation'})
# 前端拿到 DeferredToolRequests 后
# 从浏览器获取位置信息
# 把结果通过 DeferredToolResults 传回来
前面学的代理委托和程序化交接已经很强了,但有些场景还是搞不定:
这时候就需要 Graph(图)—— Pydantic AI 的状态机引擎。
图由节点(Node)组成,节点之间通过返回类型自动连接成边(Edge):
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from dataclasses import dataclass
# 节点 1:检查数字能否被 5 整除
@dataclass
class CheckNumber(BaseNode[None, None, int]):
number: int
async def run(self, ctx: GraphRunContext) -> 'AddOne' | End[int]:
if self.number % 5 == 0:
return End(self.number) # 能整除 → 结束
else:
return AddOne(self.number) # 不能 → 去加 1
# 节点 2:给数字加 1
@dataclass
class AddOne(BaseNode):
number: int
async def run(self, ctx: GraphRunContext) -> CheckNumber:
return CheckNumber(self.number + 1) # 加完 → 回去检查
# 创建图并运行
graph = Graph(nodes=[CheckNumber, AddOne])
result = graph.run_sync(CheckNumber(3))
print(result.output) # 5
run() 方法返回"下一步去哪"。返回 End(值) 就结束,返回另一个节点就继续。像流程图一样,一步一步走。
节点之间可以通过 State(状态) 共享数据:
@dataclass
class OrderState:
"""订单流程的共享状态"""
customer_name: str = ''
items: list[str] = None
total_price: float = 0.0
is_paid: bool = False
def __post_init__(self):
if self.items is None:
self.items = []
@dataclass
class TakeOrder(BaseNode[OrderState]):
async def run(self, ctx: GraphRunContext[OrderState]) -> 'CalculateTotal':
# 读写共享状态
ctx.state.customer_name = '张三'
ctx.state.items = ['咖啡', '蛋糕']
return CalculateTotal()
@dataclass
class CalculateTotal(BaseNode[OrderState]):
async def run(self, ctx: GraphRunContext[OrderState]) -> 'ProcessPayment':
prices = {'咖啡': 28.0, '蛋糕': 35.0}
ctx.state.total_price = sum(
prices.get(item, 0) for item in ctx.state.items
)
return ProcessPayment()
@dataclass
class ProcessPayment(BaseNode[OrderState, None, str]):
async def run(
self, ctx: GraphRunContext[OrderState]
) -> End[str]:
ctx.state.is_paid = True
return End(
f'{ctx.state.customer_name} 支付了 ¥{ctx.state.total_price}'
)
order_graph = Graph(nodes=[TakeOrder, CalculateTotal, ProcessPayment])
state = OrderState()
result = order_graph.run_sync(TakeOrder(), state=state)
print(result.output)
# 张三 支付了 ¥63.0
这是个典型的实战场景 —— AI 写邮件,AI 审核,不通过就重写:
from pydantic import BaseModel
from pydantic_ai import Agent, ModelMessage, format_as_xml
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from dataclasses import dataclass, field
# 状态
@dataclass
class EmailState:
recipient: str
topic: str
agent_messages: list[ModelMessage] = field(default_factory=list)
# 输出类型
class Email(BaseModel):
subject: str
body: str
class NeedRevision(BaseModel):
feedback: str
class Approved(BaseModel):
pass
# Agent 1:写邮件
writer = Agent('openai:gpt-4o', output_type=Email)
# Agent 2:审核邮件
reviewer = Agent(
'openai:gpt-4o',
output_type=NeedRevision | Approved, # type: ignore
)
# 节点 1:写邮件
@dataclass
class WriteEmail(BaseNode[EmailState]):
feedback: str | None = None
async def run(
self, ctx: GraphRunContext[EmailState]
) -> 'ReviewEmail':
if self.feedback:
prompt = f'重写邮件。反馈:{self.feedback}'
else:
prompt = (
f'给 {ctx.state.recipient} 写一封关于'
f' {ctx.state.topic} 的邮件'
)
result = await writer.run(
prompt,
message_history=ctx.state.agent_messages,
)
ctx.state.agent_messages += result.new_messages()
return ReviewEmail(email=result.output)
# 节点 2:审核邮件
@dataclass
class ReviewEmail(BaseNode[EmailState, None, Email]):
email: Email
async def run(
self, ctx: GraphRunContext[EmailState]
) -> WriteEmail | End[Email]:
result = await reviewer.run(
f'审核这封邮件:\n'
f'收件人:{ctx.state.recipient}\n'
f'主题:{self.email.subject}\n'
f'内容:{self.email.body}'
)
if isinstance(result.output, NeedRevision):
# 不通过 → 带反馈回去重写
return WriteEmail(feedback=result.output.feedback)
else:
# 通过 → 结束
return End(self.email)
# 创建并运行
email_graph = Graph(nodes=[WriteEmail, ReviewEmail])
async def main():
state = EmailState(recipient='李总', topic='Q3 项目进度汇报')
result = await email_graph.run(WriteEmail(), state=state)
print(f'主题: {result.output.subject}')
print(f'内容: {result.output.body}')
图工作流可以自动生成流程图:
# 生成 Mermaid 代码
print(email_graph.mermaid_code(start_node=WriteEmail))
# 保存为图片
email_graph.mermaid_save('email_workflow.png', start_node=WriteEmail)
生成的图大概长这样:
长时间运行的工作流(比如需要人工审批的),可能运行到一半需要暂停。用 FileStatePersistence 把状态存到文件:
from pydantic_graph.persistence.file import FileStatePersistence
from pathlib import Path
# 创建持久化
persistence = FileStatePersistence(Path('workflow_state.json'))
# 初始化工作流
state = EmailState(recipient='李总', topic='Q3 汇报')
await email_graph.initialize(
WriteEmail(), state=state, persistence=persistence
)
# 可以一步一步执行
async with email_graph.iter_from_persistence(persistence) as run:
node = await run.next() # 执行一步
# 状态自动保存到文件
# 即使程序重启,也能从文件恢复
async with email_graph.iter_from_persistence(persistence) as run:
node = await run.next() # 从上次的位置继续
| 模式 | 适用场景 | 复杂度 | 谁控制流程 |
|---|---|---|---|
| 代理委托 | 一个 Agent 需要另一个帮忙 | 低 | Agent |
| 程序化交接 | 步骤明确的流水线 | 中 | 你的代码 |
| 工具集管理 | 工具复用、动态切换 | 低 | 配置决定 |
| 延迟工具 | 需要审批或外部输入 | 中 | 人/外部系统 |
| 图工作流 | 有分支、循环、持久化 | 高 | 图的结构 |
一个 Agent 翻译,另一个 Agent 评分并提供改进建议:
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
class Translation(BaseModel):
text: str
language: str
class Review(BaseModel):
score: int # 1-10
suggestion: str
translator = Agent('openai:gpt-4o', output_type=Translation)
reviewer = Agent('openai:gpt-4o', output_type=Review)
# 用代理委托模式:评审 Agent 在工具里调用翻译 Agent
main_agent = Agent(
'openai:gpt-4o',
instructions='使用 translate_and_review 工具完成翻译任务。',
)
@main_agent.tool
async def translate_and_review(
ctx: RunContext, text: str, target_lang: str
) -> str:
"""翻译文本并审核质量。"""
# 第一步:翻译
t_result = await translator.run(
f'将以下文本翻译为{target_lang}:{text}',
usage=ctx.usage,
)
# 第二步:审核
r_result = await reviewer.run(
f'评审这段翻译(原文:{text},译文:{t_result.output.text})',
usage=ctx.usage,
)
return (
f'翻译: {t_result.output.text}\n'
f'评分: {r_result.output.score}/10\n'
f'建议: {r_result.output.suggestion}'
)
用延迟工具实现一个简单的文件操作审批:
from pydantic_ai import (
Agent, DeferredToolRequests,
DeferredToolResults, ToolDenied,
)
agent = Agent(
'openai:gpt-4o',
output_type=[str, DeferredToolRequests],
)
@agent.tool_plain(requires_approval=True)
def write_file(filename: str, content: str) -> str:
"""写入文件。"""
# with open(filename, 'w') as f:
# f.write(content)
return f'已写入 {filename}'
# 运行并处理审批
result = agent.run_sync('创建一个 hello.txt,内容是 Hello World')
if isinstance(result.output, DeferredToolRequests):
for call in result.output.approvals:
print(f'待审批: {call.tool_name}({call.args_as_dict()})')