💎 本文價值提示

你將獲得什麼?

  1. 從零構建:不再是寫腳本,而是構建一個可擴展的微服務架構。
  2. 企業級思維:掌握限流、熔斷、流式傳輸等生產環境必備技能。
  3. 代碼即資產:一套可直接複用的 LLM Gateway 核心代碼骨架。
  4. 轉型視角:看懂大數據高吞吐思維如何映射到 AI 高併發架構。

👋 大家好,我是你們的老朋友,那個正在從大數據轉型 AI 架構的“老司機”。

在前三篇文章中,我們已經完成了 Python 的“脱胎換骨”:

  • 第一篇:我們用 Type Hints 和 Pydantic 戒掉了“弱類型”的隨意,像寫 Java 一樣嚴謹;
  • 第二篇:我們用 Asyncio 征服了高併發 I/O,不再讓 CPU 傻等;
  • 第三篇:我們用 Generator 和 Decorator 搞定了流式輸出和 AOP 切面編程。

今天,是時候把這些散落的珍珠串成項鍊了!我們將進入 Phase 4:工程化落地與架構設計

我們要一起做一個 Capstone Project(畢業設計) —— **企業級 LLM Gateway(大模型網關)**。


🏗️ 為什麼要造一個 LLM Gateway?

在企業裏,直接讓業務代碼調用 OpenAI 或 DeepSeek 的 API 是非常危險的。這就好比讓公司的每輛車都自己去海關報關,效率低且無法管控。

我們需要一個 “海關總署” (Gateway)

  1. 統一計費:誰用了多少 Token,得算清楚。
  2. 流量控制:防止某個業務線把 API Rate Limit 刷爆。
  3. 協議轉換:前端要 SSE 流式,後端要統一接口。
  4. 安全審計:敏感詞過濾,防止數據泄露。

技術棧選型

  • 框架FastAPI (Python 界的最強黑馬,性能直逼 Go)。
  • 校驗Pydantic V2 (數據契約的守護神)。
  • 併發Asyncio (高併發的核心引擎)。

🧩 架構藍圖:數據是如何流動的?

在我們開始寫代碼之前,先像設計 Flink 拓撲圖一樣,畫出我們的架構流向。

image.png


🛠️ 第一步:立規矩 —— 定義數據契約 (Pydantic)

做大數據出身的我們,最怕數據格式亂七八糟。在 AI 架構中,Prompt 就是 SQL,Schema 就是 Table Schema

我們需要定義“輸入”和“輸出”的嚴格標準。

from pydantic import BaseModel, Field, field_validator
from typing import List, Optional, Literal

# 1. 定義消息體
class Message(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str

# 2. 定義請求契約 (Request Contract)
class ChatCompletionRequest(BaseModel):
    model: str = Field(..., description="模型名稱,如 gpt-4, deepseek-chat")
    messages: List[Message]
    temperature: float = Field(0.7, ge=0.0, le=2.0)
    stream: bool = False

    # 💡 亮點:自定義校驗,防止惡意注入過長文本
    @field_validator('messages')
    def validate_message_length(cls, v):
        total_len = sum(len(m.content) for m in v)
        if total_len > 10000:
            raise ValueError("Prompt 內容過長,請精簡後重試")
        return v

# 3. 定義響應契約 (Response Contract)
# 這裏我們模擬 OpenAI 的標準返回格式
class ChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    choices: List[dict]

👨‍💻 架構師旁白: 這不僅僅是代碼,這是協議。有了它,前端開發和後端開發就有了“法律依據”,不再需要口頭對齊字段。


🚀 第二步:高併發引擎 —— 異步請求 (Asyncio)

LLM 的響應通常很慢(幾秒到幾十秒)。如果用傳統的同步代碼(像 JDBC 那樣),一個請求卡住,整個服務就掛了。

我們要用 Asyncio + httpx,實現非阻塞調用。這就像 Node.js 的事件循環,或者 Netty 的 IO 線程。

import httpx
import os
from fastapi import HTTPException

# 模擬從環境變量獲取 Key
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_BASE_URL = "https://api.deepseek.com/v1"

async def call_llm_api(request: ChatCompletionRequest):
    """
    異步調用上游大模型接口
    """
    headers = {
        "Authorization": f"Bearer {LLM_API_KEY}",
        "Content-Type": "application/json"
    }
    
    # 💡 亮點:使用異步上下文管理器,自動釋放連接
    async with httpx.AsyncClient(timeout=60.0) as client:
        try:
            response = await client.post(
                f"{LLM_BASE_URL}/chat/completions",
                json=request.model_dump(), # Pydantic V2 序列化
                headers=headers
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # 記錄日誌...
            raise HTTPException(status_code=e.response.status_code, detail="上游服務報錯")

🌊 第三步:極致體驗 —— 流式透傳 (Generator)

用户不想盯着空白屏幕等 10 秒。他們想要 ChatGPT 那種“打字機”效果。 這就需要用到 Python 的 Generator (生成器) ,配合 FastAPI 的 StreamingResponse

這在大數據領域,就是 Spark StreamingFlink DataStream —— 數據來一條,處理一條,發走一條。

import json
from typing import AsyncGenerator

async def stream_llm_api(request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
    """
    流式生成器:像水管一樣接通上游和下游
    """
    headers = {
        "Authorization": f"Bearer {LLM_API_KEY}",
        "Content-Type": "application/json"
    }
    
    async with httpx.AsyncClient() as client:
        # 開啓流式請求
        async with client.stream(
            "POST", 
            f"{LLM_BASE_URL}/chat/completions", 
            json=request.model_dump(), 
            headers=headers
        ) as response:
            
            # 💡 亮點:逐行讀取上游數據,並實時 yield 給前端
            async for line in response.aiter_lines():
                if line:
                    # 這裏可以做數據清洗、計費統計等中間處理
                    yield f"{line}\n\n" # 符合 SSE 格式

🛡️ 第四步:穩定性保障 —— 熔斷器 (Decorator)

如果上游 DeepSeek 掛了,或者網絡抖動,我們不能讓請求堆積把自己的網關壓垮。我們需要一個 熔斷器 (Circuit Breaker)

利用 Python 的 Decorator (裝飾器) ,我們可以優雅地實現 AOP(面向切面編程)。

import time
from functools import wraps

# 簡單的熔斷器狀態
CIRCUIT_OPEN = False
ERROR_COUNT = 0
LAST_ERROR_TIME = 0

def circuit_breaker(threshold=5, recovery_time=60):
    """
    裝飾器:當錯誤次數超過 threshold 時,暫停服務 recovery_time 秒
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            global CIRCUIT_OPEN, ERROR_COUNT, LAST_ERROR_TIME
            
            # 1. 檢查是否熔斷
            if CIRCUIT_OPEN:
                if time.time() - LAST_ERROR_TIME > recovery_time:
                    # 嘗試恢復
                    CIRCUIT_OPEN = False
                    ERROR_COUNT = 0
                else:
                    raise HTTPException(status_code=503, detail="服務熔斷中,請稍後重試")
            
            # 2. 嘗試執行
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                ERROR_COUNT += 1
                LAST_ERROR_TIME = time.time()
                if ERROR_COUNT >= threshold:
                    CIRCUIT_OPEN = True
                    print("⚠️ 觸發熔斷保護!")
                raise e
        return wrapper
    return decorator

🏰 第五步:集大成 —— FastAPI 入口

最後,我們將所有組件組裝到 main.py 中。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI(title="Enterprise LLM Gateway")

@app.post("/v1/chat/completions")
@circuit_breaker() # 掛載熔斷器
async def chat_completions(request: ChatCompletionRequest):
    """
    網關核心接口
    """
    # 1. 打印日誌 (實際項目中應使用 logging 模塊)
    print(f"收到請求: {request.model} - {len(request.messages)} msgs")
    
    # 2. 判斷是否流式
    if request.stream:
        # 返回流式響應 (SSE)
        return StreamingResponse(
            stream_llm_api(request), 
            media_type="text/event-stream"
        )
    else:
        # 返回普通 JSON
        return await call_llm_api(request)

# 啓動命令: uvicorn main:app --reload

📝 總結與回顧

恭喜你!你已經從一個寫 ETL 腳本的大數據工程師,成功蜕變為能手寫 高併發微服務 的 AI 架構師。

讓我們回顧一下這個 LLM Gateway 涉及的核心能力:

  1. Pydantic: 你的“數據安檢員”,確保進來的數據都是合規的。
  2. Asyncio: 你的“交通指揮官”,讓單線程也能處理成千上萬的併發請求。
  3. Generator: 你的“流水線”,實現數據的實時流轉,降低內存壓力。
  4. Decorator: 你的“保鏢”,在不侵入業務邏輯的情況下,提供熔斷和重試保護。

🧠 本文思維導圖

image.png


🗣️ 互動話題

你在轉型 AI 開發的過程中,遇到的最大“坑”是什麼?

  • A. 習慣了 Java 的強類型,受不了 Python 的動態類型?
  • B. 搞不懂 async/await,代碼經常卡死?
  • C. 流式輸出 (Streaming) 總是斷斷續續?
  • D. 依賴管理太亂,pipconda 打架?

👇 在評論區告訴我你的答案,或者輸入關鍵詞【網關源碼】,獲取本文完整的項目代碼包!