博客 / 詳情

返回

MCP官方Go SDK嚐鮮

前言

此前在 MCP 官網就注意到官方提供了 Go SDK,近期由於在 Python 環境下開發 MCP Server 有點"審美疲勞",因此決定使用 Go 語言嚐嚐鮮。

從個人實際體驗來看,Go 語言在併發處理方面確實具有顯著優勢:無需糾結於同步阻塞、異步事件循環、多進程多線程通信等複雜的併發問題,goroutine 一把梭哈。同時,Go 語言的部署也非常便捷,編譯後生成的靜態二進制文件具有良好的可移植性,可以在不同環境中直接運行。

然而,這種便利性也伴隨着一定的代價。相較於 Python,使用 Go 語言實現 MCP 功能相對複雜一些,開發效率略低。這就是軟件工程中的經典權衡了:運行成本與開發成本往往難以兼得,需要根據具體場景進行取捨。

MCP 協議簡介

可能都耳熟能詳了,但以防還有不熟悉的朋友,先簡單介紹下MCP

Model Context Protocol (MCP) 是一種標準化的協議,旨在為 AI 模型提供統一的工具調用接口。通過 MCP,開發者可以將各種工具、服務和數據源暴露給 AI 模型,使其能夠執行超出基礎語言模型能力範圍的操作。MCP 支持多種傳輸協議,包括 HTTP 和 Stdio,為不同場景下的集成提供了靈活性。

一個簡單的 MCP Server 示例

MCP 官方 Go SDK 在定義工具(Tool)時,要求明確指定輸入參數和輸出結果的數據結構。對於功能較為簡單的工具,也可以直接使用 any 類型。以下是一個完整的 MCP Server 示例,提供了三個實用工具:

  1. getCurrentDatetime:獲取當前時間,返回 RFC3339 格式(2006-01-02T15:04:05Z07:00)的時間戳字符串。由於不需要輸入參數,因此參數類型定義為 any,輸出同樣使用 any 類型。

  2. getComputerStatus:獲取當前系統的關鍵信息,包括 CPU 使用率、內存使用情況、系統版本等。該工具接受一個 CPUSampleTime 參數,對應的輸入結構體為 GetComputerStatusIn,輸出結構體為 GetComputerStatusOut(Go SDK 的示例中通常採用 xxxInxxxOut 的命名約定來區分工具的輸入輸出結構體)。

  3. getDiskInfo:獲取所有硬盤分區的使用信息和文件系統詳情。該工具無需輸入參數,僅定義了輸出結構體 GetDiskInfoOut

在完成所有工具邏輯的實現後,最後一步是啓動服務。以下示例採用 Streamable HTTP 模式啓動,同時也保留了 Stdio Transport 模式的註釋代碼供參考。

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"net/http"
	"time"

	"github.com/modelcontextprotocol/go-sdk/mcp"
	"github.com/shirou/gopsutil/v4/cpu"
	"github.com/shirou/gopsutil/v4/disk"
	"github.com/shirou/gopsutil/v4/host"
	"github.com/shirou/gopsutil/v4/mem"
)

func getCurrentDatetime(ctx context.Context, req *mcp.CallToolRequest, arg any) (*mcp.CallToolResult, any, error) {
	now := time.Now().Format(time.RFC3339)
	return nil, now, nil
}

type GetComputerStatusIn struct {
	CPUSampleTime time.Duration `json:"cpu_sample_time" jsonschema:"the sample time of cpu usage. Default is 1s"`
}

type GetComputerStatusOut struct {
	Hostinfo    string `json:"host info" jsonschema:"the hostinfo of the computer"`
	TimeZone    string `json:"time_zone" jsonschema:"the time zone of the computer"`
	IPAddress   string `json:"ip_address" jsonschema:"the ip address of the computer"`
	CPUUsage    string `json:"cpu_usage" jsonschema:"the cpu usage of the computer"`
	MemoryUsage string `json:"memory_usage" jsonschema:"the memory usage of the computer"`
}

func getComputerStatus(ctx context.Context, req *mcp.CallToolRequest, args GetComputerStatusIn) (*mcp.CallToolResult, GetComputerStatusOut, error) {
	if args.CPUSampleTime == 0 {
		args.CPUSampleTime = time.Second
	}
	hInfo, err := host.Info()
	if err != nil {
		return nil, GetComputerStatusOut{}, err
	}

	var resp GetComputerStatusOut
	resp.Hostinfo = fmt.Sprintf("%+v", *hInfo)

	name, offset := time.Now().Zone()
	resp.TimeZone = fmt.Sprintf("Timezone: %s (UTC%+d)\n", name, offset/3600)

	// CPU Usage
	percent, err := cpu.Percent(time.Second, false)
	if err != nil {
		return nil, GetComputerStatusOut{}, err
	}
	resp.CPUUsage = fmt.Sprintf("CPU Usage: %.2f%%\n", percent[0])

	// Memory Usage
	v, err := mem.VirtualMemory()
	if err != nil {
		return nil, GetComputerStatusOut{}, err
	}
	resp.MemoryUsage = fmt.Sprintf("Mem Usage: %.2f%% (Used: %vMB / Total: %vMB)\n",
		v.UsedPercent, v.Used/1024/1024, v.Total/1024/1024)

	// Ip Address
	conn, err := net.Dial("udp", "8.8.8.8:80")
	if err != nil {
		return nil, GetComputerStatusOut{}, err
	}
	defer conn.Close()
	localAddr := conn.LocalAddr().(*net.UDPAddr)
	resp.IPAddress = localAddr.IP.String()

	return nil, resp, nil
}

type DiskInfo struct {
	Device     string   `json:"device" jsonschema:"the device name"`
	Mountpoint string   `json:"mountpoint" jsonschema:"the mountpoint"`
	Fstype     string   `json:"fstype" jsonschema:"the filesystem type"`
	Opts       []string `json:"opts" jsonschema:"the mount options"`
	DiskTotal  uint64   `json:"disk_total" jsonschema:"the total disk space in GiB"`
	DiskUsage  float64  `json:"disk_usage" jsonschema:"the disk usage percentage"`
}

type GetDiskInfoOut struct {
	PartInfos []DiskInfo `json:"part_infos" jsonschema:"the disk partitions"`
}

func getDiskInfo(ctx context.Context, req *mcp.CallToolRequest, args any) (*mcp.CallToolResult, GetDiskInfoOut, error) {
	partInfos, err := disk.Partitions(false)
	if err != nil {
		return nil, GetDiskInfoOut{}, err
	}

	var resp []DiskInfo
	for _, part := range partInfos {
		diskUsage, err := disk.Usage(part.Mountpoint)
		if err != nil {
			continue
		}
		resp = append(resp, DiskInfo{
			Device:     part.Device,
			Mountpoint: part.Mountpoint,
			Fstype:     part.Fstype,
			Opts:       part.Opts,
			DiskTotal:  diskUsage.Total / 1024 / 1024 / 1024,
			DiskUsage:  diskUsage.UsedPercent,
		})
	}
	return nil, GetDiskInfoOut{PartInfos: resp}, nil
}

func main() {
	// ctx := context.Background()

	server := mcp.NewServer(&mcp.Implementation{Name: "MCP_Demo", Version: "0.0.1"}, &mcp.ServerOptions{
		Instructions: "日期時間相關的 Server",
	})
	mcp.AddTool(server, &mcp.Tool{
		Name:        "get_current_datetime",
		Description: "Get current datetime in RFC3339 format",
	}, getCurrentDatetime)

	mcp.AddTool(server, &mcp.Tool{
		Name:        "get_computer_status",
		Description: "Get computer status",
	}, getComputerStatus)

	mcp.AddTool(server, &mcp.Tool{
		Name:        "get_disk_info",
		Description: "Get disk information",
	}, getDiskInfo)

	// if err := server.Run(ctx, &mcp.StdioTransport{}); err != nil {
	// 	log.Fatalln(err)
	// }
	//
	handler := mcp.NewStreamableHTTPHandler(func(req *http.Request) *mcp.Server {
		path := req.URL.Path
		switch path {
		case "/api/mcp":
			return server
		default:
			return nil
		}
	}, nil)
	url := "127.0.0.1:18001"
	if err := http.ListenAndServe(url, handler); err != nil {
		log.Fatalln(err)
	}
}

MCP Server 代碼編譯通過後,可以在支持 MCP 協議的開發工具(如 VS Code)中進行測試驗證。以下是一個典型的 .vscode/mcp.json 配置示例:

{
    "servers": {
        "demo-http": {
            // "command": "/home/rainux/Documents/workspace/go-dev/mcp-dev/mcp-server-dev/mcp-server-dev"
            "type": "http",
            "url": "http://127.0.0.1:18001/api/mcp"
        }
    }
}

啓動 MCP Server 後,可以通過向 LLM 提出相關問題來驗證工具是否能夠被正確調度和執行。

一個完整的 MCP Client 實現

為了構建端到端的 MCP 應用,我們還需要實現一個 MCP Client,使其能夠與 LLM 協同工作,自動選擇並調用合適的工具。以下是一個功能完整的 MCP Client 實現,其中包含了與 OpenAI 兼容 API 的集成示例(callOpenAI 函數)。

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os/exec"
	"time"

	"github.com/modelcontextprotocol/go-sdk/mcp"
	"github.com/openai/openai-go/v3"
	"github.com/openai/openai-go/v3/option"
	"github.com/openai/openai-go/v3/packages/param"
)

var (
	FLAG_ModelName     string
	FLAG_BaseURL       string
	FLAG_APIKEY        string
	FLAG_MCP_TRANSPORT string
	FLAG_MCP_URI       string
	FLAG_QUESTION      string
	FLAG_STREAM        bool
)

func main() {
	// Parse command-line flags
	flag.StringVar(&FLAG_BaseURL, "base-url", "https://dashscope.aliyuncs.com/compatible-mode/v1", "llm base url")
	flag.StringVar(&FLAG_ModelName, "model", "qwen-plus", "LLM Model Name")
	flag.StringVar(&FLAG_MCP_TRANSPORT, "mcp-transport", "http", "MCP transport protocol (stdio or http)")
	flag.StringVar(&FLAG_MCP_URI, "mcp-uri", "", "MCP server address")
	flag.StringVar(&FLAG_APIKEY, "api-key", "", "llm api key")
	flag.StringVar(&FLAG_QUESTION, "q", "Hi", "question")
	flag.BoolVar(&FLAG_STREAM, "s", false, "stream response")

	flag.Parse()

	// Get configuration from environment variables with flag overrides
	if FLAG_APIKEY == "" {
		log.Fatalln("api key is empty")
	}

	if FLAG_QUESTION == "" {
		log.Fatalln("question is empty")
	}

	// Configure OpenAI client
	// config :=
	ctx := context.Background()

	// question := "Write me a haiku about computers"
	if FLAG_MCP_URI != "" {
		callOpenAIWithTools(ctx, FLAG_QUESTION)
	} else {
		callOpenAI(ctx, FLAG_QUESTION, FLAG_STREAM)
	}
}

// callOpenAI 調用 OpenAI API 接口處理用户問題
// 該函數支持流式(stream)和非流式(non-stream)兩種響應方式
//
// 參數:
//   - ctx: 控制操作生命週期的上下文
//   - question: 用户提出的問題字符串
//   - stream: 布爾值,指定是否使用流式響應
func callOpenAI(ctx context.Context, question string, stream bool) {
	client := openai.NewClient(option.WithAPIKey(FLAG_APIKEY), option.WithBaseURL(FLAG_BaseURL))
	systemPrompt := "請用親切熱情的風格回答用户的問題"

	if stream {
		// 創建流式響應請求
		streamResp := client.Chat.Completions.NewStreaming(ctx, openai.ChatCompletionNewParams{
			Messages: []openai.ChatCompletionMessageParamUnion{
				openai.SystemMessage(systemPrompt),
				openai.UserMessage(question),
			},
			Model: FLAG_ModelName,
		})
		// defer streamResp.Close()
		defer func() {
			err := streamResp.Close()
			if err != nil {
				log.Fatalln(err)
			}
		}()
		// 遍歷流式響應並逐塊輸出內容
		for streamResp.Next() {
			data := streamResp.Current()
			fmt.Print(data.Choices[0].Delta.Content)

			if err := streamResp.Err(); err != nil {
				log.Fatalln(err)
			}
		}

	} else {
		// 創建非流式響應請求
		chatCompletion, err := client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{
			Messages: []openai.ChatCompletionMessageParamUnion{
				openai.SystemMessage(systemPrompt),
				openai.UserMessage(question),
			},
			Model: FLAG_ModelName,
		})
		if err != nil {
			log.Fatalln(err)
		}
		// 輸出非流式響應內容
		fmt.Println(chatCompletion.Choices[0].Message.Content)
	}
}

// callOpenAIWithTools 使用 OpenAI API 和 MCP 工具調用來處理用户問題
// 該函數創建一個 OpenAI 客户端和 MCP 客户端,將 MCP 工具轉換為 OpenAI 可使用的格式,
// 並執行完整的工具調用流程,包括初始調用和可能的後續調用
//
// 參數:
//   - ctx: 控制操作生命週期的上下文
//   - question: 用户提出的問題字符串
func callOpenAIWithTools(ctx context.Context, question string) {
	// 創建 OpenAI 客户端,使用 API 密鑰和基礎 URL 配置
	llmClient := openai.NewClient(option.WithAPIKey(FLAG_APIKEY), option.WithBaseURL(FLAG_BaseURL))
	// 創建 MCP 客户端,指定名稱和版本
	mcpClient := mcp.NewClient(&mcp.Implementation{Name: "mcp-client", Version: "0.0.1"}, nil)
	var transport mcp.Transport
	// 根據命令行標誌選擇傳輸協議(stdio 或 http)
	switch FLAG_MCP_TRANSPORT {
	case "stdio":
		transport = &mcp.CommandTransport{Command: exec.Command(FLAG_MCP_URI)}
	case "http":
		transport = &mcp.StreamableClientTransport{HTTPClient: &http.Client{Timeout: time.Second * 10}, Endpoint: FLAG_MCP_URI}
	default:
		log.Fatalf("unknown transport, %s", FLAG_MCP_TRANSPORT)
	}
	// 建立與 MCP 服務器的連接
	session, err := mcpClient.Connect(ctx, transport, nil)
	if err != nil {
		log.Fatalf("MCP client connects to mcp server failed, err: %v", err)
	}
	defer func() {
		err := session.Close()
		if err != nil {
			log.Fatalln(err)
		}
	}()

	// 獲取可用的 MCP 工具列表
	mcpTools, err := session.ListTools(ctx, &mcp.ListToolsParams{})
	if err != nil {
		log.Fatalf("List mcp tools failed, err: %v", err)
	}

	var legacyTools []openai.ChatCompletionToolUnionParam
	// 遍歷所有 MCP 工具並將其轉換為 OpenAI 兼容的工具格式
	for _, tool := range mcpTools.Tools {
		// 將 MCP 工具輸入模式轉換為 OpenAI 函數參數
		if inputSchema, ok := tool.InputSchema.(map[string]any); ok {
			legacyTools = append(legacyTools, openai.ChatCompletionFunctionTool(
				openai.FunctionDefinitionParam{
					Name:        tool.Name,
					Description: openai.String(tool.Description),
					Parameters:  openai.FunctionParameters(inputSchema),
				},
			))
		} else {
			// 如果 InputSchema 不是 map[string]any,使用空參數
			legacyTools = append(legacyTools, openai.ChatCompletionFunctionTool(
				openai.FunctionDefinitionParam{
					Name:        tool.Name,
					Description: openai.String(tool.Description),
					Parameters:  openai.FunctionParameters{},
				},
			))
		}
	}

	// 設置初始聊天消息,包括系統提示和用户問題
	messages := []openai.ChatCompletionMessageParamUnion{
		openai.SystemMessage("請用親切熱情的風格回答用户的問題。你可以使用可用的工具來獲取信息。"),
		openai.UserMessage(question),
	}

	// 調用 LLM 獲取初步響應
	chatCompletion, err := llmClient.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{
		Messages: messages,
		Model:    FLAG_ModelName,
		Tools:    legacyTools,
		ToolChoice: openai.ChatCompletionToolChoiceOptionUnionParam{
			OfAuto: param.Opt[string]{
				Value: "auto",
			},
		},
	})
	if err != nil {
		log.Fatalf("LLM call failed, err: %v", err)
	}

	choice := chatCompletion.Choices[0]
	fmt.Printf("LLM response: %s\n", choice.Message.Content)

	// 檢查是否需要調用工具
	if choice.FinishReason == "tool_calls" && len(choice.Message.ToolCalls) > 0 {
		// 遍歷所有需要調用的工具
		for _, toolCall := range choice.Message.ToolCalls {
			if toolCall.Type != "function" {
				continue
			}

			fmt.Printf("Executing tool: %s with args: %s\n", toolCall.Function.Name, toolCall.Function.Arguments)

			// 解析 JSON 參數
			var argsObj map[string]any
			args := toolCall.Function.Arguments

			if args != "" {
				if err := json.Unmarshal([]byte(args), &argsObj); err != nil {
					log.Printf("Failed to parse tool arguments: %v", err)
					argsObj = make(map[string]any)
				}
			} else {
				argsObj = make(map[string]any)
			}

			fmt.Printf("Executing tool: %s with parsed args: %v\n", toolCall.Function.Name, argsObj)

			// 執行 MCP 工具調用
			result, err := session.CallTool(ctx, &mcp.CallToolParams{
				Name:      toolCall.Function.Name,
				Arguments: argsObj,
			})
			if err != nil {
				log.Printf("Tool call failed: %v", err)
				continue
			}

			// 將 MCP 內容轉換為字符串
			var toolResult string
			if len(result.Content) > 0 {
				if textContent, ok := result.Content[0].(*mcp.TextContent); ok {
					toolResult = textContent.Text
				} else {
					// 如果不是 TextContent,轉換為 JSON
					if jsonBytes, err := json.Marshal(result.Content[0]); err == nil {
						toolResult = string(jsonBytes)
					} else {
						toolResult = "Tool executed successfully"
					}
				}
			}

			fmt.Printf("Tool result: %s\n", toolResult)

			// 添加工具調用消息和工具響應消息
			messages = append(messages, openai.ChatCompletionMessageParamUnion{
				OfAssistant: &openai.ChatCompletionAssistantMessageParam{
					Role: "assistant",
					ToolCalls: []openai.ChatCompletionMessageToolCallUnionParam{
						{
							OfFunction: &openai.ChatCompletionMessageFunctionToolCallParam{
								ID: toolCall.ID,
								Function: openai.ChatCompletionMessageFunctionToolCallFunctionParam{
									Name:      toolCall.Function.Name,
									Arguments: toolCall.Function.Arguments,
								},
							},
						},
					},
				},
			})

			messages = append(messages, openai.ToolMessage(
				toolResult,
				toolCall.ID,
			))

			// 進行後續調用以獲得最終響應
			chatCompletion, err = llmClient.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{
				Messages: messages,
				Model:    FLAG_ModelName,
			})
			if err != nil {
				log.Fatalf("LLM follow-up failed, err: %v", err)
			}

			fmt.Printf("Final response: %s\n", chatCompletion.Choices[0].Message.Content)
		}
	}
}

運行測試驗證

編譯完成後,我們可以進行多輪測試來驗證功能的正確性。

普通問答測試

./mcp-client-dev -api-key "sk-xxx" -q "how are you"

還可以加上 -s 參數啓用流式輸出:

./mcp-client-dev -api-key "sk-xxx" -q "how are you" -s

預期輸出:

Hi there! 😊 I'm absolutely wonderful—energized, curious, and *so* happy to be chatting with you! 🌟 How about you? I'd love to hear how your day's going—or what's on your heart or mind right now! 💫 (Bonus points if you share a fun fact, a tiny win, or even just your favorite emoji today! 🍦✨)

MCP 工具調用測試

./mcp-client-dev -api-key "sk-xxx" -mcp-uri "http://127.0.0.1:18001/api/mcp" -q "當前時間是什麼"

預期輸出:

LLM response: 
Executing tool: get_current_datetime with args: {}
Executing tool: get_current_datetime with parsed args: map[]
Tool result: "2026-02-02T23:12:54+08:00"
Final response: 現在是 **2026 年 2 月 2 日 晚上 11:12**(北京時間,UTC+8)✨
新年的氣息還暖暖的~你是在規劃什麼特別的事情嗎?😊 我很樂意幫你安排、提醒或一起暢想哦!

最佳實踐與注意事項

在實際項目中使用 Go 語言實現 MCP Server 時,建議考慮以下最佳實踐:

  1. 錯誤處理:確保所有工具函數都有完善的錯誤處理機制,避免因單個工具失敗導致整個服務崩潰。
  2. 性能優化:對於耗時較長的操作(如系統信息採集),考慮添加超時控制和緩存機制。(在MCP官方文檔看到有 Tasks 和 progress 這兩個新的原語, 耗時任務也可以試試這兩個)
  3. 安全性:驗證所有輸入參數,防止惡意輸入導致的安全問題。對於涉及系統操作的工具,需要特別注意權限控制。
  4. 日誌記錄:添加詳細的日誌記錄,便於調試和監控工具的使用情況。
  5. 配置管理:將服務配置(如監聽地址、端口等)提取到配置文件中,提高可維護性。

總結

本文通過一個簡單的代碼示例展示瞭如何使用 Go 語言開發 MCP Server 和 Client。雖然 Go 語言在 MCP 開發方面相比 Python 略顯複雜,但其在併發處理、性能和部署便利性方面的優勢使其成為生產環境的理想選擇。

需要注意的是,本文示例僅涵蓋了 MCP 工具調用的基本功能。在實際業務項目中使用 Go 語言實現 MCP Server 時,還需要深入研究 MCP 協議的其他特性,如 Prompt 管理、身份認證(Auth)、會話管理等高級功能的實現方案。

通過合理的設計和實現,基於 Go 語言的 MCP 服務可以為 AI 應用提供穩定、高效、安全的工具調用能力,充分發揮 Go 語言在系統編程和網絡服務方面的優勢。

參考

  • MCP 官方頁面: https://modelcontextprotocol.io/docs/getting-started/intro
  • MCP 官方 Go SDK: https://github.com/modelcontextprotocol/go-sdk
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.