欢迎来到ai agent的新时代!随着大型语言模型(llms)如ernie的飞速发展,我们不再仅仅满足于ai的问答能力,而是期望它们能像智能助手一样,主动理解任务、拆解问题、调用工具、并与其他ai协作,最终完成复杂的目标。这就是 ai agent 的核心思想。
简单来说,AI Agent 是一个能够感知其环境、进行决策并采取行动以达成特定目标的智能实体。在当前大模型的背景下,AI Agent 通常具备以下特征:
现实世界中的复杂问题往往需要多个不同角色、不同能力的个体协同合作才能解决。AI领域也是如此。一个全能的、无所不知的单一Agent模型往往难以构建且维护成本高昂。
多Agent协作的意义:
挑战:
为了解决上述挑战中的通信和发现问题,Google提出了 Agent-to-Agent (A2A) 协议。A2A协议旨在为AI Agent之间的交互建立一套标准的通信格式和规范。它使得由不同开发者、不同技术栈构建的Agent能够"说同一种语言",从而实现互操作和协作。
核心理念包括:
python-a2a 是A2A协议的一个官方Python实现,它极大地简化了遵循A2A协议的Agent的开发。
Ernie 是由百度研发的知识增强大语言模型。在中文的自然语言理解、生成、对话等方面表现出色。在本教程中,我们将探讨如何将Ernie的智能融入到A2A Agent中,使其具备更强的任务处理和交互能力。
我们将使用如下方式与Ernie API进行交互(请确保你已获得有效的API Key和正确的Base URL):
import osfrom openai import OpenAI
client = OpenAI(
api_key="Your-sdk", # 替换为你的 AI Studio 访问令牌
base_url="https://aistudio.baidu.com/llm/lmapi/v3", # AI Studio 大模型 API 服务域名)# 示例:简单对话# chat_completion = client.chat.completions.create(# messages=[# {'role': 'system', 'content': '你是 AI Studio 实训AI开发平台的开发者助理,你精通开发相关的知识,负责给开发者提供搜索帮助建议。'},# {'role': 'user', 'content': '你好,请介绍一下AI Studio'}# ],# model="ernie-3.5-8k", # 或者 ernie-4.0-8k 等其他适用模型# )# print(chat_completion.choices[0].message.content)注意:上述代码中的API Key是一个示例,请替换为您自己的有效密钥。在后续教程中,我们会将实际的API调用注释掉或用模拟数据替代,以确保教程的通用性,但会指明在何处可以集成真实的LLM调用。
本教程旨在引导您:
在深入实践之前,让我们先打下坚实的理论基础。理解A2A协议和MCP是构建高效协作Agent系统的关键。
A2A协议为AI Agent之间的异步通信和协作提供了一个标准化的框架。它的设计目标是简单、可扩展且独立于特定的AI技术。
核心组件:
Agent Card (代理名片):
Message (消息):
Task (任务):
通信流程(简化版):
python-a2a 库提供了这些组件的Python类实现(如 AgentCard, Message, Task, A2AServer, A2AClient),使得开发者可以方便地创建和管理这些交互。
虽然A2A协议定义了Agent之间如何"对话",但它本身并不关心Agent内部如何完成任务,特别是当Agent需要使用外部工具或数据源(如API、数据库、代码执行器等)时。
Model Context Protocol (MCP) 应运而生,它是一种开放标准,旨在让语言模型(或基于语言模型的Agent)能够以标准化的方式访问外部工具和数据源,从而获取"上下文"知识并执行动作。
MCP 的作用:
核心概念:
理解A2A和MCP的区别与联系至关重要,它们共同构成了构建高级AI Agent系统的基石。
区别:
| 特性 | A2A (Agent-to-Agent) Protocol | MCP (Model Context Protocol) |
|---|---|---|
| 主要目标 | 定义 Agent之间 的通信和协作标准。 | 定义 Agent/LLM与外部工具/数据源之间 的交互标准。 |
| 关注点 | Agent的发现、对话管理、任务流转、Agent间互操作性。 | 工具的定义、发现、调用、参数传递、结果返回,增强Agent的执行能力。 |
| 交互对象 | 通常是其他自主的AI Agent。 | 通常是API、数据库、代码解释器、或其他形式的"工具服务"。 |
| 协议层级 | 更侧重于Agent间的"社交"和"协作"框架。 | 更侧重于Agent执行具体动作时的"能力扩展"机制。 |
协作与协同:
A2A和MCP并非互斥,而是可以完美协同工作的。
A2A Agent 使用 MCP 工具: 一个遵循A2A协议的Agent(我们称之为"主Agent")在执行其接收到的A2A任务时,可能会发现自己需要某些特定能力(如获取实时股票价格、翻译文本、执行一段Python代码)。这时,这个"主Agent"可以扮演MCP Client的角色,去调用一个或多个实现了MCP协议的"工具Agent"或"工具服务"。
MCP 工具本身也可以是 A2A Agent: 一个提供MCP工具的服务,其内部逻辑也可能非常复杂,甚至它本身就是一个小型的A2A Agent网络。但从外部调用者的角度看,它只是一个MCP Tool Provider。
增强Agent的自主性和能力边界:
一个形象的比喻:
通过这种方式,A2A负责宏观的Agent协作流程,而MCP则在微观层面增强了每个Agent的实际执行能力。Ernie大模型既可以直接作为A2A Agent的核心智能,也可以被封装成一个MCP工具供其他Agent调用。
python-a2a 库为开发者提供了在Python中轻松实现A2A和MCP兼容Agent所需的一切。
主要组件与功能:
在接下来的实战演练中,我们将大量使用这些组件。
理论学习完毕,让我们动手实践!我们将构建一个简单的"本地智能旅行助手"。用户输入一个城市名称,AI助手将查询该城市的天气,并根据天气情况推荐合适的室内或室外活动。这个例子将主要借鉴 官方的basic_workflow.py 的思路,并融入Ernie的思考。
首先,确保你已经安装了 python-a2a 库。如果还没有,可以通过pip安装(建议安装all extras以便体验全部功能):
pip install "python-a2a[all]"
接下来,是Ernie API的对接代码。我们再次列出,并假设它保存在一个名为 ernie_client.py 的文件中,或者在Notebook的初始化代码块中定义。
# ernie_client.py (或Notebook初始化块)import osfrom openai import OpenAI# AI Studio Ernie API 配置# 请确保使用你自己的有效API Key和正确的Base URLASTUDIO_API_KEY = os.getenv("AISTUDIO_API_KEY", "") # 从环境变量读取或直接填写ASTUDIO_BASE_URL = os.getenv("AISTUDIO_BASE_URL", "https://aistudio.baidu.com/llm/lmapi/v3")
client = OpenAI(
api_key=ASTUDIO_API_KEY,
base_url=ASTUDIO_BASE_URL,
)def get_ernie_response(user_prompt, system_prompt="你是一个乐于助人的AI助手。", model="ernie-3.5-8k"):
"""调用Ernie API获取回复"""
try:
chat_completion = client.chat.completions.create(
messages=[
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': user_prompt}
],
model=model,
) return chat_completion.choices[0].message.content except Exception as e: print(f"调用Ernie API失败: {e}") return f"无法获取Ernie的回应,错误: {e}" # 返回错误信息,而不是None# 测试一下 (可选)# if __name__ == "__main__":# response = get_ernie_response("你好,请介绍一下北京的天气如何?")# print(response)重要提示:
我们将创建两个Agent:WeatherAgent 和 ActivityAgent。每个Agent都会在一个独立的Python脚本中定义,并通过Flask启动为HTTP服务。这是A2A Agent的标准运行方式,使得它们可以被网络中的其他Agent或工作流发现和调用。
WeatherAgent 负责提供天气信息。创建一个名为 weather_agent_server.py 的文件。
# weather_agent_server.pyimport timefrom flask import Flask, request, jsonifyfrom python_a2a import A2AServer, AgentCard, AgentSkill, Task, TaskStatus, TaskState, Message, TextContent, MessageRoleclass WeatherAgent(A2AServer):
def __init__(self, port=5001):
self.port = port
agent_card = AgentCard(
name="Weather Agent",
description="提供指定城市的天气信息",
url=f"http://localhost:{self.port}",
version="1.0.0",
skills=[
AgentSkill(
name="GetCityWeather",
description="获取一个城市的当前天气",
tags=["weather", "location", "forecast"]
)
]
) super().__init__(agent_card=agent_card) def _get_simulated_weather(self, city):
print(f"[WeatherAgent] 正在为城市 '{city}' 查询模拟天气...")
time.sleep(1) # 模拟网络延迟
weather_conditions = { "北京": "晴朗,25°C,微风。", "上海": "多云转小雨,22°C,东南风3级。", "广州": "雷阵雨,28°C,注意防范。", "深圳": "晴间多云,29°C,空气质量良。", "伦敦": "持续小雨,15°C,请带好雨具。", "巴黎": "阳光明媚,23°C,适合户外活动。"
} return weather_conditions.get(city, f"抱歉,暂时无法获取'{city}'的天气信息。也许可以问问Ernie?") # 如何集成Ernie (示例性注释,实际使用时需要取消注释并确保 ernie_client.py 可导入)
# from ernie_client import get_ernie_response
# def _get_weather_from_ernie(self, city):
# print(f"[WeatherAgent] 正在通过Ernie查询城市 '{city}' 的天气...\")
# prompt = f"请告诉我{city}今天的天气怎么样?请简要描述。\"
# weather_info = get_ernie_response(prompt, system_prompt="你是一个天气预报助手。")
# return weather_info
def handle_task(self, task: Task) -> Task:
query_text = ""
if task.message and task.message.content and hasattr(task.message.content, 'text'):
query_text = task.message.content.text
city = query_text # 假设查询文本直接是城市名
# **核心逻辑:获取天气**
# 默认使用模拟数据
weather_data = self._get_simulated_weather(city) # # 如果想使用Ernie获取天气 (取消下面一行的注释,并注释掉上面一行):
# weather_data = self._get_weather_from_ernie(city)
print(f"[WeatherAgent] 为 '{city}' 获取到的天气: {weather_data}")
task.artifacts = [{ "parts": [{"type": "text", "text": weather_data}]
}]
task.status = TaskStatus(state=TaskState.COMPLETED) return task# ---- Flask App Setup ----def create_app(agent_instance):
app = Flask(__name__) @app.route('/agent.json', methods=['GET'])
def get_agent_card():
return jsonify(agent_instance.agent_card.to_dict()) @app.route('/a2a/agent.json', methods=['GET'])
def get_a2a_agent_card(): # A2A标准兼容端点
return jsonify(agent_instance.agent_card.to_dict()) @app.route('/tasks/send', methods=['POST'])
def handle_send_task():
try:
task_data = request.json # 兼容JSON-RPC风格的请求
if "jsonrpc" in task_data and "method" in task_data and task_data["method"] == "tasks/send":
task_dict = task_data.get("params", {}) else:
task_dict = task_data
task = Task.from_dict(task_dict)
updated_task = agent_instance.handle_task(task)
response_data = updated_task.to_dict() if "jsonrpc" in task_data: return jsonify({"jsonrpc": "2.0", "id": task_data.get("id"), "result": response_data}) return jsonify(response_data) except Exception as e:
error_response = {"code": -32603, "message": str(e)} if "jsonrpc" in request.json: return jsonify({"jsonrpc": "2.0", "id": request.json.get("id"), "error": error_response}), 500
return jsonify({"error": error_response}), 500
# 兼容旧版Message端点 (可选,推荐使用Task) @app.route('/', methods=['POST'])
def handle_root_message():
try:
data = request.json # 尝试将整个请求体作为Task处理
if isinstance(data, dict) and "id" in data and "status" in data: # 看起来像Task
return handle_send_task() # 否则,尝试作为Message处理 (简化版)
text_content = data.get("content", {}).get("text", "") if isinstance(data.get("content"), dict) else str(data)
message = Message(content=TextContent(text=text_content), role=MessageRole.USER) # 模拟任务转换
task = Task(id=f"task-{time.time_ns()}", message=message, status=TaskStatus(state=TaskState.PENDING))
updated_task = agent_instance.handle_task(task) # 从Task结果中提取文本给Message响应
response_text = "处理完成。"
if updated_task.artifacts and updated_task.artifacts[0].get("parts"):
response_text = updated_task.artifacts[0]["parts"][0].get("text", response_text)
response_message = Message(
content=TextContent(text=response_text),
role=MessageRole.AGENT,
message_id=f"response-{time.time_ns()}",
parent_message_id=data.get("message_id")
) return jsonify(response_message.to_dict()) except Exception as e: return jsonify({"error": str(e)}), 400
return appif __name__ == "__main__":
PORT = 5001
weather_agent = WeatherAgent(port=PORT)
app = create_app(weather_agent) print(f"WeatherAgent 正在启动,监听端口: http://localhost:{PORT}")
app.run(host='0.0.0.0', port=PORT, debug=False)终端运行 WeatherAgent:
打开一个新的终端窗口,进入该文件所在目录,然后运行:
python weather_agent_server.py
你应该能看到类似 "WeatherAgent 正在启动,监听端口: http://localhost:5001" 的输出。这个Agent现在就在后台运行了,等待请求。
ActivityAgent 负责根据天气推荐活动。创建一个名为 activity_agent_server.py 的文件。
# activity_agent_server.pyimport timeimport re # 用于从查询中提取天气信息from flask import Flask, request, jsonifyfrom python_a2a import A2AServer, AgentCard, AgentSkill, Task, TaskStatus, TaskState, Message, TextContent, MessageRoleclass ActivityAgent(A2AServer):
def __init__(self, port=5002):
self.port = port
agent_card = AgentCard(
name="Activity Agent",
description="根据城市和天气推荐活动",
url=f"http://localhost:{self.port}",
version="1.0.0",
skills=[
AgentSkill(
name="GetActivityRecommendations",
description="获取基于天气和地点的活动推荐",
tags=["activity", "recommendation", "travel", "leisure"]
)
]
) super().__init__(agent_card=agent_card) def _get_simulated_activities(self, city, weather_condition):
print(f"[ActivityAgent] 正在为 '{city}' (天气: {weather_condition}) 生成模拟活动推荐...")
time.sleep(1.5) # 模拟处理时间
is_rainy_or_bad_weather = any(kw in weather_condition.lower() for kw in ["雨", "雷", "雪", "糟", "霾"])
recommendations = { "北京": { "indoor": "故宫博物院深度游、国家博物馆看展、798艺术区感受艺术氛围。", "outdoor": "颐和园漫步、长城徒步(天气好时)、什刹海胡同骑行。"
}, "上海": { "indoor": "上海博物馆探索历史、中华艺术宫赏析艺术、体验密室逃脱。", "outdoor": "外滩漫步欣赏万 国建筑群、南京路步行街购物、豫园游览。"
}, "广州": { "indoor": "广东省博物馆、广州塔室内观光层、正佳广场极地海洋世界。", "outdoor": "白云山登高望远、越秀公园五羊石像、珠江夜游。"
}, "深圳": { "indoor": "深圳博物馆、当代艺术与城市规划馆、万象城购物体验。", "outdoor": "世界之窗(天气好时)、莲花山公园放风筝、深圳湾公园骑行。"
}, "伦敦": { "indoor": "大英博物馆、国家美术馆、自然历史博物馆。", "outdoor": "海德公园野餐、伦敦眼俯瞰城市、泰晤士河畔散步。"
}, "巴黎": { "indoor": "卢浮宫、奥赛博物馆、蓬皮杜艺术中心。", "outdoor": "埃菲尔铁塔下草坪休闲、塞纳河畔漫步、蒙马特高地探索。"
}
}
city_activities = recommendations.get(city, { "indoor": f"在{city}可以逛逛当地的博物馆或购物中心。", "outdoor": f"在{city}可以探索当地的公园或著名地标。"
})
if is_rainy_or_bad_weather: return f"由于{city}的天气是'{weather_condition}',推荐室内活动:{city_activities['indoor']}"
else: return f"{city}天气'{weather_condition}',很棒!推荐户外活动:{city_activities['outdoor']}"
# 如何集成Ernie (示例性注释)
# from ernie_client import get_ernie_response
# def _get_activities_from_ernie(self, city, weather_condition):
# print(f"[ActivityAgent] 正在通过Ernie为 '{city}' (天气: {weather_condition}) 生成活动推荐...\")
# prompt = f"我现在在{city},天气是{weather_condition}。请为我推荐一些合适的活动,户外和室内都可以考虑,并简要说明理由。\"
# activities = get_ernie_response(prompt, system_prompt="你是一个旅行活动推荐助手。")
# return activities
def _extract_info_from_query(self, query_text):
# 这是一个非常简化的提取逻辑,实际应用中可能需要更复杂的NLP
# 假设查询格式是 "为[城市]在[天气状况]下推荐活动"
# 或者直接从上下文中获取城市和天气
city = "未知城市"
weather = "未知天气"
# 尝试从文本中提取城市,例如北京、上海等
known_cities = ["北京", "上海", "广州", "深圳", "伦敦", "巴黎"] for c in known_cities: if c in query_text:
city = c break
# 尝试提取天气信息,例如包含"晴朗"、"雨"等关键词
# 这里我们假设天气信息在查询文本的后半部分,或者作为独立参数传入
# 在我们的工作流中,天气信息会由WeatherAgent提供,并作为输入的一部分
# 所以这里的提取逻辑更多是针对直接调用此Agent的情况
weather_keywords = { "晴朗": ["晴", "太阳", "sunny"], "多云": ["多云", "cloudy"], "阴": ["阴天", "overcast"], "雨": ["雨", "rainy", "雷阵雨"], "雪": ["雪", "snowy"]
} for condition, kws in weather_keywords.items(): if any(kw in query_text for kw in kws):
weather = condition break
# 如果没有通过关键词匹配到,则尝试取查询文本中关于天气的描述
# 比如,如果query_text是 "北京的天气是晴朗,25°C,微风。" 中的后半段
# 这里的逻辑需要根据实际的输入格式来完善
if weather == "未知天气": #如果上面没匹配到,尝试更通用的提取
match = re.search(r"天气是([^,。]+)|状况是([^,。]+)", query_text) if match:
weather = match.group(1) or match.group(2) or "宜人"
elif "天气" in query_text: # 退化情况,如果只说了天气,但没描述
weather = "宜人(具体情况未知)"
return city, weather def handle_task(self, task: Task) -> Task:
query_text = ""
if task.message and task.message.content and hasattr(task.message.content, 'text'):
query_text = task.message.content.text # 例如 "为北京在晴朗,25°C,微风。下推荐活动"
# 从查询文本中解析出城市和天气状况
# 在我们的工作流中,城市是初始输入,天气是上一个Agent的输出
# query_text的格式会是 Flow 构建的,例如 "Recommend activities in Beijing given weather: 晴朗,25°C,微风。"
city_match = re.search(r"in (\w+)", query_text, re.IGNORECASE)
city = city_match.group(1) if city_match else "北京" # 默认北京
weather_match = re.search(r"weather: (.+)", query_text, re.IGNORECASE)
weather_condition = weather_match.group(1) if weather_match else "天气不错" # 默认天气不错
if not weather_match: # 尝试从更简单的格式提取天气,如果上一个Agent直接返回天气描述
if "推荐活动" in query_text: # 假设我们构建的查询是 "为[城市]在[天气描述]下推荐活动"
parts = query_text.split("在") if len(parts) > 1:
potential_weather = parts[-1].replace("下推荐活动","").strip() if potential_weather:
weather_condition = potential_weather # **核心逻辑:获取活动推荐**
# 默认使用模拟数据
activity_data = self._get_simulated_activities(city, weather_condition) # # 如果想使用Ernie获取活动 (取消下面一行的注释,并注释掉上面一行):
# activity_data = self._get_activities_from_ernie(city, weather_condition)
print(f"[ActivityAgent] 为 '{city}' (天气: {weather_condition}) 推荐的活动: {activity_data}")
task.artifacts = [{ "parts": [{"type": "text", "text": activity_data}]
}]
task.status = TaskStatus(state=TaskState.COMPLETED) return task# ---- Flask App Setup (与WeatherAgent类似) ----def create_app(agent_instance):
app = Flask(__name__) @app.route('/agent.json', methods=['GET'])
def get_agent_card():
return jsonify(agent_instance.agent_card.to_dict()) @app.route('/a2a/agent.json', methods=['GET'])
def get_a2a_agent_card():
return jsonify(agent_instance.agent_card.to_dict()) @app.route('/tasks/send', methods=['POST'])
def handle_send_task():
try:
task_data = request.json if "jsonrpc" in task_data and "method" in task_data and task_data["method"] == "tasks/send":
task_dict = task_data.get("params", {}) else:
task_dict = task_data
task = Task.from_dict(task_dict)
updated_task = agent_instance.handle_task(task)
response_data = updated_task.to_dict() if "jsonrpc" in task_data: return jsonify({"jsonrpc": "2.0", "id": task_data.get("id"), "result": response_data}) return jsonify(response_data) except Exception as e:
error_response = {"code": -32603, "message": str(e)} if "jsonrpc" in request.json: return jsonify({"jsonrpc": "2.0", "id": request.json.get("id"), "error": error_response}), 500
return jsonify({"error": error_response}), 500
@app.route('/', methods=['POST'])
def handle_root_message(): # 兼容旧版
try:
data = request.json if isinstance(data, dict) and "id" in data and "status" in data: return handle_send_task()
text_content = data.get("content", {}).get("text", "") if isinstance(data.get("content"), dict) else str(data)
message = Message(content=TextContent(text=text_content), role=MessageRole.USER)
task = Task(id=f"task-{time.time_ns()}", message=message, status=TaskStatus(state=TaskState.PENDING))
updated_task = agent_instance.handle_task(task)
response_text = "处理完成。"
if updated_task.artifacts and updated_task.artifacts[0].get("parts"):
response_text = updated_task.artifacts[0]["parts"][0].get("text", response_text)
response_message = Message(
content=TextContent(text=response_text),
role=MessageRole.AGENT,
message_id=f"response-{time.time_ns()}",
parent_message_id=data.get("message_id")
) return jsonify(response_message.to_dict()) except Exception as e: return jsonify({"error": str(e)}), 400
return appif __name__ == "__main__":
PORT = 5002
activity_agent = ActivityAgent(port=PORT)
app = create_app(activity_agent) print(f"ActivityAgent 正在启动,监听端口: http://localhost:{PORT}")
app.run(host='0.0.0.0', port=PORT, debug=False)终端运行 ActivityAgent:
打开 另一个新的 终端窗口(保持WeatherAgent的终端仍在运行),进入该文件所在目录,然后运行:
python activity_agent_server.py
你应该能看到类似 "ActivityAgent 正在启动,监听端口: http://localhost:5002" 的输出。现在,我们有了两个独立的Agent服务在后台运行了。
注意:上述Agent代码中的Flask部分是为了能独立运行和被调用,python-a2a也支持在其他Web框架中集成,或者不通过HTTP直接在代码中实例化和调用Agent Client(例如 agents_workflow.py 示例中的LLM Client)。为了清晰展示A2A的分布式特性,我们这里采用了独立服务的方式。
现在两个Agent都已就绪,我们可以在Jupyter Notebook(或一个新的Python脚本 run_workflow.py)中定义和执行工作流了。这部分代码不需要在终端预先运行,而是直接在Notebook中执行。
# 在Jupyter Notebook单元格中或 run_workflow.py 中执行import timefrom python_a2a import AgentNetwork, Flow, Message, TextContent, MessageRole,A2AClient# 0. (可选) 导入Ernie客户端,如果要在工作流本身或结果处理中使用from ernie_client import get_ernie_response# 1. 定义Agent网络# 这里的URL应与我们启动Agent服务时指定的端口一致WEATHER_AGENT_URL = "http://localhost:5001"ACTIVITY_AGENT_URL = "http://localhost:5002"# 确保WeatherAgent和ActivityAgent服务已在终端启动并正在运行!# 可以简单测试一下Agent是否可达 (可选)import requeststry: print(f"尝试连接WeatherAgent: {WEATHER_AGENT_URL}/agent.json")
weather_card = requests.get(f"{WEATHER_AGENT_URL}/agent.json", timeout=3).json() print(f"WeatherAgent连接成功: {weather_card.get('name')}") print(f"尝试连接ActivityAgent: {ACTIVITY_AGENT_URL}/agent.json")
activity_card = requests.get(f"{ACTIVITY_AGENT_URL}/agent.json", timeout=3).json() print(f"ActivityAgent连接成功: {activity_card.get('name')}")except requests.exceptions.ConnectionError as e: print(f"连接Agent失败! 请确保 weather_agent_server.py 和 activity_agent_server.py 正在运行。错误: {e}") # 如果连接失败,后续工作流会出错,可以根据情况决定是否中止
# raise SystemExit("Agent服务未运行,工作流无法执行。") from eagent_network = AgentNetwork(name="Local Travel Assistant Network")# 直接使用URL添加Agent,AgentNetwork.add内部会创建A2AClient# 第二个参数名为 agent_or_url,可以直接传递URL字符串agent_network.add(name="weather_service", agent_or_url=WEATHER_AGENT_URL)
agent_network.add(name="activity_service", agent_or_url=ACTIVITY_AGENT_URL)print("\nAgent网络已配置:")for agent_info in agent_network.list_agents(): print(f"- {agent_info['name']}: {agent_info['url']}")# 2. 定义工作流# 用户希望查询的城市user_city_input = "北京"# user_city_input = "伦敦"# user_city_input = "火星" # 测试一下Agent的边界情况print(f"\n准备为城市 '{user_city_input}'规划行程。")# 使用Flow API定义工作流程# Flow的每一步都像是在构建一个请求链travel_flow = Flow(agent_network=agent_network, name="Smart Travel Planning Flow")# 第一步:调用 WeatherAgent 获取天气# .ask(agent_name_in_network, query_string_or_Task_Message_or_lambda)# 如果是字符串,则会被自动包装travel_flow.ask(
agent_name="weather_service",
query=user_city_input
)
# 第二步:条件分支,根据天气推荐活动travel_flow.if_contains("雨") # 如果天气结果包含"雨"# 如果包含"雨" (IF分支):# 2a. 使用 FunctionStep 构造特定于此分支的查询字符串travel_flow.execute_function( lambda weather_report_from_step1: f"Recommend indoor activities in {user_city_input} given weather: {weather_report_from_step1}", "{1}" # 参数:传递天气服务步骤的结果给 lambda)# 2b. 调用 ActivityAgent,使用 FunctionStep 生成的查询 (现在在 latest_result 中)travel_flow.ask(
agent_name="activity_service",
query="{latest_result}" # QueryStep 将从 context.data['latest_result'] 获取查询字符串)# 如果不包含"雨" (ELSE分支):travel_flow.else_branch()# 2c. 使用 FunctionStep 构造特定于此分支的查询字符串travel_flow.execute_function( lambda weather_report_from_step1: f"Recommend outdoor activities in {user_city_input} given weather: {weather_report_from_step1}", "{1}" # 参数:传递天气服务步骤的结果给 lambda)# 2d. 调用 ActivityAgent,使用 FunctionStep 生成的查询 (现在在 latest_result 中)travel_flow.ask(
agent_name="activity_service",
query="{latest_result}" # QueryStep 将从 context.data['latest_result'] 获取查询字符串)# 结束条件分支travel_flow.end_if()# 3. 执行工作流print("\n---- 开始执行工作流 ----")
start_time = time.time()# flow.run_sync() 会同步执行整个流程并返回最终步骤的结果# 对于更复杂的场景,可以使用 flow.run() (异步) 或 flow.stream() (流式处理)# 在Jupyter Notebook等已运行asyncio事件循环的环境中,应使用 await 调用异步方法try:
final_result = await travel_flow.run() # context可以传递初始数据,这里我们简单处理
print("---- 工作流执行完毕 ----") print(f"总耗时: {time.time() - start_time:.2f} 秒")
print("\n========= 最终行程推荐 ==========") if final_result: print(final_result) # 你也可以在这里用Ernie对结果进行润色或总结
ernie_summary = get_ernie_response(f"请帮我总结以下旅行建议,使其更吸引人:\n{final_result}") print("\n========= Ernie优化后的建议 ==========") print(ernie_summary) else: print("未能获取到最终结果。") print("\n工作流各步骤结果:") for step_num, step_result in travel_flow.results.items(): print(f"步骤 {step_num}: {step_result}")except Exception as e: print(f"工作流执行失败: {e}") print("详细错误信息:") import traceback
traceback.print_exc() print("\n工作流部分结果(如果存在):") if hasattr(travel_flow, 'results'): for step_num, step_result in travel_flow.results.items(): print(f"步骤 {step_num}: {step_result}")INFO:python_a2a.client.network:Added agent 'weather_service' from URL: http://localhost:5001 INFO:python_a2a.client.network:Added agent 'activity_service' from URL: http://localhost:5002
尝试连接WeatherAgent: http://localhost:5001/agent.json
WeatherAgent连接成功: Weather Agent
尝试连接ActivityAgent: http://localhost:5002/agent.json
ActivityAgent连接成功: Activity Agent
Agent网络已配置:
- weather_service: http://localhost:5001
- activity_service: http://localhost:5002
准备为城市 '北京'规划行程。
---- 开始执行工作流 ----
---- 工作流执行完毕 ----
总耗时: 2.57 秒
========= 最终行程推荐 ==========
{"artifacts":[{"parts":[{"text":"\u5317\u4eac\u5929\u6c14'\u5929\u6c14\u4e0d\u9519'\uff0c\u5f88\u68d2\uff01\u63a8\u8350\u6237\u5916\u6d3b\u52a8\uff1a\u9890\u548c\u56ed\u6f2b\u6b65\u3001\u957f\u57ce\u5f92\u6b65\uff08\u5929\u6c14\u597d\u65f6\uff09\u3001\u4ec0\u5239\u6d77\u80e1\u540c\u9a91\u884c\u3002","type":"text"}]}],"id":"9bf71300-715e-4f76-9890-28a92c928a6d","metadata":{"message_id":"c8fdf577-28ba-4d79-b7d9-8099d7cb8315"},"sessionId":"76cef800-1e60-42f9-afb3-19d1f82c27b1","status":{"state":"completed","timestamp":"2025-05-16T15:07:10.761995"}}INFO:httpx:HTTP Request: POST https://aistudio.baidu.com/llm/lmapi/v3/chat/completions "HTTP/1.1 200 OK"
========= Ernie优化后的建议 ========== 探索北京,尽享绝美天气!北京的天蓝云白,简直棒极了!强烈推荐你体验户外活动的乐趣: - **饕餮和园漫步**:在颐和园的美丽风光中悠闲散步,感受皇家园林的宏伟与细腻。 - **长城徒步**(天气晴好时):踏上长城,挑战自我,俯瞰壮丽山河,领略历史的厚重。 - **什刹海胡同骑行**:穿梭在北京的老胡同中,体验地道的京味儿文化,享受骑行的乐趣。 快来北京,让每一次呼吸都成为享受,让每一步行走都充满惊喜!
在上面的 WeatherAgent 和 ActivityAgent 的代码中,我们已经用注释标出了可以集成 get_ernie_response 函数的地方。
如何操作:
思考与挑战:
这个实战演练展示了A2A Agent和工作流的基本构建方法。通过将特定功能封装到独立的Agent中,并通过工作流将它们串联起来,我们可以构建出模块化、可扩展的AI应用。而Ernie的集成则为这些Agent注入了更高级的智能。
通过前面的实战,我们已经体验了如何构建基础的A2A Agent并用工作流将它们组织起来。现在,让我们深入探讨A2A与MCP如何协同工作,以及如何借鉴 python-a2a 库中 agents_workflow.py 和 parallel_workflow.py 的思想,来构建更复杂、更智能、更高效的系统。
在 agents_workflow.py 示例中,展示了如何根据任务类型(如创作、技术、分析)将用户查询路由到不同的LLM Agent(如GPT-4, Claude等)。这种模式对于充分发挥不同模型的优势、控制成本以及处理特定领域的任务非常有用。
如何应用于Ernie生态?
百度AI Studio平台通常会提供不同版本、不同能力的Ernie模型(例如 ernie-speed, ernie-lite, ernie-3.5-8k, ernie-x1 等)。它们在性能、成本、上下文长度、特定任务的擅长程度上可能有所不同。
我们可以构建一个类似的智能路由系统:
创建多个Ernie Agent: 将不同版本的Ernie模型封装成独立的A2A Agent。每个Agent的 AgentCard 可以描述其特点(如"快速响应型Ernie"、"长文本处理型Ernie"、"高级创作型Ernie")。
终端运行 :这些Agent同样可以作为独立服务运行,每个监听不同端口。
# 假设你有 ernie_lite_agent_server.py 和 ernie_pro_agent_server.pypython ernie_lite_agent_server.py # 例如运行在 5003 端口python ernie_pro_agent_server.py # 例如运行在 5004 端口
创建"任务分类与路由Agent" (RouterAgent) : 这个Agent的核心职责是接收用户原始查询,然后决定哪个Ernie Agent最适合处理这个查询。
# RouterAgent 的 handle_task 逻辑片段# from ernie_client import get_ernie_response # 假设已配置user_query = task.message.content.text# prompt_for_routing = f"用户的问题是:'{user_query}'。请判断这个问题应该由 轻量级问答模型 还是 高级分析模型 处理?请只回答 '轻量级' 或 '高级'。"# routing_decision = get_ernie_response(prompt_for_routing, model="ernie-lite") # # 根据 routing_decision 选择目标Agent的名称或标识# if "高级" in routing_decision:# target_agent_name = "ErnieProAgent"# else:# target_agent_name = "ErnieLiteAgent"# task.artifacts = [{"parts": [{"type": "text", "text": target_agent_name}]}] # 返回目标Agent的名字定义路由工作流 (Flow) :
# 在Jupyter Notebook或主控脚本中# agent_network.add("router", "http://localhost:5005")# agent_network.add("ernie_lite", "http://localhost:5003")# agent_network.add("ernie_pro", "http://localhost:5004")# routing_flow = Flow(agent_network)# routing_flow.ask("router", user_initial_query) # 第一步,让RouterAgent决定用哪个Ernie# # 第二步,根据RouterAgent的结果,动态调用相应的Ernie Agent# # 使用 .route_to_agent() 或更通用的 .execute_function() 结合 .ask()# def decide_next_agent(results, context):# chosen_agent_name = results['1'] # RouterAgent返回的目标Agent名# if chosen_agent_name == "ErnieProAgent":# return {"agent": "ernie_pro", "query": context["original_query"]}# else:# return {"agent": "ernie_lite", "query": context["original_query"]}# routing_flow.execute_function(decide_next_agent, context={"original_query": user_initial_query})# routing_flow.ask(lambda res, ctx: res['2']['agent'], lambda res, ctx: res['2']['query'])# final_answer = routing_flow.run_sync()python-a2a 的 AIAgentRouter 类提供了更成熟的LLM驱动路由方案,它能根据Agent的描述自动选择最合适的Agent。这里我们用 Flow 的基本功能来示意这个概念。
意义:
parallel_workflow.py 示例通过模拟延迟展示了并行执行多个独立Agent任务的优势,可以显著减少总等待时间。
如何应用?
假设我们需要为一个复杂的报告搜集多方面信息:
这三个任务是相对独立的,可以并行执行。
# 在Jupyter Notebook或主控脚本中# 假设 agent_network 中已注册了 ErnieKnowledgeAgent, ErnieAnalysisAgent, ErnieTranslationAgent# 它们都在各自的端口(如5006, 5007, 5008)上独立运行# parallel_flow = Flow(agent_network)# (parallel_flow.parallel() # 开始并行块# .ask("ErnieKnowledgeAgent", "关于A2A协议的背景资料")# .branch() # 新建一个并行分支# .ask("ErnieAnalysisAgent", "分析这份销售数据:[数据...],总结趋势。")# .branch() # 再新建一个并行分支# .ask("ErnieTranslationAgent", "请将这段英文翻译成中文:'The A2A protocol facilitates interoperability between AI agents.'")# .end_parallel()) # 结束并行块,等待所有分支完成# # 并行块的结果是一个字典,key是分支的序号(默认从'1'开始)或自定义名称,value是该分支最后一个操作的结果# # 例如 results_from_parallel['1'] 是知识问答结果, results_from_parallel['2'] 是数据分析结果等# # 接下来可以有一个串行步骤,汇总并行结果# def combine_results(results, context):# knowledge_info = results['1'] # 上一步并行块整体是第1步# analysis_report = knowledge_info["2"] # 获取并行块中第二个分支的结果# translation = knowledge_info["3"] # 第三个分支的结果# # ... 结合这些信息生成最终报告# # final_report = f"背景资料:{knowledge_info["1"]}\n数据分析:{analysis_report}\n翻译内容:{translation}"# # return final_report# # 注意:这里获取并行结果的方式需要根据实际 end_parallel() 返回的嵌套结构来调整# # 通常 results['step_number_of_parallel_block'] 会是一个字典,包含各分支的结果# # 例如: results_from_parallel = results['1'] # 假设并行块是第一步# # background = results_from_parallel['1'] # 并行块的第一个分支# # analysis = results_from_parallel['2'] # 并行块的第二个分支# # translation_text = results_from_parallel['3'] # 并行块的第三个分支# # return f"... {background} ... {analysis} ... {translation_text} ..."# pass # 实际的组合逻辑# parallel_flow.execute_function(combine_results)# report = parallel_flow.run_sync()注意:上述并行Flow的写法是一个示意。 python-a2a 的 Flow.parallel() 和 Flow.branch() 提供了强大的并行控制能力。并行执行Agent调用时,每个Agent服务必须能够处理并发请求(Flask默认是线程安全的,可以处理一定程度的并发)。
终端运行:确保所有参与并行的Agent (ErnieKnowledgeAgent, ErnieAnalysisAgent, ErnieTranslationAgent) 都在各自的服务器上运行。
意义:
现在,让我们聚焦于A2A和MCP如何真正地融合,以打造能力更全面的Agent。
想象一个"超级研究助理Agent (SuperResearchAssistant)"。它是一个A2A Agent,负责接收用户的复杂研究任务。
A2A层面:
MCP层面: 为了完成这个任务,SuperResearchAssistant (作为MCP Client) 需要调用一系列MCP工具:
实现这个MCP Tool的终端运行方式:
每个MCP工具都可以是一个独立的Python服务。例如,ErnieSearchTool 可能是一个Flask应用,它接收MCP格式的请求,内部调用百度搜索API或Ernie自身的搜索增强能力,然后返回MCP格式的响应。
# 假设你有 ernie_search_mcp_server.pypython ernie_search_mcp_server.py # 例如运行在 6001 端口# 假设你有 pdf_extractor_mcp_server.pypython pdf_extractor_mcp_server.py # 例如运行在 6002 端口
python-a2a 提供了 FastMCP 类来帮助快速创建这样的MCP工具服务器。
# 简化版 ernie_search_mcp_server.py 示例# from python_a2a.mcp import FastMCP, text_response, error_response# from ernie_client import get_ernie_response # 假设你想用Ernie来增强搜索# mcp_search_server = FastMCP(name="Ernie Search Tool", description="Performs web searches using Ernie")# @mcp_search_server.tool(# name="web_search",# description="Searches the web for a given query and returns top results."# )# def search_online(query: str, num_results: int = 3):# """MCP tool function"""# try:# # 实际的搜索逻辑,例如调用搜索引擎API或 get_ernie_response
SuperResearchAssistant的内部工作流(伪代码思路):
# SuperResearchAssistant 内部逻辑 - 简化示意# class SuperResearchAssistant(A2AServer):# # ... 初始化时传入 mcp_client ...# def handle_task(self, task: Task) -> Task:# user_query = task.message.content.text# keywords = self._extract_keywords(user_query) # 可能用Ernie# # 1. MCP调用:搜索工具 (如 ErnieSearchTool on port 6001)# try:# raw_search_results = self.mcp_client.call("http://localhost:6001", "web_search", {"query": keywords})# # search_items = parse_results(raw_search_results) # search_items = [{"type":"pdf", "url":"url1", "title":"PDF1"}, {"type":"text", "snippet":"text1"}] # 模拟# except Exception as e:# return self._fail_task(task, f"搜索失败: {e}")# # 2. MCP调用:内容提取 (如 PDFExtractor on port 6002 for PDFs)# processed_content = []# for item in search_items:# if item["type"] == "pdf":# try:# # pdf_text = self.mcp_client.call("http://localhost:6002", "extract_pdf", {"url": item["url"]})# pdf_text = "模拟PDF文本 for " + item["title"]# processed_content.append(pdf_text)# except Exception:# processed_content.append(f"(无法提取PDF: {item['title']})")# else:# processed_content.append(item["snippet"])# full_text = "\n".join(processed_content)# # 3. MCP调用:总结工具 (如 ErnieSummarizer on port 6003)# try:# # summary = self.mcp_client.call("http://localhost:6003", "summarize", {"text": full_text})# # 为了演示,直接调用本地Ernie函数 (实际应通过MCP)# from ernie_client import get_ernie_response# summary = get_ernie_response(f"总结以下内容:{full_text[:1000]}", model="ernie-lite")# except Exception as e:# return self._fail_task(task, f"总结失败: {e}")# task.artifacts = [{"parts": [{"type": "text", "text": summary}]}]# task.status = TaskStatus(state=TaskState.COMPLETED)# return task# def _fail_task(self, task, error_message):# task.status = TaskStatus(state=TaskState.FAILED, message=error_message)# return taskpython-a2a 的 A2AClient 可以配置 mcp_servers 参数,使其能够直接调用MCP工具。或者,你也可以使用 python_a2a.mcp.MCPClient 来独立调用MCP工具。
协同的威力:
通过这种A2A指挥、MCP执行的模式,我们可以构建出既有宏观协作能力,又有微观执行能力的强大Agent系统。这使得Agent开发更加模块化:A2A层面关注流程和协作逻辑,MCP层面关注具体能力的实现和工具化封装。学生们可以基于此理念,为自己的Ernie应用设计更富有想象力的Agent架构!
恭喜你完成了这篇"基于Ernie&A2A协议:构建智能协作的AI应用新范式"教程!我们一起探索了AI Agent的世界,从基本概念到实战演练,再到A2A与MCP协同的深入探讨。
A2A协议:为AI Agent之间的互操作性提供了坚实的基础。它使得我们可以构建模块化的、可独立开发和部署的Agent,并通过标准化的接口(Agent Card, Message, Task)进行通信和协作。python-a2a库的Flow引擎更是简化了复杂Agent交互流程的编排。
MCP协议:极大地扩展了Agent的能力边界。通过将外部API、数据源、代码执行环境等封装为MCP工具,Agent(尤其是基于LLM的Agent)可以超越单纯的文本生成,与真实世界进行交互,获取实时信息,执行具体动作。
A2A与MCP的协同:这是构建高级AI应用的关键。A2A负责Agent间的"社交"与任务委派,形成宏观的协作网络;MCP则赋予每个Agent调用工具的"超能力",解决微观的执行问题。这种分层架构使得系统设计更加清晰、灵活和强大。
Ernie的角色:作为先进的中文大语言模型,Ernie在A2A/MCP生态中可以扮演多种角色:
将Ernie的强大语言能力与A2A/MCP的标准化协作框架相结合,为我们打开了构建下一代智能应用的大门。想象一下:
本教程仅仅是一个开始。python-a2a库还有许多高级功能值得探索:
动手实践是最好的学习方式!
尝试基于本教程的示例进行扩展:
我们正处在AI Agent技术爆发的前夜。掌握A2A和MCP这样的标准化协议,结合Ernie这样强大的大模型能力,将使你具备构建未来复杂智能系统的核心竞争力。希望本教程能为你点燃探索的热情,祝你在AI Agent的开发之路上不断进步,创造出令人惊叹的应用!
以上就是【Workflow】基于Ernie&A2A协议Workflow篇的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号