博客 / 詳情

返回

AI 智能體高可靠設計模式:層級代理組

本系列介紹增強現代智能體系統可靠性的設計模式,以直觀方式逐一介紹每個概念,拆解其目的,然後實現簡單可行的版本,演示其如何融入現實世界的智能體系統。本系列一共 14 篇文章,這是第 5 篇。原文:Building the 14 Key Pillars of Agentic AI

優化智能體解決方案需要軟件工程確保組件協調、並行運行並與系統高效交互。例如預測執行,會嘗試處理可預測查詢以降低時延,或者進行冗餘執行,即對同一智能體重複執行多次以防單點故障。其他增強現代智能體系統可靠性的模式包括:

  • 並行工具:智能體同時執行獨立 API 調用以隱藏 I/O 時延。
  • 層級智能體:管理者將任務拆分為由執行智能體處理的小步驟。
  • 競爭性智能體組合:多個智能體提出答案,系統選出最佳。
  • 冗餘執行:即兩個或多個智能體解決同一任務以檢測錯誤並提高可靠性。
  • 並行檢索和混合檢索:多種檢索策略協同運行以提升上下文質量。
  • 多跳檢索:智能體通過迭代檢索步驟收集更深入、更相關的信息。

還有很多其他模式。

本系列將實現最常用智能體模式背後的基礎概念,以直觀方式逐一介紹每個概念,拆解其目的,然後實現簡單可行的版本,演示其如何融入現實世界的智能體系統。

所有理論和代碼都在 GitHub 倉庫裏:🤖 Agentic Parallelism: A Practical Guide 🚀

代碼庫組織如下:

agentic-parallelism/
    ├── 01_parallel_tool_use.ipynb
    ├── 02_parallel_hypothesis.ipynb
    ...
    ├── 06_competitive_agent_ensembles.ipynb
    ├── 07_agent_assembly_line.ipynb
    ├── 08_decentralized_blackboard.ipynb
    ...
    ├── 13_parallel_context_preprocessing.ipynb
    └── 14_parallel_multi_hop_retrieval.ipynb

層級代理組,追求卓越質量

到目前為止,我們已經探討了智能體如何同時生成並評估想法。

但複雜任務往往會意料之外或不可預見,需要智能體決定執行什麼以及何時執行,從而導致計劃與行動之間的延遲。

專業化與解耦架構模式是解決問題的正確方法。

  1. 複雜任務被分配給高層編排器(或管理器)代理,代理本身並不執行任務,職責是進行規劃。
  2. 代理將複雜任務分解為更小、更明確的子任務,並委派給一組專業執行器代理。
  3. 執行器通常可以同時完成任務。最後,編排器將執行器的結果綜合成統一的輸出。

我們將直接比較單體代理分層組在投資報告生成任務中的表現,以證明分層式方法不僅更快,而且在細節、結構和準確性上都更優於最終報告。

首先,需要定義作為代理之間通信協議的結構化數據模型,結構化輸出是將多智能體系統粘合在一起的紐帶。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import Optional, List

class FinancialData(BaseModel):
    """金融分析代理的結構化輸出 Pydantic 模型。"""
    price: float = Field(description="Current stock price.")
    market_cap: int = Field(description="Total market capitalization.")
    pe_ratio: float = Field(description="Price-to-Earnings ratio.")
    volume: int = Field(description="Average trading volume.")

class NewsAndMarketAnalysis(BaseModel):
    """新聞與市場分析代理的結構化輸出 Pydantic 模型。"""
    summary: str = Field(description="A concise summary of the most important recent news and market trends.")
    competitors: List[str] = Field(description="A list of the company's main competitors.")

class FinalReport(BaseModel):
    """首席分析師最終綜合投資報告的 Pydantic 模型。"""
    company_name: str = Field(description="The name of the company.")
    financial_summary: str = Field(description="A paragraph summarizing the key financial data.")
    news_and_market_summary: str = Field(description="A paragraph summarizing the news, market trends, and competitive landscape.")
    recommendation: str = Field(description="A final investment recommendation (e.g., 'Strong Buy', 'Hold', 'Sell') with a brief justification.")

這些 Pydantic 模型是定義信息如何在專業代理與最終協調器之間傳遞的正式合約。例如,FinancialData 模型確保金融分析代理始終提供四個具體的數值數據點。這些結構化數據比簡單的文本塊更可靠,也更容易讓最終合成器代理工作。

接下來為分層組定義 GraphState,跟蹤每個專業執行器的輸出。

from typing import TypedDict, Annotated

class TeamGraphState(TypedDict):
    company_symbol: str
    company_name: str
    # 'financial_data' 將保存金融分析代理的結構化輸出
    financial_data: Optional[FinancialData]
    # 'news_analysis' 將保存新聞和市場分析代理的結構化輸出
    news_analysis: Optional[NewsAndMarketAnalysis]
    # 'final_report' 是合成器的最終產物
    final_report: Optional[FinalReport]
    performance_log: Annotated[List[str], operator.add]

TeamGraphState 是分析代理組的共享工作空間,為每個專業代理(financial_datanews_analysis)的交付物設置了具體字段,確保當最終合成器代理激活時,擁有一套乾淨、組織良好的輸入可供工作。

我們定義一下“專業執行器代理”,每個代理都是自成一體、使用工具的代理,且提示非常聚焦。我們創建一下金融分析代理節點。

from langchain.agents import create_tool_calling_agent, AgentExecutor
import time

# 為金融分析代理創建獨立代理執行器
# 提示符高度集中在單一任務上
financial_analyst_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an expert financial analyst. Your sole job is to use the provided tool to get key financial metrics for a company and return them in a structured format."),
    ("human", "Get the financial data for the company with stock symbol: {symbol}")
])

# 該代理只能訪問 'get_financial_data' 工具
financial_agent = create_tool_calling_agent(llm, [get_financial_data], financial_analyst_prompt)

# 最後強制代理輸出到 'FinancialData' Pydantic 模型中
financial_executor = AgentExecutor(agent=financial_agent, tools=[get_financial_data]) | llm.with_structured_output(FinancialData)

def financial_analyst_node(state: TeamGraphState):
    """用於獲取和構造金融數據的專門節點"""
    print("--- [Financial Analyst] Starting analysis... ---")
    start_time = time.time()
    result = financial_executor.invoke({"symbol": state['company_symbol']})
    execution_time = time.time() - start_time
    log = f"[Financial Analyst] Completed in {execution_time:.2f}s."
    print(log)
    return {"financial_data": result, "performance_log": [log]}

financial_analyst_node 提示詞範圍狹窄,工具集有限。通過將代理限制在單一任務中,大大提高了輸出的可靠性。最後的 .with_structured_output(FinancialData) 調用是一個關鍵質量門檻,確保其交付內容始終以正確的格式呈現。

news_analyst_node 遵循完全相同的模式,但配備了自己的專用提示和工具。

最後定義編排器代理。該代理(即 report_synthesizer_node),接收並行執行器的結構化輸出,執行最終的綜合步驟。

# 為合成器/編排器創建鏈
report_synthesizer_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are the Chief Investment Analyst. Your job is to synthesize the structured financial data and market analysis provided by your specialist team into a final, comprehensive investment report, including a justified recommendation."),
    ("human", "Please create the final report for {company_name}.\n\nFinancial Data:\n{financial_data}\n\nNews and Market Analysis:\n{news_analysis}")
])
synthesizer_chain = report_synthesizer_prompt | llm.with_structured_output(FinalReport)

def report_synthesizer_node(state: TeamGraphState):
    """接受結構化執行器輸出併合成最終報告的編排器節點"""
    print("--- [Chief Analyst] Synthesizing final report... ---")
    start_time = time.time()
    
    # 該節點從狀態中讀取結構化數據
    report = synthesizer_chain.invoke({
        "company_name": state['company_name'],
        "financial_data": state['financial_data'].json(),
        "news_analysis": state['news_analysis'].json()
    })
    
    execution_time = time.time() - start_time
    log = f"[Chief Analyst] Completed report in {execution_time:.2f}s."
    print(log)
    return {"final_report": report, "performance_log": [log]}

report_synthesizer_node 是負責組裝最終產品的管理器,不需要調用任何工具,執行單純的綜合工作。

通過從狀態中獲取清晰、結構化的 FinancialDataNewsAndMarketAnalysis 對象,可以專注於構建連貫敍述並做出最終且有根據的推薦的高級任務。

現在,用“扇出扇入”架構來組裝圖。

from langgraph.graph import StateGraph, END

# 初始化圖
workflow = StateGraph(TeamGraphState)

# 為兩個專業執行器和最後的合成器添加節點
workflow.add_node("financial_analyst", financial_analyst_node)
workflow.add_node("news_analyst", news_analyst_node)
workflow.add_node("report_synthesizer", report_synthesizer_node)

# 入口點是一個列表,告訴 LangGraph 並行運行兩個專業執行器
workflow.set_entry_point(["financial_analyst", "news_analyst"])

# 節點列表中的一條邊意味着圖將等待所有節點完成後再繼續
# 這是“扇入”或同步步驟
workflow.add_edge(["financial_analyst", "news_analyst"], "report_synthesizer")

# 合成器是最後一步
workflow.add_edge("report_synthesizer", END)

# 編譯圖
app = workflow.compile()

# 執行流
inputs = {
    "company_symbol": "TSLA",
    "company_name": "Tesla",
    "performance_log": []
}

start_time = time.time()
team_result = None
for output in app.stream(inputs, stream_mode="values"):
    team_result = output
end_time = time.time()
team_time = end_time - start_time

層級代理

現在進行最後的一對一分析,比較最終報告的質量和兩套系統的性能。

print("="*60)
print("                AGENT OUTPUT COMPARISON")
print("="*60)
print("\n" + "-"*60)
print("            MONOLITHIC AGENT REPORT")
print("-"*60 + "\n")
print(f"'{monolithic_result['output']}'")


print("\n" + "-"*60)
print("            HIERARCHICAL TEAM REPORT")
print("-"*60 + "\n")
print(json.dumps(team_result['final_report'], indent=4, default=lambda o: o.dict()))
print("\n" + "="*60)
print("                      ACCURACY & QUALITY ANALYSIS")
print("="*60 + "\n")

print("="*60)
print("                      PERFORMANCE ANALYSIS")
print("="*60 + "\n")
print(f"Monolithic Agent Total Time: {monolithic_time:.2f} seconds") # (Assuming monolithic_time is from the notebook run)
print(f"Hierarchical Team Total Time: {team_time:.2f} seconds\n") # (Assuming team_time is from the notebook run)
time_saved = monolithic_time - team_time
print(f"Time Saved: {time_saved:.2f} seconds ({time_saved/monolithic_time*100:.0f}% faster)\n")
print("Analysis of Parallelism:")

# (從筆記本的性能日誌中提取工作時間)
worker_times = [6.89, 8.12] 
parallel_worker_time = max(worker_times)
sequential_worker_time = sum(worker_times)

這是得到的輸出……

#### 輸出 ####
============================================================
                AGENT OUTPUT COMPARISON
============================================================

------------------------------------------------------------
            MONOLITHIC AGENT REPORT
------------------------------------------------------------
Tesla (TSLA) is currently trading at around $177.48. Recent news suggests the company is facing competition from other EV makers but is also expanding its Gigafactory network. The recommendation is to Hold the stock and monitor the competitive landscape.
------------------------------------------------------------
            HIERARCHICAL TEAM REPORT
------------------------------------------------------------
{
    "final_report": {
        "company_name": "Tesla",
        "financial_summary": "Tesla's current stock price is 177.48, with a total market capitalization of 566,310,215,680. It exhibits a trailing Price-to-Earnings (P/E) ratio of 45.4...",
...ndation based on synthesizing multiple data points. The Monolithic agents analysis was superficial by comparison.
**Conclusion:** The decomposition of the task and the use of specialist agents led to a provably higher-quality and more accurate output. The structure imposed by the hierarchy and Pydantic models forced a more rigorous and detailed analysis.
============================================================
                      PERFORMANCE ANALYSIS
============================================================
Monolithic Agent Total Time: 18.34 seconds
Hierarchical Team Total Time: 13.57 seconds
Time Saved: 4.77 seconds (26% faster)
Analysis of Parallelism:
The two specialist workers ran in parallel. If run sequentially, this stage would have taken 15.01 seconds. By running them in parallel, the stage took only 8.12 seconds (the time of the longest worker). This parallelism is the primary reason the more complex, higher-quality hierarchical system was also significantly faster.

兩個專業執行器並行運行……

金融分析代理用了 6.89s,新聞分析代理用了 8.12s,最終報告更為詳細,如果按順序進行,這一階段大約需要 15.01s。

通過並行運行,該階段僅用時 8.12s(最長執行器的時間)。這種並行性是更復雜、更高質量的層級系統速度顯著更快的主要原因。


Hi,我是俞凡,一名兼具技術深度與管理視野的技術管理者。曾就職於 Motorola,現任職於 Mavenir,多年帶領技術團隊,聚焦後端架構與雲原生,持續關注 AI 等前沿方向,也關注人的成長,篤信持續學習的力量。在這裏,我會分享技術實踐與思考。歡迎關注公眾號「DeepNoMind」,星標不迷路。也歡迎訪問獨立站 www.DeepNoMind.com,一起交流成長。

本文由mdnice多平台發佈

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.