動態

詳情 返回 返回

[MCP][07]logging和progress等功能説明 - 動態 詳情

前言

截至目前(2025年9月19日),除了基礎的Prompt、Resource和Tool概念,FastMCP還提供了以下功能:Sampling、Elicitation、Roots、Logging、Progress、Proxy、Middleware、Composition和Authentication等功能

  • Sampling,採樣,在server端調用client的llm,實現功能解耦
  • Elicition,徵詢,實現人工介入
  • Roots,Client告知Server可訪問的資源
  • Logging,將Server日誌發送給Client
  • Progress,Server端將進度發送給Client
  • Proxy,代理其它MCP Server
  • Middleware,攔截MCP通信中的請求和響應
  • Composition,Server端將多個servers組合成一個server對外提供
  • Authentication,Client和Server之間安全認證

其中Sampling和Elicitation在我的實際開發中用到的比較多,所以我在前面章節中單獨拎出來介紹了。FastMCP官方文檔也説了Authentication還在迅速迭代中,雖然已經有了相關文檔,但本文暫時就不涉及了,等這個功能穩定了再具體細説。剩下的功能會在本文中一次性全部介紹完,篇幅較長,可以根據章節名跳轉到自己需要關注的內容。本文大部分參考自官方文檔

Roots

Roots 是客户端向服務器告知其可訪問資源的一種機制。服務器可利用此信息調整行為或提供更相關的響應。

靜態Roots

from fastmcp import Client

client = Client(
    "my_mcp_server.py", 
    roots=["/path/to/root1", "/path/to/root2"]
)

動態Roots

from fastmcp import Client
from fastmcp.client.roots import RequestContext

async def roots_callback(context: RequestContext) -> list[str]:
    print(f"Server requested roots (Request ID: {context.request_id})")
    return ["/path/to/root1", "/path/to/root2"]

client = Client(
    "my_mcp_server.py", 
    roots=roots_callback
)

Logging

Logging,從服務器向 MCP 客户端發送消息。FastMCP提供了一個logger(fastmcp.utilities.logging.get_logger()),也可以用python標準庫的logging

服務器日誌功能允許 MCP 工具向客户端發送調試(debug)、信息(info)、警告(warning)和錯誤(error)級別的消息。這有助於用户瞭解函數執行過程,在開發和運行階段輔助調試。一般用於以下場景:

  • 調試:發送詳細的執行信息,幫助診斷問題
  • 進度可見性:讓用户瞭解工具當前正在執行的操作
  • 錯誤報告:向客户端傳達問題及其上下文
  • 審計追蹤:為合規或分析目的生成工具執行記錄

與標準 Python 日誌不同,MCP 服務器 Logging 會直接將消息發送至客户端,使其在客户端界面或日誌中可見。

Server 示例

在任意tool函數中使用Context提供的日誌方法:

from fastmcp import FastMCP, Context

mcp = FastMCP("custom")

@mcp.tool
async def analyze_data(data: list[float], ctx: Context) -> dict:
    """通過全面日誌記錄分析數值數據。"""
    await ctx.debug("開始分析數值數據")
    await ctx.info(f"正在分析 {len(data)} 個數據點")
    
    try:
        if not data:
            await ctx.warning("提供了空數據列表")
            return {"error": "空數據列表"}
        
        result = sum(data) / len(data)
        await ctx.info(f"分析完成,平均值為:{result}")
        return {"average": result, "count": len(data)}
        
    except Exception as e:
        await ctx.error(f"分析失敗:{str(e)}")
        raise

if __name__ == "__main__":
    mcp.run(transport="stdio", show_banner=False)

所有日誌方法(debuginfowarningerrorlog)現在均支持 extra 參數,該參數接受一個字典,用於傳遞任意結構化數據。這使得客户端可接收結構化日誌,便於創建豐富且可查詢的日誌記錄。

@mcp.tool
async def process_transaction(transaction_id: str, amount: float, ctx: Context):
    await ctx.info(
        f"正在處理交易 {transaction_id}",
        extra={
            "transaction_id": transaction_id,
            "amount": amount,
            "currency": "USD"
        }
    )
    # ... 處理邏輯 ...

Client 示例

import asyncio
from pathlib import Path
from fastmcp.client import Client, StdioTransport
from fastmcp.client.logging import LogMessage
import logging
import sys

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
# This mapping is useful for converting MCP level strings to Python's levels
LOGGING_LEVEL_MAP = logging.getLevelNamesMapping()

class MCPClient:
    def __init__(self):
        self.mcp_client = Client(
            StdioTransport(
                command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),
                args = ["demo09-server.py"],
                cwd = str(Path(__file__).parent)
            ),
            log_handler=self.logging_handler,
        )

    async def logging_handler(self, message: LogMessage):
        """
        Handles incoming logs from the MCP server and forwards them
        to the standard Python logging system.
        """
        msg = message.data.get('msg')
        extra = message.data.get('extra')

        # Convert the MCP log level to a Python log level
        level = LOGGING_LEVEL_MAP.get(message.level.upper(), logging.INFO)
        logger.log(level, msg, extra=extra)


    async def generate(self):
        async with self.mcp_client:
            await self.mcp_client.ping()

            rst = await self.mcp_client.call_tool("analyze_data", arguments={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})
            print(rst)

async def main():
    client = MCPClient()
    await client.generate()

if __name__ == "__main__":
    asyncio.run(main())

client運行輸出

開始分析數值數據
正在分析 5 個數據點
分析完成,平均值為:3.0
CallToolResult(content=[TextContent(type='text', text='{"average":3.0,"count":5}', annotations=None, meta=None)], structured_content={'average': 3.0, 'count': 5}, data={'average': 3.0, 'count': 5}, is_error=False)

Progress

Progress 功能允許 MCP tool 向 Client 通知長時間運行操作的當前進度。這使得Client能夠顯示進度指示器,從而在執行耗時任務時提供更佳的用户體驗。Progress 在以下方面具有重要價值:

  • 用户體驗:讓用户瞭解長時間運行操作的當前狀態
  • 進度指示器:使客户端能夠顯示進度條或百分比
  • 防止超時:表明操作正在持續進行中,避免被誤判為無響應
  • 調試用途:追蹤執行進度,便於性能分析

Server 示例

from fastmcp import FastMCP, Context
import asyncio

mcp = FastMCP("custom")

@mcp.tool
async def process_items(items: list[str], ctx: Context) -> dict:
    """處理項目列表,併發送進度更新。"""
    total = len(items)
    results = []
    
    for i, item in enumerate(items):
        # 每處理一個項目,報告當前進度
        await ctx.report_progress(progress=i, total=total)
        
        # 模擬處理耗時
        await asyncio.sleep(0.1)
        results.append(item.upper())
    
    # 報告 100% 完成
    await ctx.report_progress(progress=total, total=total)
    
    return {"processed": len(results), "results": results}

if __name__ == "__main__":
    mcp.run(transport="stdio", show_banner=False)

Client 示例

import asyncio
from pathlib import Path
from fastmcp.client import Client, StdioTransport


class MCPClient:
    def __init__(self):
        self.mcp_client = Client(
            StdioTransport(
                command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),
                args = ["demo09-server.py"],
                cwd = str(Path(__file__).parent)
            ),
            progress_handler=self.progress_handler,
        )

    async def progress_handler(self, progress: float, total: float | None, message: str | None) -> None:
        if total is not None:
            percentage = (progress / total) * 100
            print(f"Progress: {percentage:.1f}% - {message or ''}")
        else:
            print(f"Progress: {progress} - {message or ''}")

    async def generate(self):
        async with self.mcp_client:
            await self.mcp_client.ping()

            rst = await self.mcp_client.call_tool("process_items", arguments={"items": ["item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10", "item11", "item12", "item13", "item14", "item15"]})
            print(rst)

async def main():
    client = MCPClient()
    await client.generate()

if __name__ == "__main__":
    asyncio.run(main())

client運行輸出

Progress: 0.0% -
Progress: 6.7% -
Progress: 13.3% -
Progress: 20.0% -
Progress: 26.7% -
Progress: 33.3% -
Progress: 40.0% -
Progress: 46.7% -
Progress: 53.3% -
Progress: 60.0% -
Progress: 66.7% -
Progress: 73.3% -
Progress: 80.0% -
Progress: 86.7% -
Progress: 93.3% -
Progress: 100.0% -
CallToolResult(content=[TextContent(type='text', text='{"processed":15,"results":["ITEM1","ITEM2","ITEM3","ITEM4","ITEM5","ITEM6","ITEM7","ITEM8","ITEM9","ITEM10","ITEM11","ITEM12","ITEM13","ITEM14","ITEM15"]}', annotations=None, meta=None)], structured_content={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, data={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, is_error=False)

Proxy

FastMCP 的 Proxy 允許一個 FastMCP 服務器實例作為前端,代理另一個 MCP 服務器(該服務器可能是遠程的、運行在不同傳輸協議上的,甚至是另一個 FastMCP 實例)。此功能通過 FastMCP.as_proxy() 類方法實現。作為代理服務器,它本身不直接實現工具或資源。當它接收到請求(如 tools/callresources/read)時,會將該請求轉發至一個_後端_ MCP 服務器,接收其響應,再將響應原樣返回給原始客户端。

sequenceDiagram participant ClientApp as 您的客户端(如 Claude Desktop) participant FastMCPProxy as FastMCP 代理服務器 participant BackendServer as 後端 MCP 服務器(如遠程 SSE) ClientApp->>FastMCPProxy: MCP 請求(如 stdio) Note over FastMCPProxy, BackendServer: 代理轉發請求 FastMCPProxy->>BackendServer: MCP 請求(如 sse) BackendServer-->>FastMCPProxy: MCP 響應(如 sse) Note over ClientApp, FastMCPProxy: 代理轉發響應 FastMCPProxy-->>ClientApp: MCP 響應(如 stdio)

核心優勢

  • 會話隔離:每個請求擁有獨立隔離的會話,確保併發操作安全
  • 傳輸協議橋接:通過一種傳輸協議暴露運行在另一種傳輸協議上的服務器
  • 高級 MCP 功能支持:自動轉發採樣(sampling)、引導(elicitation)、日誌和進度報告
  • 安全性:作為後端服務器的受控網關
  • 簡化架構:即使後端位置或傳輸協議變更,前端仍保持單一接入點

使用代理服務器時,特別是連接到基於 HTTP 的後端服務器時,需注意延遲可能顯著增加。例如,list_tools() 操作可能耗時數百毫秒,而本地工具僅需 1–2 毫秒。掛載代理服務器時,此延遲會影響父服務器的所有操作,而不僅僅是與被代理工具的交互。
如果您的使用場景對低延遲有嚴格要求,建議使用 import_server() 方法在啓動時複製工具,而非在運行時進行代理。

快速入門

推薦使用 ProxyClient 創建代理,它提供完整的 MCP 功能支持,並自動實現會話隔離:

from fastmcp import FastMCP
from fastmcp.server.proxy import ProxyClient

# 創建支持完整 MCP 功能的代理
proxy = FastMCP.as_proxy(
    ProxyClient("backend_server.py"),
    name="MyProxy"
)

# 運行代理(例如,通過 stdio 供 Claude Desktop 使用)
if __name__ == "__main__":
    proxy.run()

此單一設置即可提供:

  • 安全的併發請求處理
  • 自動轉發高級 MCP 功能(採樣、引導等)
  • 會話隔離,防止上下文混淆
  • 與所有 MCP 客户端完全兼容

高級MCP功能支持

ProxyClient 會自動在後端服務器與連接到代理的客户端之間轉發高級 MCP 協議功能,確保完整的 MCP 兼容性。支持的功能:

  • Roots:將文件系統根目錄訪問請求轉發給客户端
  • Sampling:將後端發起的 LLM 補全請求轉發給客户端
  • Elicitation:將用户輸入請求轉發給客户端
  • Logging:將後端日誌消息轉發至客户端
  • Progress:在長時間操作中轉發進度通知

也可以自定義功能支持,比如設置為None來選擇性禁用轉發

# 禁用採樣,但保留其他功能
backend = ProxyClient(
    "backend_server.py",
    sampling_handler=None,  # 禁用 LLM 採樣轉發
    log_handler=None        # 禁用日誌轉發
)

基於配置的代理

你可以直接從符合 MCPConfig 模式的配置字典創建代理。這對於快速設置指向遠程服務器的代理非常有用,無需手動配置每個連接細節。

from fastmcp import FastMCP

# 直接從配置字典創建代理
config = {
    "mcpServers": {
        "default": {  # 對於單服務器配置,通常使用 'default'
            "url": "https://example.com/mcp  ",
            "transport": "http"
        }
    }
}

# 創建指向配置服務器的代理(自動創建 ProxyClient)
proxy = FastMCP.as_proxy(config, name="Config-Based Proxy")

# 通過 stdio 傳輸協議本地運行
if __name__ == "__main__":
    proxy.run()

多服務器的設置

你可以通過在配置中指定多個條目來創建指向多個服務器的代理。系統會自動以配置名稱作為前綴掛載它們:

# 多服務器配置
config = {
    "mcpServers": {
        "weather": {
            "url": "https://weather-api.example.com/mcp  ",
            "transport": "http"
        },
        "calendar": {
            "url": "https://calendar-api.example.com/mcp  ",
            "transport": "http"
        }
    }
}

# 創建統一的多服務器代理
composite_proxy = FastMCP.as_proxy(config, name="Composite Proxy")

# 工具和資源可通過前綴訪問:
# - weather_get_forecast, calendar_add_event
# - weather://weather/icons/sunny, calendar://calendar/events/today

顯式會話管理

在內部,FastMCP.as_proxy() 使用 FastMCPProxy 類。您通常無需直接與此類交互,但在高級場景下它可供使用。FastMCPProxy 要求顯式會話管理——不會執行任何自動檢測。您必須選擇您的會話策略:

# 在所有請求間共享會話(併發時需謹慎)
shared_client = ProxyClient("backend_server.py")
def shared_session_factory():
    return shared_client

proxy = FastMCPProxy(client_factory=shared_session_factory)

# 為每個請求創建新會話(推薦)
def fresh_session_factory():
    return ProxyClient("backend_server.py")

proxy = FastMCPProxy(client_factory=fresh_session_factory)

如需自動選擇會話策略,請使用便捷方法 FastMCP.as_proxy()

# 帶有特定配置的自定義工廠
def custom_client_factory():
    client = ProxyClient("backend_server.py")
    # 在此處添加任何自定義配置
    return client

proxy = FastMCPProxy(client_factory=custom_client_factory)

Middleware

MCP 中間件允許您在請求和響應流經服務器時對其進行攔截和修改。可以將其視為一條管道,每個中間件均可檢查當前操作、進行修改,然後將控制權傳遞給鏈中的下一個中間件。與傳統的 Web 中間件不同,MCP 中間件專為 Model Context Protocol 設計,為各類 MCP 操作(如工具調用、資源讀取和提示請求)提供專用鈎子。

MCP 中間件是一個全新概念,未來版本中可能發生破壞性變更。

MCP 中間件的常見應用場景包括:

  • 身份驗證與授權:在執行操作前驗證客户端權限
  • 日誌與監控:追蹤使用模式與性能指標
  • 速率限制:按客户端或操作類型控制請求頻率
  • 請求/響應轉換:在數據到達工具前或離開後對其進行修改
  • 緩存:存儲頻繁請求的數據以提升性能
  • 錯誤處理:為服務器提供一致的錯誤響應

中間件工作原理

FastMCP 中間件基於管道模型運行。當請求進入時,它會按添加到服務器的順序依次流經各個中間件。每個中間件均可:

檢查傳入的請求及其上下文
在傳遞給下一個中間件或處理器前修改請求
通過調用 call_next() 執行鏈中的下一個中間件/處理器
在返回前檢查並修改響應
處理執行過程中發生的錯誤

關鍵在於,中間件形成一條鏈,每個環節決定是繼續處理還是完全終止鏈的執行。

如果你熟悉 ASGI 中間件,FastMCP 中間件的基本結構會感覺似曾相識。其核心是一個可調用類,接收一個包含當前 JSON-RPC 消息信息的上下文對象,以及一個用於繼續中間件鏈的處理器函數。

重要的是要理解,MCP 基於 JSON-RPC 規範 運行。雖然 FastMCP 以熟悉的方式呈現請求和響應,但其本質是 JSON-RPC 消息,而非 Web 應用中常見的 HTTP 請求/響應對。FastMCP 中間件適用於所有 傳輸類型 ,包括本地 stdio 傳輸和 HTTP 傳輸,但並非所有中間件實現都兼容所有傳輸類型(例如,檢查 HTTP 頭部的中間件無法在 stdio 傳輸中工作)。

實現中間件最基礎的方式是重寫 Middleware 基類的 call 方法:

from fastmcp.server.middleware import Middleware, MiddlewareContext

class RawMiddleware(Middleware):
    async def __call__(self, context: MiddlewareContext, call_next):
        # 此方法接收所有消息,無論類型
        print(f"原始中間件正在處理:{context.method}")
        result = await call_next(context)
        print(f"原始中間件處理完成:{context.method}")
        return result

中間件鈎子

為便於用户針對特定類型的消息,FastMCP 中間件提供了一系列專用鈎子。您可以重寫特定的鈎子方法(而非實現原始的 __call__ 方法),這些方法僅在特定類型的操作時被調用,從而允許您精確地定位中間件邏輯所需的粒度。

鈎子層級與執行順序

FastMCP 提供多個按不同粒度調用的鈎子。理解此層級結構對有效設計中間件至關重要。

當請求進入時,同一請求可能觸發多個鈎子調用,執行順序由泛化到具體:

  1. on_message - 為所有 MCP 消息(請求和通知)調用
  2. on_requeston_notification - 根據消息類型調用
  3. 操作特定鈎子 - 為特定 MCP 操作調用,如 on_call_tool

例如,當客户端調用工具時,您的中間件將收到多次鈎子調用

  1. on_messageon_request 用於任何初始工具發現操作(如 list_tools)
  2. on_message(因為它是任何 MCP 消息)用於工具調用本身
  3. on_request(因為工具調用期望響應)用於工具調用本身
  4. on_call_tool(因為它是具體的工具執行)用於工具調用本身

請注意,MCP SDK 可能會執行額外操作(如為緩存目的列出工具),這將觸發超出直接工具執行範圍的額外中間件調用。

此層級結構允許您以適當的粒度定位中間件邏輯。對廣泛關注點(如日誌)使用 on_message,對身份驗證使用 on_request,對工具特定邏輯(如性能監控)使用 on_call_tool

可用鈎子

  • on_message: 為所有 MCP 消息(請求和通知)調用
  • on_request: 專為 MCP 請求(期望響應)調用
  • on_notification: 專為 MCP 通知(即發即棄)調用
  • on_call_tool: 在執行工具時調用
  • on_read_resource: 在讀取資源時調用
  • on_get_prompt: 在獲取提示時調用
  • on_list_tools: 在列出可用工具時調用
  • on_list_resources: 在列出可用資源時調用
  • on_list_resource_templates: 在列出資源模板時調用
  • on_list_prompts: 在列出可用提示時調用

中間件中的組件訪問

理解如何在中間件中訪問組件信息(工具、資源、提示)對構建強大的中間件功能至關重要。訪問模式在列出操作與執行操作之間存在顯著差異。

列出操作 vs 執行操作

FastMCP 中間件以不同方式處理兩種類型的操作:

列出操作 (on_list_tools, on_list_resources, on_list_prompts 等):

  • 中間件接收FastMCP 組件對象,包含完整元數據
  • 這些對象包含 FastMCP 特有屬性(如 tags),可直接從組件訪問
  • 結果在轉換為 MCP 格式前包含完整組件信息
  • 標籤包含在返回給 MCP 客户端的組件 meta 字段中

執行操作 (on_call_tool, on_read_resource, on_get_prompt):

  • 中間件在組件執行前運行
  • 中間件結果為執行結果,或組件未找到時的錯誤
  • 組件元數據在鈎子參數中不可直接訪問

在執行期間訪問組件元數據

如果需要在執行操作期間檢查組件屬性(如標籤),請使用通過上下文獲取的 FastMCP 服務器實例:

from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolError

class TagBasedMiddleware(Middleware):
    async def on_call_tool(self, context: MiddlewareContext, call_next):
        # 訪問工具對象以檢查其元數據
        if context.fastmcp_context:
            try:
                tool = await context.fastmcp_context.fastmcp.get_tool(context.message.name)
                
                # 檢查此工具是否帶有 "private" 標籤
                if "private" in tool.tags:
                    raise ToolError("訪問被拒絕:私有工具")
                    
                # 檢查工具是否啓用
                if not tool.enabled:
                    raise ToolError("工具當前已禁用")
                    
            except Exception:
                # 工具未找到或其他錯誤 - 讓執行繼續
                # 並自然處理錯誤
                pass
        
        return await call_next(context)

相同模式適用於資源和提示:

from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ResourceError, PromptError

class ComponentAccessMiddleware(Middleware):
    async def on_read_resource(self, context: MiddlewareContext, call_next):
        if context.fastmcp_context:
            try:
                resource = await context.fastmcp_context.fastmcp.get_resource(context.message.uri)
                if "restricted" in resource.tags:
                    raise ResourceError("訪問被拒絕:受限資源")
            except Exception:
                pass
        return await call_next(context)
    
    async def on_get_prompt(self, context: MiddlewareContext, call_next):
        if context.fastmcp_context:
            try:
                prompt = await context.fastmcp_context.fastmcp.get_prompt(context.message.name)
                if not prompt.enabled:
                    raise PromptError("提示當前已禁用")
            except Exception:
                pass
        return await call_next(context)

處理列出結果

對於列出操作,中間件 call_next 函數在組件轉換為 MCP 格式前返回 FastMCP 組件列表。您可以過濾或修改此列表並將其返回給客户端。例如:

from fastmcp.server.middleware import Middleware, MiddlewareContext

class ListingFilterMiddleware(Middleware):
    async def on_list_tools(self, context: MiddlewareContext, call_next):
        result = await call_next(context)
        
        # 過濾掉帶有 "private" 標籤的工具
        filtered_tools = [
            tool for tool in result 
            if "private" not in tool.tags
        ]
        
        # 返回修改後的列表
        return filtered_tools

此過濾在組件轉換為 MCP 格式並返回給客户端前進行。標籤在過濾期間可訪問,幷包含在最終列出響應的組件 meta 字段中。

在列出操作中過濾組件時,請確保也在相應的執行鈎子(on_call_toolon_read_resourceon_get_prompt)中阻止已過濾組件的執行,以保持一致性。

工具調用拒絕

您可以通過在中間件中拋出 ToolError 來拒絕訪問特定工具。這是阻止工具執行的正確方式,因為它與 FastMCP 錯誤處理系統正確集成

from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolError

class AuthMiddleware(Middleware):
    async def on_call_tool(self, context: MiddlewareContext, call_next):
        tool_name = context.message.name
        
        # 拒絕訪問受限工具
        if tool_name.lower() in ["delete", "admin_config"]:
            raise ToolError("訪問被拒絕:工具需要管理員權限")
        
        # 允許其他工具繼續執行
        return await call_next(context)

拒絕工具調用時,務必拋出 ToolError,而非返回 ToolResult 對象或其他值。ToolError 確保錯誤通過中間件鏈正確傳播,並轉換為正確的 MCP 錯誤響應格式。

工具調用修改

對於工具調用等執行操作,您可以在執行前修改參數,或在執行後轉換結果:

from fastmcp.server.middleware import Middleware, MiddlewareContext

class ToolCallMiddleware(Middleware):
    async def on_call_tool(self, context: MiddlewareContext, call_next):
        # 在執行前修改參數
        if context.message.name == "calculate":
            # 確保輸入為正數
            if context.message.arguments.get("value", 0) < 0:
                context.message.arguments["value"] = abs(context.message.arguments["value"])
        
        result = await call_next(context)
        
        # 在執行後轉換結果
        if context.message.name == "get_data":
            # 向結果添加元數據
            if result.structured_content:
                result.structured_content["processed_at"] = "2024-01-01T00:00:00Z"
        
        return result

對於更復雜的工具重寫場景,請考慮使用 工具轉換 模式,它為創建修改後的工具變體提供了更結構化的方法。

鈎子剖析

每個中間件鈎子遵循相同的模式。讓我們通過 on_message 鈎子來理解其結構:

async def on_message(self, context: MiddlewareContext, call_next):
    # 1. 預處理:檢查並可選地修改請求
    print(f"正在處理 {context.method}")
    
    # 2. 鏈式延續:調用下一個中間件/處理器
    result = await call_next(context)
    
    # 3. 後處理:檢查並可選地修改響應
    print(f"已完成 {context.method}")
    
    # 4. 返回結果(可能已修改)
    return result

每個鈎子接收兩個參數:

  1. context: MiddlewareContext - 包含當前請求信息:

    • context.method - MCP 方法名稱(如 "tools/call")
    • context.source - 請求來源("client" 或 "server")
    • context.type - 消息類型("request" 或 "notification")
    • context.message - MCP 消息數據
    • context.timestamp - 請求接收時間
    • context.fastmcp_context - FastMCP Context 對象(如可用)
  2. call_next - 用於繼續中間件鏈的函數。除非您希望完全停止處理,否則必須調用此函數。

開發者對請求流擁有完全控制權:

  • 繼續處理:調用 await call_next(context) 以繼續
  • 修改請求:在調用 call_next 前更改上下文
  • 修改響應:在調用 call_next 後更改結果
  • 停止鏈:不調用 call_next(極少需要)
  • 處理錯誤:在 try/catch 塊中包裝 call_next

除了修改請求和響應,您還可以存儲狀態數據,供工具(可選)稍後訪問。為此,請使用 FastMCP Context 適當調用 set_stateget_state

創建中間件

FastMCP 中間件通過繼承 Middleware 基類並重寫所需鈎子來實現。

from fastmcp import FastMCP
from fastmcp.server.middleware import Middleware, MiddlewareContext

class LoggingMiddleware(Middleware):
    """記錄所有 MCP 操作的中間件。"""
    
    async def on_message(self, context: MiddlewareContext, call_next):
        """為所有 MCP 消息調用。"""
        print(f"正在處理來自 {context.source} 的 {context.method}")
        
        result = await call_next(context)
        
        print(f"{context.method} 處理完成")
        return result

# 將中間件添加到您的服務器
mcp = FastMCP("MyServer")
mcp.add_middleware(LoggingMiddleware())

向服務器添加中間件

中間件按添加到服務器的順序執行。最先添加的中間件在進入時最先運行,在退出時最後運行:

mcp = FastMCP("MyServer")

mcp.add_middleware(AuthenticationMiddleware("secret-token"))
mcp.add_middleware(PerformanceMiddleware())
mcp.add_middleware(LoggingMiddleware())

這將創建以下執行流:

  1. AuthenticationMiddleware(預處理)
  2. PerformanceMiddleware(預處理)
  3. LoggingMiddleware(預處理)
  4. 實際工具/資源處理器
  5. LoggingMiddleware(後處理)
  6. PerformanceMiddleware(後處理)
  7. AuthenticationMiddleware(後處理)

組合服務器與中間件

當使用 服務器組合(下面提的Composition) (如 mountimport_server)時,中間件行為遵循以下規則:

  1. 父服務器中間件為所有請求運行,包括路由到掛載服務器的請求
  2. 掛載服務器中間件僅為由該特定服務器處理的請求運行
  3. 中間件順序在每個服務器內保持不變
# 帶有中間件的父服務器
parent = FastMCP("Parent")
parent.add_middleware(AuthenticationMiddleware("token"))

# 帶有自身中間件的子服務器
child = FastMCP("Child")
child.add_middleware(LoggingMiddleware())

@child.tool
def child_tool() -> str:
    return "from child"

# 掛載子服務器
parent.mount(child, prefix="child")

當客户端調用 "child_tool" 時,請求將首先流經父服務器的身份驗證中間件,然後路由到子服務器,在子服務器中再經過其日誌中間件。

內置中間件

FastMCP 包含多箇中間件實現,展示了最佳實踐並提供立即可用的功能。讓我們通過構建簡化版本來探索每種類型的工作原理,然後瞭解如何使用完整實現。

計時中間件

性能監控對於理解服務器行為和識別瓶頸至關重要。FastMCP 在 fastmcp.server.middleware.timing 中包含計時中間件。

以下是其工作方式的示例:

import time
from fastmcp.server.middleware import Middleware, MiddlewareContext

class SimpleTimingMiddleware(Middleware):
    async def on_request(self, context: MiddlewareContext, call_next):
        start_time = time.perf_counter()
        
        try:
            result = await call_next(context)
            duration_ms = (time.perf_counter() - start_time) * 1000
            print(f"請求 {context.method} 在 {duration_ms:.2f}ms 內完成")
            return result
        except Exception as e:
            duration_ms = (time.perf_counter() - start_time) * 1000
            print(f"請求 {context.method} 在 {duration_ms:.2f}ms 後失敗:{e}")
            raise

要使用具有正確日誌和配置的完整版本:

from fastmcp.server.middleware.timing import (
    TimingMiddleware, 
    DetailedTimingMiddleware
)

# 對所有請求進行基礎計時
mcp.add_middleware(TimingMiddleware())

# 詳細的操作級計時(工具、資源、提示)
mcp.add_middleware(DetailedTimingMiddleware())

內置版本包括自定義日誌支持、正確格式化,且 DetailedTimingMiddleware 提供 on_call_toolon_read_resource 等操作特定鈎子,以實現精細計時。

日誌中間件

請求和響應日誌記錄對於調試、監控和理解 MCP 服務器中的使用模式至關重要。FastMCP 在 fastmcp.server.middleware.logging 中提供全面的日誌中間件。

以下是其工作方式的示例:

from fastmcp.server.middleware import Middleware, MiddlewareContext

class SimpleLoggingMiddleware(Middleware):
    async def on_message(self, context: MiddlewareContext, call_next):
        print(f"正在處理來自 {context.source} 的 {context.method}")
        
        try:
            result = await call_next(context)
            print(f"{context.method} 處理完成")
            return result
        except Exception as e:
            print(f"{context.method} 失敗:{e}")
            raise

要使用具有高級功能的完整版本:

from fastmcp.server.middleware.logging import (
    LoggingMiddleware, 
    StructuredLoggingMiddleware
)

# 支持負載的人類可讀日誌
mcp.add_middleware(LoggingMiddleware(
    include_payloads=True,
    max_payload_length=1000
))

# 用於日誌聚合工具的 JSON 結構化日誌
mcp.add_middleware(StructuredLoggingMiddleware(include_payloads=True))

內置版本包括負載日誌、結構化 JSON 輸出、自定義日誌支持、負載大小限制以及用於精細控制的操作特定鈎子。

速率限制中間件

速率限制對於保護服務器免受濫用、確保公平資源使用以及在負載下保持性能至關重要。FastMCP 在 fastmcp.server.middleware.rate_limiting 中包含複雜的速率限制中間件。

以下是其工作方式的示例:

import time
from collections import defaultdict
from fastmcp.server.middleware import Middleware, MiddlewareContext
from mcp import McpError
from mcp.types import ErrorData

class SimpleRateLimitMiddleware(Middleware):
    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.client_requests = defaultdict(list)
    
    async def on_request(self, context: MiddlewareContext, call_next):
        current_time = time.time()
        client_id = "default"  # 實際中,從頭部或上下文中提取
        
        # 清理舊請求並檢查限制
        cutoff_time = current_time - 60
        self.client_requests[client_id] = [
            req_time for req_time in self.client_requests[client_id]
            if req_time > cutoff_time
        ]
        
        if len(self.client_requests[client_id]) >= self.requests_per_minute:
            raise McpError(ErrorData(code=-32000, message="超出速率限制"))
        
        self.client_requests[client_id].append(current_time)
        return await call_next(context)

要使用具有高級算法的完整版本:

from fastmcp.server.middleware.rate_limiting import (
    RateLimitingMiddleware, 
    SlidingWindowRateLimitingMiddleware
)

# 令牌桶速率限制(允許受控突發)
mcp.add_middleware(RateLimitingMiddleware(
    max_requests_per_second=10.0,
    burst_capacity=20
))

# 滑動窗口速率限制(精確的基於時間的控制)
mcp.add_middleware(SlidingWindowRateLimitingMiddleware(
    max_requests=100,
    window_minutes=1
))

內置版本包括令牌桶算法、按客户端識別、全局速率限制以及具有可配置客户端識別功能的異步安全實現。

錯誤處理中間件

一致的錯誤處理和恢復對於健壯的 MCP 服務器至關重要。FastMCP 在 fastmcp.server.middleware.error_handling 中提供全面的錯誤處理中間件。

以下是其工作方式的示例:

import logging
from fastmcp.server.middleware import Middleware, MiddlewareContext

class SimpleErrorHandlingMiddleware(Middleware):
    def __init__(self):
        self.logger = logging.getLogger("errors")
        self.error_counts = {}
    
    async def on_message(self, context: MiddlewareContext, call_next):
        try:
            return await call_next(context)
        except Exception as error:
            # 記錄錯誤並跟蹤統計信息
            error_key = f"{type(error).__name__}:{context.method}"
            self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
            
            self.logger.error(f"{context.method} 中發生錯誤:{type(error).__name__}: {error}")
            raise

要使用具有高級功能的完整版本:

from fastmcp.server.middleware.error_handling import (
    ErrorHandlingMiddleware, 
    RetryMiddleware
)

# 全面的錯誤日誌和轉換
mcp.add_middleware(ErrorHandlingMiddleware(
    include_traceback=True,
    transform_errors=True,
    error_callback=my_error_callback
))

# 帶指數退避的自動重試
mcp.add_middleware(RetryMiddleware(
    max_retries=3,
    retry_exceptions=(ConnectionError, TimeoutError)
))

內置版本包括錯誤轉換、自定義回調、可配置的重試邏輯以及正確的 MCP 錯誤格式化。

組合中間件

from fastmcp import FastMCP
from fastmcp.server.middleware.timing import TimingMiddleware
from fastmcp.server.middleware.logging import LoggingMiddleware
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware

mcp = FastMCP("Production Server")

# 按邏輯順序添加中間件
mcp.add_middleware(ErrorHandlingMiddleware())  # 首先處理錯誤
mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=50))
mcp.add_middleware(TimingMiddleware())  # 計時實際執行
mcp.add_middleware(LoggingMiddleware())  # 記錄所有內容

@mcp.tool
def my_tool(data: str) -> str:
    return f"已處理:{data}"

Composition

隨着MCP 應用規模擴大,你可能希望將工具、資源和提示按邏輯模塊組織,或複用現有的服務器組件。FastMCP 通過兩種方法支持服務器組合:

  • import_server:一次性複製組件並添加前綴(靜態組合)。
  • mount:創建實時鏈接,主服務器在運行時將請求委託給子服務器(動態組合)。

為什麼要組合服務器

  • 模塊化:將大型應用拆分為更小、更專注的服務器(例如 WeatherServerDatabaseServerCalendarServer)。
  • 可複用性:創建通用工具服務器(例如 TextProcessingServer),並在需要時掛載。
  • 團隊協作:不同團隊可分別開發獨立的 FastMCP 服務器,後期再進行組合。
  • 邏輯組織:將相關功能按邏輯分組,便於管理。

導入vs掛載

選擇導入還是掛載取決於您的具體用例和需求。

特性 導入 掛載
方法 FastMCP.import_server(server, prefix=None) FastMCP.mount(server, prefix=None)
組合類型 一次性複製(靜態) 實時鏈接(動態)
更新同步 子服務器的變更不會反映到主服務器 子服務器的變更立即反映到主服務器
性能 快速 — 無運行時委託開銷 較慢 — 受最慢掛載服務器影響
前綴 可選 — 省略則保留原名稱 可選 — 省略則保留原名稱
適用場景 打包最終組件、性能敏感場景 運行時模塊化組合

導入

import_server() 方法將一個 FastMCP 實例(子服務器)中的所有組件(工具、資源、模板、提示)複製到另一個實例(主服務器)中。可選提供 prefix 以避免命名衝突。若未提供前綴,組件將按原樣導入。當多個服務器使用相同前綴(或無前綴)導入時,最後導入的服務器組件將覆蓋先前導入的同名組件。

from fastmcp import FastMCP
import asyncio

# 定義子服務器
weather_mcp = FastMCP(name="WeatherService")

@weather_mcp.tool
def get_forecast(city: str) -> dict:
    """獲取天氣預報。"""
    return {"city": city, "forecast": "Sunny"}

@weather_mcp.resource("data://cities/supported")
def list_supported_cities() -> list[str]:
    """列出支持天氣查詢的城市。"""
    return ["London", "Paris", "Tokyo"]

# 定義主服務器
main_mcp = FastMCP(name="MainApp")

# 導入子服務器
async def setup():
    await main_mcp.import_server(weather_mcp, prefix="weather")

# 結果:main_mcp 現包含帶前綴的組件:
# - 工具: "weather_get_forecast"
# - 資源: "data://weather/cities/supported" 

if __name__ == "__main__":
    asyncio.run(setup())
    main_mcp.run()

導入的工作原理

當你調用 await main_mcp.import_server(subserver, prefix={whatever}) 時:

  1. 工具subserver 的所有工具被添加到 main_mcp,名稱前綴為 {prefix}_
    • subserver.tool(name="my_tool") 變為 main_mcp.tool(name="{prefix}_my_tool")
  2. 資源:所有資源的 URI 和名稱均被添加前綴。
    • URI: subserver.resource(uri="data://info") 變為 main_mcp.resource(uri="data://{prefix}/info")
    • 名稱: resource.name 變為 "{prefix}_{resource.name}"
  3. 資源模板:模板的前綴規則與資源類似。
    • URI: subserver.resource(uri="data://{id}") 變為 main_mcp.resource(uri="data://{prefix}/{id}")
    • 名稱: template.name 變為 "{prefix}_{template.name}"
  4. 提示:所有提示的名稱被添加前綴 {prefix}_
    • subserver.prompt(name="my_prompt") 變為 main_mcp.prompt(name="{prefix}_my_prompt")

請注意,import_server 執行的是一次性複製。在導入之後subserver 所做的更改不會反映在 main_mcp 中。subserverlifespan 上下文也不會由主服務器執行。

prefix 參數是可選的。如果省略,組件將按原樣導入,不進行修改,這樣組件將保留其原始名稱。當導入多個具有相同前綴或無前綴的服務器時,最後導入的服務器的組件將優先。

掛載

mount() 方法在 main_mcp 服務器與 subserver 之間創建一個實時鏈接。它不復制組件,而是在運行時將匹配可選 prefix 的組件請求委託subserver 處理。若未提供前綴,則子服務器的組件可通過原始名稱直接訪問。當多個服務器使用相同前綴(或無前綴)掛載時,對於衝突的組件名稱,最後掛載的服務器將優先。

import asyncio
from fastmcp import FastMCP, Client

# 定義子服務器
dynamic_mcp = FastMCP(name="DynamicService")

@dynamic_mcp.tool
def initial_tool():
    """初始工具演示。"""
    return "Initial Tool Exists"

# 掛載子服務器(同步操作)
main_mcp = FastMCP(name="MainAppLive")
main_mcp.mount(dynamic_mcp, prefix="dynamic")

# 在掛載後添加工具 — 仍可通過 main_mcp 訪問
@dynamic_mcp.tool
def added_later():
    """掛載後添加的工具。"""
    return "Tool Added Dynamically!"

# 測試訪問已掛載的工具
async def test_dynamic_mount():
    tools = await main_mcp.get_tools()
    print("可用工具:", list(tools.keys()))
    # 輸出:['dynamic_initial_tool', 'dynamic_added_later']
    
    async with Client(main_mcp) as client:
        result = await client.call_tool("dynamic_added_later")
        print("結果:", result.data)
        # 輸出:"Tool Added Dynamically!"

if __name__ == "__main__":
    asyncio.run(test_dynamic_mount())

掛載的工作原理

配置掛載後:

  1. 實時鏈接:父服務器與掛載的服務器建立連接。
  2. 動態更新:對掛載服務器的更改在通過父服務器訪問時立即生效。
  3. 前綴訪問:父服務器使用前綴將請求路由到掛載的服務器。
  4. 委託:對匹配前綴的組件的請求在運行時委託給掛載的服務器處理。

命名工具、資源、模板和提示的前綴規則與 import_server 相同。這包括為資源和模板的 URI/鍵及名稱添加前綴,以便在多服務器配置中更好地識別。

由於“實時鏈接”的存在,父服務器上的 list_tools() 等操作會受到最慢掛載服務器速度的影響。特別是,基於 HTTP 的掛載服務器可能引入顯著延遲(300-400ms,而本地工具僅需 1-2ms),並且這種減速會影響整個服務器,而不僅僅是與 HTTP 代理工具的交互。如果性能至關重要,通過 import_server() 導入工具可能是更合適的解決方案,因為它在啓動時一次性複製組件,而不是在運行時委託請求。

user avatar yunpan-plus 頭像 OBCE666 頭像
點贊 2 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.