三層架構設計:前端+業務+AI層的職責劃分

前言

在構建企業級AI Agent系統時,清晰的架構分層至關重要。本文將詳細介紹本項目採用的三層架構設計,以及各層的職責劃分和通信機制。

適合讀者: 架構師、全棧工程師、技術Leader


一、為什麼需要三層架構

1.1 單體架構的問題

傳統單體架構:
Frontend + Backend + AI 混在一起
├── 代碼耦合嚴重
├── 難以獨立擴展
├── 技術棧受限
└── 團隊協作困難

1.2 三層架構的優勢

三層架構:
Frontend ← → Server ← → Agent
├── 職責清晰
├── 獨立部署
├── 技術棧自由
├── 易於擴展
└── 團隊並行開發

二、架構全景圖

2.1 系統架構

┌─────────────────────────────────────────────────────────┐
│                      用户層                              │
│                   (瀏覽器/移動端)                         │
└────────────────────┬────────────────────────────────────┘
                     │ HTTPS
                     ▼
┌─────────────────────────────────────────────────────────┐
│                   Frontend Layer                         │
│              Next.js + React + TailwindCSS               │
│                   (Port: 3000)                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ 聊天界面 │  │ 用户認證 │  │ 對話管理 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
└────────────────────┬────────────────────────────────────┘
                     │ HTTP/SSE
                     ▼
┌─────────────────────────────────────────────────────────┐
│                   Server Layer                           │
│          FastAPI + PostgreSQL + Redis                    │
│                   (Port: 8000)                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ 業務邏輯 │  │ 用户管理 │  │ 數據存儲 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ JWT認證  │  │ 緩存管理 │  │ 日誌記錄 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
└────────────────────┬────────────────────────────────────┘
                     │ HTTP/SSE
                     ▼
┌─────────────────────────────────────────────────────────┐
│                    Agent Layer                           │
│         LangChain + Ollama + Weaviate                    │
│                   (Port: 8001)                           │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ RAG檢索  │  │ Prompt組裝│  │ 流式生成 │              │
│  └──────────┘  └──────────┘  └──────────┘              │
└────────────────────┬────────────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        ▼            ▼            ▼
   ┌────────┐  ┌────────┐  ┌──────────┐
   │Weaviate│  │ Ollama │  │PostgreSQL│
   │ :8080  │  │:11434  │  │  :5432   │
   └────────┘  └────────┘  └──────────┘

三、Frontend Layer(前端層)

3.1 核心職責

主要職責:

  • 🎨 用户界面 - 提供友好的交互體驗
  • 🔐 身份認證 - Token管理和自動刷新
  • 📡 實時通信 - SSE流式接收AI回覆
  • 💾 狀態管理 - 對話歷史和用户狀態
  • 🎯 路由管理 - 頁面導航和權限控制

3.2 技術棧

// 技術選型
Frontend Stack:
├── Next.js 13.5.6      // React框架,支持SSR
├── React 18.2.0        // UI組件庫
├── TypeScript 5.2.2    // 類型安全
├── TailwindCSS 3.3.5   // 原子化CSS
├── Axios 1.6.0         // HTTP客户端
└── @microsoft/fetch-event-source 2.0.1  // SSE支持

3.3 核心代碼示例

// frontend/app/chat/page.tsx
'use client'

import { useState } from 'react'
import { streamChat } from '@/lib/api-client'

export default function ChatPage() {
  const [messages, setMessages] = useState<Message[]>([])
  const [inputValue, setInputValue] = useState('')
  const [isStreaming, setIsStreaming] = useState(false)
  const [currentAnswer, setCurrentAnswer] = useState('')

  const handleSend = async () => {
    if (!inputValue.trim() || isStreaming) return

    // 添加用户消息
    const userMessage = {
      id: Date.now().toString(),
      role: 'user',
      content: inputValue
    }
    setMessages(prev => [...prev, userMessage])
    setInputValue('')
    setIsStreaming(true)
    setCurrentAnswer('')

    try {
      // 調用Server API,接收流式響應
      await streamChat(userMessage.content, {
        onToken: (token) => {
          setCurrentAnswer(prev => prev + token)
        },
        onDone: (data) => {
          const aiMessage = {
            id: Date.now().toString(),
            role: 'assistant',
            content: data.answer
          }
          setMessages(prev => [...prev, aiMessage])
          setCurrentAnswer('')
          setIsStreaming(false)
        },
        onError: (error) => {
          console.error('錯誤:', error)
          setIsStreaming(false)
        }
      })
    } catch (error) {
      console.error('發送失敗:', error)
      setIsStreaming(false)
    }
  }

  return (
    <div className="flex flex-col h-screen">
      {/* 消息列表 */}
      <div className="flex-1 overflow-y-auto p-4">
        {messages.map((msg) => (
          <MessageBubble key={msg.id} message={msg} />
        ))}
        
        {/* 實時流式消息 */}
        {currentAnswer && (
          <div className="bg-gray-100 rounded-lg p-4">
            {currentAnswer}
            <span className="animate-pulse">▊</span>
          </div>
        )}
      </div>

      {/* 輸入框 */}
      <div className="p-4 border-t">
        <input
          value={inputValue}
          onChange={(e) => setInputValue(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter' && !e.shiftKey) {
              e.preventDefault()
              handleSend()
            }
          }}
          placeholder="輸入問題..."
          disabled={isStreaming}
        />
        <button onClick={handleSend} disabled={!inputValue.trim() || isStreaming}>
          {isStreaming ? '生成中...' : '發送'}
        </button>
      </div>
    </div>
  )
}

四、Server Layer(業務層)

4.1 核心職責

主要職責:

  • 🔒 認證授權 - JWT雙Token機制
  • 💼 業務邏輯 - 用户管理、對話管理
  • 🗄️ 數據持久化 - PostgreSQL存儲
  • 緩存管理 - Redis加速
  • 🔗 服務編排 - 協調Frontend和Agent
  • 📊 日誌監控 - Loguru結構化日誌

4.2 技術棧

# 技術選型
Server Stack:
├── FastAPI 0.100.0         # 現代化異步Web框架
├── Uvicorn 0.31.1          # ASGI服務器
├── SQLAlchemy 2.0.18+      # 異步ORM
├── PostgreSQL              # 關係型數據庫
├── Redis 4.6.0+            # 緩存和會話
├── Alembic 1.11.1+         # 數據庫遷移
├── Pydantic 2.0.0+         # 數據驗證
├── Python-Jose 3.3.0+      # JWT認證
├── Passlib[bcrypt] 1.7.4+  # 密碼加密
└── Loguru 0.7.0+           # 結構化日誌

4.3 核心代碼示例

# server/api/agent.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
import httpx
import json

router = APIRouter(prefix="/api/agent", tags=["agent"])

AGENT_URL = "http://localhost:8001"

@router.post("/chat/stream")
async def agent_stream(
    question: str,
    current_user = Depends(get_current_user)
):
    """
    代理Agent流式接口
    
    Server作為中間層:
    1. 驗證用户身份(JWT)
    2. 記錄請求日誌
    3. 轉發到Agent服務
    4. 保存對話歷史
    """
    
    # 記錄請求
    logger.info(f"用户 {current_user.username} 發起問答: {question}")
    
    async def event_generator():
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{AGENT_URL}/stream",
                json={"question": question},
                timeout=60.0
            ) as response:
                async for line in response.aiter_lines():
                    if line:
                        yield f"{line}\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )


# server/api/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix="/api/auth", tags=["auth"])

@router.post("/login")
async def login(
    username: str,
    password: str,
    db: AsyncSession = Depends(get_db)
):
    """用户登錄"""
    # 驗證用户
    user = await authenticate_user(db, username, password)
    if not user:
        raise HTTPException(status_code=401, detail="用户名或密碼錯誤")
    
    # 生成Token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=30)
    )
    refresh_token = create_refresh_token(
        data={"sub": user.username},
        expires_delta=timedelta(days=7)
    )
    
    return {
        "code": 0,
        "msg": "登錄成功",
        "data": {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer"
        }
    }


# server/api/conversations.py
@router.post("")
async def create_conversation(
    title: str,
    current_user = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """創建新對話"""
    conversation = Conversation(
        user_id=current_user.id,
        title=title
    )
    db.add(conversation)
    await db.commit()
    await db.refresh(conversation)
    
    return {
        "code": 0,
        "msg": "創建成功",
        "data": conversation
    }

五、Agent Layer(AI層)

5.1 核心職責

主要職責:

  • 🔍 向量檢索 - Weaviate相似度搜索
  • 🧠 AI推理 - Ollama LLM生成
  • 📝 Prompt工程 - 動態組裝上下文
  • 🌊 流式生成 - 實時返回Token
  • 📊 結果格式化 - 統一響應格式

5.2 技術棧

# 技術選型
Agent Stack:
├── LangChain 0.1.0+           # AI應用開發框架
├── LangChain-Community 0.0.10+ # 社區集成
├── Ollama 0.1.0+              # 本地LLM服務
├── Weaviate 1.27.1            # 向量數據庫
├── FastAPI 0.104.0+           # HTTP服務框架
└── Pandas 2.0.0+              # 數據處理

AI模型:
├── llama3.2:latest     # 對話生成模型
└── nomic-embed-text    # 文本向量化模型

5.3 核心代碼示例

# agent/http_service.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json

from agent import ServiceTicketAgent

app = FastAPI(title="Service Ticket Agent")

class ChatRequest(BaseModel):
    question: str

agent_instance = None

def get_agent():
    global agent_instance
    if agent_instance is None:
        agent_instance = ServiceTicketAgent()
    return agent_instance


@app.post("/stream")
async def stream_chat(request: ChatRequest):
    """
    流式對話接口(SSE)
    
    返回SSE格式的流式數據:
    - event: thinking (思考狀態)
    - event: sources (檢索來源)
    - event: token (逐個token)
    - event: done (完成)
    - event: error (錯誤)
    """
    async def event_generator():
        agent = get_agent()
        
        try:
            async for event in agent.ask_stream(request.question):
                event_type = event.get("type")
                event_data = event.get("data", {})
                
                # 格式化為SSE
                yield f"event: {event_type}\n"
                yield f"data: {json.dumps(event_data, ensure_ascii=False)}\n\n"
                
                if event_type in ["done", "error"]:
                    break
                    
        except Exception as e:
            error_event = {"code": 500, "msg": f"Agent錯誤: {str(e)}"}
            yield f"event: error\n"
            yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )


# agent/ticket_agent.py
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

class ServiceTicketAgent:
    def _setup_qa_chain(self):
        """設置問答鏈(LCEL)"""
        
        def retrieve_and_format(question):
            docs = self._search_similar_documents(question)
            return "\n\n".join(doc.page_content for doc in docs)
        
        # LCEL鏈式調用
        self.qa_chain = (
            {
                "context": retrieve_and_format,
                "question": RunnablePassthrough()
            }
            | self.prompt_template
            | self.llm
            | StrOutputParser()
        )
        
        return self.qa_chain
    
    async def ask_stream(self, question: str):
        """流式問答"""
        # 1. 檢索相關工單
        yield {
            "type": "thinking",
            "data": {"status": "retrieving", "message": "正在檢索相關工單..."}
        }
        
        source_docs = self._search_similar_documents(question)
        
        # 2. 返回檢索結果
        yield {
            "type": "sources",
            "data": {"sources": [...], "count": len(source_docs)}
        }
        
        # 3. 流式生成答案
        qa_chain = self._setup_qa_chain()
        
        full_answer = ""
        async for chunk in qa_chain.astream(question):
            token = str(chunk)
            full_answer += token
            
            yield {
                "type": "token",
                "data": {"token": token}
            }
        
        # 4. 完成
        yield {
            "type": "done",
            "data": {"answer": full_answer, "metadata": {...}}
        }

六、層間通信協議

6.1 Frontend ↔ Server

// HTTP請求
POST /api/auth/login
Content-Type: application/json

{
  "username": "user",
  "password": "password"
}

// HTTP響應
{
  "code": 0,
  "msg": "success",
  "data": {
    "access_token": "eyJ...",
    "refresh_token": "eyJ..."
  }
}

// SSE流式響應
POST /api/agent/chat/stream
Authorization: Bearer eyJ...

event: token
data: {"token": "根據"}

event: token
data: {"token": "歷史"}

event: done
data: {"answer": "...", "metadata": {...}}

6.2 Server ↔ Agent

# HTTP代理
POST http://localhost:8001/stream
Content-Type: application/json

{
  "question": "物流信息5天沒更新,怎麼處理?"
}

# SSE響應
event: thinking
data: {"status": "retrieving", "message": "正在檢索相關工單..."}

event: sources
data: {"sources": [...], "count": 5}

event: token
data: {"token": "根據"}

event: done
data: {"answer": "...", "metadata": {...}}

七、獨立部署與擴展

7.1 獨立部署

# docker-compose.yml
version: '3.8'

services:
  frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    depends_on:
      - server

  server:
    build: ./server
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
      - agent

  agent:
    build: ./agent
    ports:
      - "8001:8001"
    depends_on:
      - ollama
      - weaviate

  postgres:
    image: postgres:15
    ports:
      - "5432:5432"

  redis:
    image: redis:7
    ports:
      - "6379:6379"

  weaviate:
    image: semitechnologies/weaviate:1.27.1
    ports:
      - "8080:8080"

  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"

7.2 水平擴展

# 擴展Agent層
services:
  agent:
    build: ./agent
    deploy:
      replicas: 3  # 3個Agent實例
    ports:
      - "8001-8003:8001"

  # 負載均衡
  nginx:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf

八、團隊協作

8.1 職責分工

Frontend團隊:
├── UI/UX設計
├── React組件開發
├── 狀態管理
└── 前端性能優化

Server團隊:
├── 業務邏輯開發
├── 數據庫設計
├── API接口設計
└── 認證授權

Agent團隊:
├── AI模型選型
├── Prompt工程
├── RAG優化
└── 向量數據庫管理

8.2 並行開發

階段1: 接口定義(1天)
├── Frontend定義需要的API
├── Server定義Agent接口
└── 三方達成一致

階段2: 並行開發(2周)
├── Frontend: Mock數據開發UI
├── Server: 實現業務邏輯
└── Agent: 實現AI功能

階段3: 聯調測試(3天)
├── Frontend + Server聯調
├── Server + Agent聯調
└── 端到端測試

九、總結

三層架構的核心優勢:

職責清晰 - 每層專注自己的領域
獨立部署 - 可以單獨擴展和升級
技術自由 - 每層選擇最適合的技術棧
團隊協作 - 多團隊並行開發
易於維護 - 降低系統複雜度

下一篇預告: 《技術選型背後的思考:為什麼選擇Next.js+FastAPI+LangChain》

我們將深入分析每個技術棧的選型理由和替代方案對比。


作者簡介: 資深開發者,創業者。專注於視頻通訊技術領域。國內首本Flutter著作《Flutter技術入門與實戰》作者,另著有《Dart語言實戰》及《WebRTC音視頻開發》等書籍。多年從事視頻會議、遠程教育等技術研發,對於Android、iOS以及跨平台開發技術有比較深入的研究和應用,作為主要程序員開發了多個應用項目,涉及醫療、交通、銀行等領域。

學習資料:

  • 項目地址
  • 作者GitHub

歡迎交流: 如有問題歡迎在評論區討論 🚀