Pydantic AI 教程 · 阶段八

集成与实战

完整业务应用、UI 集成、协议对接

到了最后一个阶段!前面七个阶段你已经掌握了 Pydantic AI 的所有核心技能,现在要把它们串起来,做真正的业务应用。

这个阶段我们学四件大事:

  1. 业务应用实战 —— 一个完整的 Slack 机器人项目,从代理到部署
  2. AG-UI 协议 —— 让 AI 代理和前端 UI 实时交互
  3. Vercel AI 集成 —— 对接前端 AI SDK
  4. A2A 协议 —— Agent 和 Agent 之间的通信标准

1业务应用实战:Slack 潜客分析机器人

这是一个真实的生产级项目,展示如何把 Pydantic AI 用到正式业务中

项目是干什么的?

想象一下这个场景:你们公司有个公开的 Slack 社区,每天都有新人加入。你想知道这些新人是谁、在哪家公司工作、有没有可能成为你的客户。

以前要人工一个个查,现在用 AI 自动搞定:

Step 1 新人加入 Slack
Step 2 Slack 发送 webhook 事件
Step 3 FastAPI 接收事件,提取用户信息
Step 4 AI 代理自动搜索(DuckDuckGo)
Step 5 生成分析报告(1-5分评级)
Step 6 发送到 Slack 频道通知团队
Step 7 每天早上自动汇总 Top 5

项目结构

slack_lead_qualifier/
├── models.py        # 数据模型:Profile、Analysis
├── agent.py         # AI 代理:搜索+分析
├── app.py           # FastAPI 应用:接收 webhook
├── functions.py     # 业务逻辑:处理+通知
├── slack.py         # Slack API:发消息
├── store.py         # 数据存储:Modal Dict
└── modal.py         # 部署配置:无服务器

第一步:定义数据模型

先想清楚要处理什么数据:

from typing import Annotated, Any
from annotated_types import Ge, Le
from pydantic import BaseModel
from pydantic_ai import format_as_xml


class Profile(BaseModel):
    """新加入的用户信息"""
    first_name: str | None = None
    last_name: str | None = None
    display_name: str | None = None
    email: str

    def as_prompt(self) -> str:
        """转成 XML 格式,方便 AI 理解"""
        return format_as_xml(self, root_tag='profile')


class Analysis(BaseModel):
    """AI 生成的分析结果"""
    profile: Profile
    organization_name: str          # 公司名
    organization_domain: str        # 公司网站
    job_title: str                  # 职位
    relevance: Annotated[int, Ge(1), Le(5)]  # 相关度评分 1-5
    summary: str                    # 一句话总结

    def as_slack_blocks(self) -> list[dict[str, Any]]:
        """转成 Slack 消息格式"""
        return [
            {
                'type': 'markdown',
                'text': (
                    f'[{self.profile.display_name}]'
                    f'(mailto:{self.profile.email}), '
                    f'{self.job_title} at '
                    f'[**{self.organization_name}**]'
                    f'(https://{self.organization_domain})'
                ),
            },
            {
                'type': 'markdown',
                'text': self.summary,
            },
        ]
小技巧: format_as_xml 在阶段四讲过,它把 Pydantic 模型转成 XML,让 AI 能更准确地理解结构化数据。Annotated[int, Ge(1), Le(5)] 限制评分只能是 1 到 5 分。

第二步:创建 AI 代理

这是核心!代理拿到用户信息后,用 DuckDuckGo 去搜索,然后生成分析报告:

from types import NoneType
from pydantic_ai import Agent, NativeOutput
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool
from .models import Analysis, Profile

agent = Agent(
    'openai:gpt-4o',
    instructions="""
    当有新人加入 Slack 社区时,请做一个简短的背景调查:

    1. 他是谁:职业角色、项目经验(LinkedIn、GitHub等)
    2. 在哪工作:公司名和业务领域
    3. 相关度评分(1-5):评估他成为客户的可能性
       1 = 基本不相关,5 = 非常匹配

    用 DuckDuckGo 搜索工具去调查。
    如果信息不足以做出判断,返回 None。
    """,
    tools=[duckduckgo_search_tool()],
    # 两种可能的输出:分析结果 或 None
    output_type=NativeOutput([Analysis, NoneType]),
)


async def analyze_profile(profile: Profile) -> Analysis | None:
    """分析一个用户的资料"""
    result = await agent.run(profile.as_prompt())
    return result.output
关键设计:
NativeOutput([Analysis, NoneType]) —— 联合类型输出,搜不到足够信息时返回 None
duckduckgo_search_tool() —— 内置的搜索工具,不需要 API Key
• 代理的 instructions 写得很具体,包括评分标准

第三步:FastAPI 接收 Slack 事件

Slack 会通过 webhook 通知我们有新人加入:

from typing import Any
from fastapi import FastAPI, HTTPException, status
from .models import Profile

app = FastAPI()


@app.post('/')
async def process_webhook(payload: dict[str, Any]) -> dict[str, Any]:
    # Slack 的 URL 验证(首次配置时会调用)
    if payload['type'] == 'url_verification':
        return {'challenge': payload['challenge']}

    # 有新成员加入
    if (
        payload['type'] == 'event_callback'
        and payload['event']['type'] == 'team_join'
    ):
        profile = Profile.model_validate(
            payload['event']['user']['profile']
        )
        # 异步处理,不阻塞 webhook 响应
        process_slack_member(profile)
        return {'status': 'OK'}

    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY
    )

第四步:业务逻辑串联

把分析和通知串起来:

from .agent import analyze_profile
from .models import Profile
from .slack import send_slack_message
from .store import AnalysisStore

NEW_LEAD_CHANNEL = '#new-slack-leads'


async def process_slack_member(profile: Profile):
    """处理新成员:分析 -> 存储 -> 通知"""
    # 1. AI 分析
    analysis = await analyze_profile(profile)

    if analysis is None:
        return  # 信息不足,跳过

    # 2. 存储结果
    await AnalysisStore.add(analysis)

    # 3. 发送 Slack 通知
    await send_slack_message(
        NEW_LEAD_CHANNEL,
        [
            {
                'type': 'header',
                'text': {
                    'type': 'plain_text',
                    'text': f'新成员评分 {analysis.relevance}/5',
                },
            },
            {'type': 'divider'},
            *analysis.as_slack_blocks(),
        ],
    )

第五步:部署到云端(Modal)

用 Modal 做无服务器部署,自带定时任务:

import modal

image = modal.Image.debian_slim(python_version='3.13').pip_install(
    'pydantic',
    'pydantic_ai_slim[openai,duckduckgo]',
    'logfire[httpx,fastapi]',
    'fastapi[standard]',
)

app = modal.App(
    name='slack-lead-qualifier',
    image=image,
    secrets=[
        modal.Secret.from_name('openai'),
        modal.Secret.from_name('slack'),
    ],
)


@app.function(min_containers=1)  # 至少保持 1 个容器热启动
@modal.asgi_app()
def web_app():
    from .app import app as _app
    return _app


# 每天早上 8 点自动发送汇总
@app.function(schedule=modal.Cron('0 8 * * *'))
async def send_daily_summary():
    from .functions import send_daily_summary as _send_daily_summary
    await _send_daily_summary()

整个项目的架构图

┌─────────────┐ webhook ┌──────────────┐ │ Slack │ ──────────────►│ FastAPI │ │ (新人加入) │ │ (Modal) │ └─────────────┘ └──────┬───────┘ │ ┌───────▼───────┐ │ AI Agent │ │ (GPT-4o + │ │ DuckDuckGo) │ └───────┬───────┘ │ ┌────────────┴────────────┐ │ │ │ ┌─────▼─────┐ ┌────▼────┐ ┌────▼────┐ │ Storage │ │ Slack │ │ 每日 │ │ (Modal │ │ 通知 │ │ 汇总 │ │ Dict) │ │ │ │ (Cron) │ └───────────┘ └─────────┘ └─────────┘
实战要点总结:
• 用 NativeOutput 处理"可能没结果"的场景
• 用 format_as_xml 把结构化数据传给 AI
• webhook 响应要快,耗时操作异步处理
• 用 Modal 等无服务器平台简化部署
• 定时任务做汇总,减少打扰频率

2AG-UI 协议:让 AI 和前端实时对话

什么是 AG-UI?

AG-UI(Agent-User Interaction Protocol)是由 CopilotKit 团队推出的开放标准,它解决了一个大问题:

AI 代理怎么和前端 UI 实时通信?

以前你可能用 WebSocket 或者 SSE 自己搞,每个项目搞一套,前端和后端的约定各不相同。AG-UI 把这事标准化了 —— 流式传输、共享状态、前端工具调用、自定义事件,全都有标准。

安装

pip install 'pydantic-ai-slim[ag-ui]'
pip install uvicorn

三种接入方式

AG-UI 提供了从简单到复杂的三种用法:

方式一:一行代码搞定(AGUIApp)

最简单!直接创建一个 ASGI 应用:

from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp

agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')

# 一行代码,搞定!
app = AGUIApp(agent)

启动服务:

uvicorn my_app:app --port 9000

方式二:嵌入 FastAPI(dispatch_request)

如果你已经有 FastAPI 项目,用这种方式嵌入:

from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui import AGUIAdapter

agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')

app = FastAPI()

@app.post('/chat')
async def run_agent(request: Request) -> Response:
    return await AGUIAdapter.dispatch_request(
        request, agent=agent
    )

方式三:完全控制(run_stream)

需要更多控制时,手动处理流:

import json
from http import HTTPStatus
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import Response, StreamingResponse
from pydantic import ValidationError
from pydantic_ai import Agent
from pydantic_ai.ui import SSE_CONTENT_TYPE
from pydantic_ai.ui.ag_ui import AGUIAdapter

agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')

app = FastAPI()

@app.post('/chat')
async def run_agent(request: Request) -> Response:
    accept = request.headers.get('accept', SSE_CONTENT_TYPE)

    try:
        # 1. 解析请求
        run_input = AGUIAdapter.build_run_input(
            await request.body()
        )
    except ValidationError as e:
        return Response(
            content=json.dumps(e.json()),
            media_type='application/json',
            status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
        )

    # 2. 创建适配器并运行
    adapter = AGUIAdapter(
        agent=agent,
        run_input=run_input,
        accept=accept,
    )
    event_stream = adapter.run_stream()

    # 3. 编码为 SSE 流并返回
    sse_stream = adapter.encode_stream(event_stream)
    return StreamingResponse(sse_stream, media_type=accept)

AG-UI 六大实战模式

AG-UI 不只是简单的聊天,它支持六种强大的交互模式:

模式 1:基础聊天 + 工具调用

最基本的用法,代理可以调用工具并流式回复:

from datetime import datetime
from zoneinfo import ZoneInfo
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp

agent = Agent('openai:gpt-4o-mini')

@agent.tool_plain
async def current_time(timezone: str = 'UTC') -> str:
    """获取当前时间"""
    tz = ZoneInfo(timezone)
    return datetime.now(tz=tz).isoformat()

app = AGUIApp(agent)

模式 2:生成式 UI(任务规划)

这个厉害了!AI 不只是回复文字,还能直接操控前端 UI

比如用户说"帮我规划一个学习计划",AI 可以在前端动态创建任务列表,并逐步更新状态:

from typing import Any, Literal
from pydantic import BaseModel, Field
from ag_ui.core import EventType, StateDeltaEvent, StateSnapshotEvent
from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp

StepStatus = Literal['pending', 'completed']


class Step(BaseModel):
    description: str
    status: StepStatus = 'pending'


class Plan(BaseModel):
    steps: list[Step] = Field(default_factory=list)


# JSON Patch 操作(RFC 6902 标准)
class JSONPatchOp(BaseModel):
    op: Literal['add', 'remove', 'replace']
    path: str
    value: Any = None


agent = Agent(
    'openai:gpt-4o-mini',
    instructions="""
    规划任务时只用工具,不要发送多余的消息。
    - 用 create_plan 创建计划
    - 用 update_plan_step 更新步骤状态
    - 不要重复描述计划内容
    """,
)


@agent.tool_plain
async def create_plan(steps: list[str]) -> StateSnapshotEvent:
    """创建一个多步骤计划"""
    plan = Plan(
        steps=[Step(description=step) for step in steps]
    )
    # 返回完整状态快照 -> 前端渲染整个计划
    return StateSnapshotEvent(
        type=EventType.STATE_SNAPSHOT,
        snapshot=plan.model_dump(),
    )


@agent.tool_plain
async def update_plan_step(
    index: int,
    status: StepStatus | None = None,
) -> StateDeltaEvent:
    """更新某一步的状态"""
    changes = []
    if status is not None:
        changes.append(
            JSONPatchOp(
                op='replace',
                path=f'/steps/{index}/status',
                value=status,
            )
        )
    # 返回增量更新 -> 前端只更新变化的部分
    return StateDeltaEvent(
        type=EventType.STATE_DELTA,
        delta=changes,
    )


app = AGUIApp(agent)
两种状态更新方式:
StateSnapshotEvent —— 发送完整状态,前端整体替换(适合初始化)
StateDeltaEvent —— 发送 JSON Patch,前端增量更新(适合小改动)

模式 3:共享状态(食谱助手)

前端有状态,AI 也能读取和修改这个状态:

from pydantic import BaseModel, Field
from ag_ui.core import EventType, StateSnapshotEvent
from pydantic_ai import Agent, RunContext
from pydantic_ai.ui import StateDeps
from pydantic_ai.ui.ag_ui.app import AGUIApp


class Ingredient(BaseModel):
    icon: str = '🥕'
    name: str
    amount: str


class Recipe(BaseModel):
    ingredients: list[Ingredient] = Field(default_factory=list)
    instructions: list[str] = Field(default_factory=list)


class RecipeSnapshot(BaseModel):
    recipe: Recipe = Field(default_factory=Recipe)


# 关键:用 StateDeps 包装状态
agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=StateDeps[RecipeSnapshot],
)


@agent.tool_plain
async def display_recipe(recipe: Recipe) -> StateSnapshotEvent:
    """把食谱展示给用户"""
    return StateSnapshotEvent(
        type=EventType.STATE_SNAPSHOT,
        snapshot={'recipe': recipe},
    )


@agent.instructions
async def recipe_instructions(
    ctx: RunContext[StateDeps[RecipeSnapshot]],
) -> str:
    """动态指令:包含当前食谱状态"""
    return f"""
    你是一个食谱助手。
    用 display_recipe 工具展示食谱。
    当前食谱状态:
    {ctx.deps.state.recipe.model_dump_json(indent=2)}
    """


# 把初始状态传给 AGUIApp
app = AGUIApp(
    agent,
    deps=StateDeps(RecipeSnapshot()),
)
StateDeps 是什么?
它是 AG-UI 专用的依赖类型,可以从前端接收状态,也可以把状态推送回前端。相当于前端和后端共享了同一份数据。

模式 4:人机协作

有些操作需要人来确认,AG-UI 天然支持"人在回路中"的模式:

from pydantic_ai import Agent
from pydantic_ai.ui.ag_ui.app import AGUIApp

agent = Agent(
    'openai:gpt-4o-mini',
    instructions="""
    规划任务时:
    - 用 generate_task_steps 展示建议步骤
    - 等待用户确认
    - 如果用户同意,确认计划
    - 如果用户不同意,询问更多信息
    """,
)

# 不需要特殊处理!AG-UI 协议本身就支持
# 前端会展示确认界面,用户可以接受或拒绝
app = AGUIApp(agent)

模式 5:自定义事件

除了标准的文本和状态更新,你还可以发送完全自定义的事件:

from ag_ui.core import CustomEvent, EventType
from pydantic_ai import Agent, ToolReturn
from pydantic_ai.ui.ag_ui.app import AGUIApp

agent = Agent('openai:gpt-4o-mini')


@agent.tool_plain
async def count_things() -> ToolReturn:
    """用 ToolReturn 附带自定义事件"""
    return ToolReturn(
        return_value='计数事件已发送',
        metadata=[
            CustomEvent(
                type=EventType.CUSTOM,
                name='count',
                value=1,
            ),
            CustomEvent(
                type=EventType.CUSTOM,
                name='count',
                value=2,
            ),
        ],
    )

app = AGUIApp(agent)
ToolReturn 的妙用: 工具的返回值不只是给 AI 看的数据,还可以通过 metadata 字段携带事件、状态更新等信息,直接传递给前端。

模式 6:预测式状态更新

这是最高级的模式 —— AI 的输出直接实时映射到 UI 状态,用户看到的不是文字在打,而是界面在实时变化:

from pydantic import BaseModel
from ag_ui.core import CustomEvent, EventType
from pydantic_ai import Agent, RunContext
from pydantic_ai.ui import StateDeps
from pydantic_ai.ui.ag_ui.app import AGUIApp


class DocumentState(BaseModel):
    document: str = ''


agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=StateDeps[DocumentState],
)


@agent.tool_plain
async def document_predict_state() -> list[CustomEvent]:
    """启用文档状态预测"""
    return [
        CustomEvent(
            type=EventType.CUSTOM,
            name='PredictState',
            value=[{
                'state_key': 'document',
                'tool': 'write_document',
                'tool_argument': 'document',
            }],
        ),
    ]


@agent.instructions()
async def instructions(
    ctx: RunContext[StateDeps[DocumentState]],
) -> str:
    return f"""
    你是一个文档写作助手。
    写文档前必须先调用 document_predict_state 启用预测。
    用 write_document 工具展示文档。
    当前文档内容:
    {ctx.deps.state.document}
    """

app = AGUIApp(
    agent,
    deps=StateDeps(DocumentState()),
)

AG-UI 事件类型汇总

事件类型作用适用场景
StateSnapshotEvent发送完整状态初始化、大幅更新
StateDeltaEventJSON Patch 增量更新局部修改
CustomEvent自定义事件计数器、通知、预测配置
ToolReturn + metadata工具返回附带事件任何工具内发送 UI 事件

运行和测试

启动你的 AG-UI 服务:

uvicorn my_app:app --port 9000

然后用 AG-UI 官方的调试工具测试:访问 AG-UI Dojo(https://docs.ag-ui.com/tutorials/debugging#the-ag-ui-dojo),连接到 http://localhost:9000,就能看到效果了。


3Vercel AI 集成:对接前端 AI SDK

什么是 Vercel AI SDK?

如果你用 React/Next.js 做前端,Vercel AI SDK 是最流行的选择之一。Pydantic AI 原生支持 Vercel AI Data Stream Protocol,后端写 Python,前端用 React。

快速接入

跟 AG-UI 一样简洁:

from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response
from pydantic_ai import Agent
from pydantic_ai.ui.vercel_ai import VercelAIAdapter

agent = Agent('openai:gpt-4o')

app = FastAPI()

@app.post('/chat')
async def chat(request: Request) -> Response:
    return await VercelAIAdapter.dispatch_request(
        request, agent=agent
    )

前端(React)使用 Vercel AI SDK 的 useChat hook 连接即可。

需要更多控制时

跟 AG-UI 的三种方式类似,你也可以手动处理流:

import json
from http import HTTPStatus
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import Response, StreamingResponse
from pydantic import ValidationError
from pydantic_ai import Agent
from pydantic_ai.ui import SSE_CONTENT_TYPE
from pydantic_ai.ui.vercel_ai import VercelAIAdapter

agent = Agent('openai:gpt-4o')

app = FastAPI()

@app.post('/chat')
async def chat(request: Request) -> Response:
    accept = request.headers.get('accept', SSE_CONTENT_TYPE)

    try:
        run_input = VercelAIAdapter.build_run_input(
            await request.body()
        )
    except ValidationError as e:
        return Response(
            content=json.dumps(e.json()),
            media_type='application/json',
            status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
        )

    adapter = VercelAIAdapter(
        agent=agent,
        run_input=run_input,
        accept=accept,
    )
    event_stream = adapter.run_stream()
    sse_stream = adapter.encode_stream(event_stream)
    return StreamingResponse(sse_stream, media_type=accept)

数据块(Data Chunks)

Vercel AI 支持在流中发送额外的数据块,比如搜索结果的来源链接:

from pydantic_ai import Agent, ToolReturn
from pydantic_ai.ui.vercel_ai.response_types import (
    DataChunk,
    SourceUrlChunk,
)

agent = Agent('openai:gpt-4o')


@agent.tool_plain
async def search_docs(query: str) -> ToolReturn:
    """搜索文档,返回结果+来源链接"""
    return ToolReturn(
        return_value=f'找到 2 条关于 "{query}" 的结果',
        metadata=[
            # 来源 URL -- 前端可以展示为引用链接
            SourceUrlChunk(
                source_id='doc-1',
                url='https://example.com/docs/intro',
                title='入门文档',
            ),
            # 自定义数据块 -- 前端可以自由使用
            DataChunk(
                type='data-search-results',
                data={'query': query, 'count': 2},
            ),
        ],
    )

Vercel AI SDK 版本

默认支持 v5,如果你用的是 v6:

adapter = VercelAIAdapter(
    agent=agent,
    run_input=run_input,
    accept=accept,
    sdk_version=6,  # 指定 v6
)

AG-UI vs Vercel AI:怎么选?

对比维度AG-UIVercel AI
来源CopilotKit 团队Vercel 团队
前端框架框架无关React/Next.js 为主
状态管理内置共享状态需自行处理
生成式 UI原生支持有限支持
数据块自定义事件SourceUrl/DataChunk
适用场景复杂交互、协作应用标准聊天、内容生成
成熟度较新更成熟
选择建议:
• 做聊天机器人、内容生成 → 用 Vercel AI,生态成熟
• 做协作工具、复杂 UI 交互 → 用 AG-UI,功能更强
• 都可以!Pydantic AI 两个都原生支持

4A2A 协议:Agent 对 Agent 通信

什么是 A2A?

A2A(Agent2Agent Protocol)是 Google 推出的开放标准,解决的是另一个问题:

不同的 AI 代理之间怎么通信?

想象一下:你有一个用 Pydantic AI 写的代理,你的合作伙伴有一个用 LangChain 写的代理,客户有一个用自研框架的代理。A2A 让它们能互相对话,不管底层用什么框架。

┌─────────────────┐ A2A 协议 ┌──────────────────┐ │ Pydantic AI │ ◄──────────────► │ LangChain │ │ Agent │ │ Agent │ └─────────────────┘ └──────────────────┘ ║ A2A 协议 ║ A2A 协议 ┌─────────────────┐ ┌──────────────────┐ │ 自研框架 │ │ CrewAI │ │ Agent │ │ Agent │ └─────────────────┘ └──────────────────┘

最简用法:一行代码

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', instructions='你是一个有趣的助手!')

# 直接把 Agent 变成 A2A 服务!
app = agent.to_a2a()

启动服务:

uvicorn my_a2a:app --host 0.0.0.0 --port 8000

就这么简单,你的 Agent 现在已经是一个标准的 A2A 服务了,任何支持 A2A 协议的客户端都能和它通信。

A2A 的核心概念

A2A 协议围绕三个核心概念:

┌─────────────────────────────────────────────┐ │ Server │ │ │ │ ┌──────────────────────────────────────┐ │ │ │ TaskManager │ │ │ │ │ │ │ │ ┌──────────┐ ┌─────────────────┐ │ │ │ │ │ Broker │──│ Storage │ │ │ │ │ └────┬─────┘ └─────────────────┘ │ │ │ │ │ │ │ │ │ ┌────▼─────┐ │ │ │ │ │ Worker │ │ │ │ │ └──────────┘ │ │ │ └──────────────────────────────────────┘ │ └─────────────────────────────────────────────┘
组件作用
Storage保存和加载任务,存储对话上下文
Broker任务调度 —— 决定把任务交给谁处理
Worker实际执行任务 —— 就是你的 Agent

Task 和 Context

A2A 里有两个重要概念要区分:

# 第一个请求 - 创建新的 context
# POST /tasks/send
{
    "message": {"text": "你好!我叫小明"},
    "context_id": "conv-123"  # 指定上下文 ID
}

# 第二个请求 - 同一个 context,Agent 记得之前的对话
# POST /tasks/send
{
    "message": {"text": "我叫什么名字?"},
    "context_id": "conv-123"  # 同一个上下文
}
# Agent 会回答 "你叫小明"

自动处理的事情

当你用 agent.to_a2a() 时,Pydantic AI 自动帮你处理了很多事情:

  1. 对话历史存储:完整的消息历史(包括工具调用和响应)自动保存
  2. 上下文延续:相同 context_id 的后续请求自动加载历史
  3. 结果转换:Agent 的输出自动转成 A2A 标准格式
    • 字符串输出 → TextPart 工件
    • 结构化数据 → DataPart 工件(附带 JSON Schema)

带工具的 A2A 服务

当然也可以让你的 A2A 服务具备工具能力:

from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o',
    instructions='你是一个天气查询助手',
)


@agent.tool_plain
async def get_weather(city: str) -> str:
    """查询城市天气"""
    # 实际项目中调用天气 API
    return f'{city}今天晴,25°C'


app = agent.to_a2a()

结构化输出的 A2A 服务

如果你的 Agent 返回结构化数据,A2A 会自动把它转成 DataPart

from pydantic import BaseModel
from pydantic_ai import Agent


class WeatherReport(BaseModel):
    city: str
    temperature: float
    description: str


agent = Agent(
    'openai:gpt-4o',
    output_type=WeatherReport,
)

app = agent.to_a2a()
# 客户端收到的是 DataPart,包含 JSON 数据和 Schema

A2A 的典型使用场景

场景 1:微服务架构 ┌─────────────┐ A2A ┌──────────────┐ │ 主 Agent │ ────────► │ 翻译 Agent │ │ (调度中心) │ │ (专门翻译) │ │ │ ────────► │ 搜索 Agent │ │ │ │ (专门搜索) │ └─────────────┘ └──────────────┘ 场景 2:跨团队协作 ┌─────────────┐ A2A ┌──────────────┐ │ 你的 Agent │ ◄───────► │ 合作方 Agent │ │ (Pydantic) │ │ (任何框架) │ └─────────────┘ └──────────────┘ 场景 3:AI 代理市场 ┌─────────────┐ A2A ┌──────────────┐ │ 用户的Agent │ ────────► │ 法律 Agent │ │ │ ────────► │ 财务 Agent │ │ │ ────────► │ 设计 Agent │ └─────────────┘ └──────────────┘

总结:四大集成方式对比

特性业务应用(直接集成)AG-UIVercel AIA2A
对接对象外部 API/服务前端 UIReact 前端其他 Agent
通信方式HTTP/webhookSSE 事件流SSE 数据流HTTP/SSE
状态管理自行处理StateDeps 共享自行处理自动存储
典型场景Bot、自动化协作应用聊天应用多代理系统
快速接入需要完整编写AGUIApp(agent)dispatch_requestagent.to_a2a()

恭喜!你已经完成了全部八个阶段的学习!

回顾一下你掌握的技能树:

阶段一:入门基础         Agent、Tool、结构化输出
阶段二:核心概念         依赖注入、消息历史、重试
阶段三:流式响应与UI      run_stream、Web应用、Gradio
阶段四:数据与检索        SQL生成、RAG、Embedding
阶段五:多代理与工作流     委托、Toolset、Graph
阶段六:高级特性         思考模式、MCP、多模态
阶段七:生产化           测试、评估、可观测性
阶段八:集成与实战        AG-UI、Vercel AI、A2A

你现在可以用 Pydantic AI 构建从简单到复杂的任何 AI 应用了。去创造吧!

动手练习

  1. Mini 项目:写一个 A2A 服务,接收问题并用搜索工具查找答案,然后用 AG-UI 做一个前端来展示
  2. 状态管理:用 AG-UI 的 StateDeps 做一个待办事项应用,AI 可以帮你管理任务列表
  3. 数据块实验:用 Vercel AI 的 SourceUrlChunk 做一个带引用来源的问答助手

教程导航

恭喜完成全部八个阶段的学习!你可以回顾之前的内容,或查看完整学习路线。

← 阶段七:生产化 回到学习路线