SSE vs WebSocket: Best Practices for Real-Time AI Chat

Introduction

When building an AI chat system, choosing the right real-time communication protocol is critical. This article compares SSE and WebSocket in depth and shares our experience running them in production.

Target audience: full-stack developers, backend engineers, architects


1. Real-Time Communication Requirements

1.1 What Makes AI Chat Different

User sends a question
   ↓
AI starts thinking (needs real-time feedback)
   ↓
Knowledge base is searched (needs progress updates)
   ↓
Answer is generated token by token (typewriter effect)
   ↓
Response complete

1.2 Technical Requirements

✅ Low latency - millisecond-level responses
✅ One-way push - server → client
✅ Streaming - tokens returned as they are generated
✅ Auto-reconnect - recover after network drops
✅ Simplicity - low development cost

2. SSE vs WebSocket: In-Depth Comparison

2.1 Feature Comparison

| Feature                   | SSE                             | WebSocket                      |
|---------------------------|---------------------------------|--------------------------------|
| Direction                 | One-way (server → client)       | Two-way                        |
| Protocol                  | HTTP/HTTPS                      | WS/WSS                         |
| Browser support           | All modern browsers             | All modern browsers            |
| Auto-reconnect            | ✅ Built-in                     | ❌ Manual implementation       |
| Message format            | Text (UTF-8)                    | Text / binary                  |
| Proxy-friendly            | ✅ Standard HTTP                | ❌ Needs special configuration |
| Implementation complexity | ⭐⭐ Simple                     | ⭐⭐⭐⭐ Complex               |
| Typical use cases         | Server push, live notifications | Chat, games, collaboration     |

2.2 Connection Establishment

SSE connection setup:

Client                          Server
  |                               |
  |--- GET /stream HTTP/1.1 ---→ |
  |    Accept: text/event-stream  |
  |                               |
  |←-- HTTP/1.1 200 OK ----------|
  |    Content-Type: text/event-stream
  |    Cache-Control: no-cache   |
  |                               |
  |←------ data: hello ----------|
  |←------ data: world ----------|
  |                               |
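The `data:` lines in the diagram follow SSE's line-based wire format: each event is a block of `field: value` lines terminated by a blank line, and lines starting with `:` are comments. A minimal Python parsing sketch (illustrative only, not a full implementation of the spec):

```python
def parse_sse(stream: str):
    """Parse a text/event-stream payload into (event, data) tuples.

    Minimal sketch: handles `event:`/`data:` fields and comment lines
    starting with ':'; ignores `id:`/`retry:` fields.
    """
    events = []
    event_type, data_lines = "message", []
    for line in stream.splitlines():
        if line.startswith(":"):            # comment / heartbeat line
            continue
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and data_lines:     # blank line ends one event
            events.append((event_type, "\n".join(data_lines)))
            event_type, data_lines = "message", []
    return events

print(parse_sse("data: hello\n\ndata: world\n\n"))
# → [('message', 'hello'), ('message', 'world')]
```

When no `event:` field is present, the event type defaults to `message`, which is why plain `data:` pushes arrive on a browser `EventSource`'s default `onmessage` handler.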

WebSocket connection setup:

Client                          Server
  |                               |
  |--- GET /ws HTTP/1.1 --------→|
  |    Upgrade: websocket         |
  |    Connection: Upgrade        |
  |                               |
  |←-- HTTP/1.1 101 Switching ---|
  |    Upgrade: websocket         |
  |    Connection: Upgrade        |
  |                               |
  |←====== WebSocket Frame ======|
  |====== WebSocket Frame ======→|
  |                               |

3. SSE Implementation in Detail

3.1 Server Side (FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import json

app = FastAPI()

class ChatRequest(BaseModel):
    conversation_id: str
    message: str

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """SSE streaming chat endpoint."""
    question = request.message

    async def event_generator():
        try:
            # 1. Thinking status
            yield format_sse_message(
                event="thinking",
                data={"status": "retrieving", "message": "Searching the knowledge base..."}
            )
            await asyncio.sleep(0.5)

            # 2. Retrieval results
            docs = await search_knowledge_base(question)
            yield format_sse_message(
                event="sources",
                data={"count": len(docs), "sources": [doc.metadata for doc in docs]}
            )

            # 3. Stream the answer token by token
            async for token in llm.astream(question):
                yield format_sse_message(
                    event="token",
                    data={"token": token}
                )

            # 4. Done
            yield format_sse_message(
                event="done",
                data={"status": "completed"}
            )

        except Exception as e:
            yield format_sse_message(
                event="error",
                data={"error": str(e)}
            )

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering for this response
        }
    )

def format_sse_message(event: str, data: dict) -> str:
    """Format one SSE message frame."""
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
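For reference, this is what a single frame produced by `format_sse_message` looks like on the wire (the helper is restated so the snippet runs standalone):

```python
import json

def format_sse_message(event: str, data: dict) -> str:
    """Format one SSE message frame."""
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

frame = format_sse_message("token", {"token": "Hi"})
print(repr(frame))
# → 'event: token\ndata: {"token": "Hi"}\n\n'
```

The trailing blank line (the second `\n`) is what terminates the event; forgetting it is a common reason a client "never receives" anything even though the server is yielding data.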

3.2 Client Side (TypeScript)

// services/chat.ts
import { fetchEventSource } from '@microsoft/fetch-event-source';

interface StreamCallbacks {
  onThinking?: (data: any) => void;
  onSources?: (data: any) => void;
  onToken?: (token: string) => void;
  onDone?: () => void;
  onError?: (error: string) => void;
}

export async function sendMessageStream(
  conversationId: string,
  message: string,
  callbacks: StreamCallbacks
) {
  const ctrl = new AbortController();

  try {
    await fetchEventSource(`${API_URL}/chat/stream`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${getAccessToken()}`
      },
      body: JSON.stringify({
        conversation_id: conversationId,
        message: message
      }),
      signal: ctrl.signal,

      // Dispatch on the event type
      onmessage(event) {
        const data = JSON.parse(event.data);

        switch (event.event) {
          case 'thinking':
            callbacks.onThinking?.(data);
            break;
          case 'sources':
            callbacks.onSources?.(data);
            break;
          case 'token':
            callbacks.onToken?.(data.token);
            break;
          case 'done':
            callbacks.onDone?.();
            break;
          case 'error':
            callbacks.onError?.(data.error);
            break;
        }
      },

      // Error handling
      onerror(err) {
        console.error('SSE Error:', err);
        ctrl.abort();
        throw err;
      },

      // Keep the connection alive while the tab is in the background
      // (retries on failure are the library's default behavior)
      openWhenHidden: true
    });
  } catch (error) {
    console.error('Stream error:', error);
    throw error;
  }
}

3.3 Using It from a React Component

// components/ChatInterface.tsx
'use client'

import { useState } from 'react'
import { sendMessageStream } from '@/services/chat'

export default function ChatInterface() {
  const [messages, setMessages] = useState<Message[]>([])
  const [isThinking, setIsThinking] = useState(false)
  const [currentAssistantMessage, setCurrentAssistantMessage] = useState('')

  const handleSend = async (text: string) => {
    // Append the user message
    setMessages(prev => [...prev, {
      role: 'user',
      content: text
    }])

    // Reset the streaming assistant message
    setCurrentAssistantMessage('')
    setIsThinking(true)

    // Accumulate tokens in a local variable: reading
    // currentAssistantMessage inside onDone would only see the stale
    // value captured when this render's closure was created
    let assistantText = ''

    try {
      await sendMessageStream(conversationId, text, {
        onThinking: (data) => {
          console.log('Thinking:', data.message)
        },

        onSources: (data) => {
          console.log('Retrieved', data.count, 'related documents')
          setIsThinking(false)
        },

        onToken: (token) => {
          // Append tokens in real time
          assistantText += token
          setCurrentAssistantMessage(assistantText)
        },

        onDone: () => {
          // Finished: persist the complete assistant message
          setMessages(prev => [...prev, {
            role: 'assistant',
            content: assistantText
          }])
          setCurrentAssistantMessage('')
        },

        onError: (error) => {
          console.error('Error:', error)
          setIsThinking(false)
        }
      })
    } catch (error) {
      console.error('Send failed:', error)
    }
  }

  return (
    <div className="chat-container">
      {/* Message list */}
      {messages.map((msg, i) => (
        <MessageBubble key={i} message={msg} />
      ))}

      {/* Live assistant reply while streaming */}
      {currentAssistantMessage && (
        <MessageBubble 
          message={{
            role: 'assistant',
            content: currentAssistantMessage
          }}
          isStreaming={true}
        />
      )}

      {/* Thinking indicator */}
      {isThinking && <ThinkingIndicator />}

      {/* Input box */}
      <ChatInput onSend={handleSend} />
    </div>
  )
}

4. WebSocket Implementation in Detail

4.1 Server Side (FastAPI)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import json

app = FastAPI()

# Connection manager
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str):
        if client_id in self.active_connections:
            del self.active_connections[client_id]

    async def send_message(self, client_id: str, message: dict):
        if client_id in self.active_connections:
            await self.active_connections[client_id].send_json(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket, client_id)

    try:
        while True:
            # Receive a client message
            data = await websocket.receive_json()

            if data['type'] == 'chat':
                # Send the thinking status
                await manager.send_message(client_id, {
                    'type': 'thinking',
                    'data': {'status': 'retrieving'}
                })

                # Stream the generated answer
                async for token in llm.astream(data['message']):
                    await manager.send_message(client_id, {
                        'type': 'token',
                        'data': {'token': token}
                    })

                # Done
                await manager.send_message(client_id, {
                    'type': 'done',
                    'data': {'status': 'completed'}
                })

    except WebSocketDisconnect:
        manager.disconnect(client_id)

4.2 Client Side (TypeScript)

// services/websocket.ts
export class ChatWebSocket {
  private ws: WebSocket | null = null
  private reconnectAttempts = 0
  private maxReconnectAttempts = 5

  constructor(
    private url: string,
    private callbacks: {
      onMessage: (data: any) => void
      onError: (error: any) => void
      onClose: () => void
    }
  ) {}

  connect() {
    this.ws = new WebSocket(this.url)

    this.ws.onopen = () => {
      console.log('WebSocket connected')
      this.reconnectAttempts = 0
    }

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data)
      this.callbacks.onMessage(data)
    }

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error)
      this.callbacks.onError(error)
    }

    this.ws.onclose = () => {
      console.log('WebSocket closed')
      this.callbacks.onClose()

      // Manual reconnect with a linearly growing delay
      // (increment first so the first retry waits 1s, not 0s)
      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        this.reconnectAttempts++
        setTimeout(() => this.connect(), 1000 * this.reconnectAttempts)
      }
    }
  }

  send(message: any) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message))
    }
  }

  close() {
    this.ws?.close()
  }
}

5. Why We Chose SSE

5.1 The Communication Pattern of AI Chat

How AI chat traffic flows:
- The user sends a question (HTTP POST)
- The AI streams the answer back (server push)
- The client never needs to push data mid-stream

Conclusion: the traffic is one-way, and SSE fully covers it.

5.2 Advantages of SSE

1. Auto-reconnect

// SSE: retries are built in (EventSource reconnects by itself;
// fetch-event-source retries by default too)
fetchEventSource(url, {
  openWhenHidden: true  // also keep the connection while the tab is hidden
})

// WebSocket: reconnection must be hand-rolled
ws.onclose = () => {
  setTimeout(() => reconnect(), 1000)
}

2. Proxy-friendly

# Nginx for SSE (simple)
location /api/ {
    proxy_pass http://backend;
    proxy_buffering off;  # the key line
}

# Nginx for WebSocket (more involved)
location /ws/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

3. Simple to implement

# An SSE endpoint in a handful of lines
async def event_generator():
    for token in tokens:
        yield f"data: {token}\n\n"

return StreamingResponse(event_generator(), media_type="text/event-stream")

# A WebSocket server needs 50+ lines:
# connection management, heartbeats, error handling, ...

5.3 Performance Comparison

Test scenario: 1,000 concurrent connections, 100 messages pushed per second

SSE:
- Memory: 500MB
- CPU: 20%
- Latency: 10-20ms

WebSocket:
- Memory: 800MB
- CPU: 35%
- Latency: 5-10ms

Conclusion: SSE is more than fast enough for AI chat, with a lower resource footprint.

6. Production Best Practices

6.1 Nginx Configuration

server {
    listen 80;
    server_name api.example.com;

    # SSE configuration
    location /api/chat/stream {
        proxy_pass http://backend:8000;

        # Key settings
        proxy_buffering off;                    # Disable proxy buffering
        proxy_cache off;                        # Disable caching
        proxy_set_header Connection '';         # Keep-alive to the upstream
        proxy_http_version 1.1;                 # Required for keep-alive
        chunked_transfer_encoding on;           # Chunked transfer

        # Timeouts
        proxy_read_timeout 3600s;               # Allow hour-long streams
        proxy_connect_timeout 60s;

        # Standard proxy headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # Note: X-Accel-Buffering is a *response* header that the backend
        # sends (see the FastAPI code above); setting it with
        # proxy_set_header on the request does nothing
    }
}

6.2 Error Handling

// Error handling with retries
export async function sendMessageStream(
  message: string,
  callbacks: StreamCallbacks
) {
  const ctrl = new AbortController()
  let retryCount = 0
  const maxRetries = 3

  const attemptStream = async () => {
    try {
      await fetchEventSource(url, {
        signal: ctrl.signal,

        async onopen(response) {
          if (response.ok) {
            retryCount = 0  // reset the retry counter
            return
          }

          // 4xx errors are client errors and not worth retrying
          if (response.status >= 400 && response.status < 500) {
            throw new Error(`HTTP ${response.status}`)
          }
        },

        onmessage(event) {
          // handle the message
        },

        onerror(err) {
          // Retry logic
          if (retryCount < maxRetries) {
            retryCount++
            console.log(`Retry ${retryCount}/${maxRetries}`)
            return 1000 * retryCount  // returned value is the retry delay
          }

          // Retries exhausted: rethrow to stop the stream
          throw err
        }
      })
    } catch (error) {
      callbacks.onError?.(error)
    }
  }

  await attemptStream()

  // Return a cancel function
  return () => ctrl.abort()
}
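The retry delays above grow linearly (1s, 2s, 3s). For longer outages an exponential schedule with a cap is also common; a small helper for computing such a schedule (illustrative, not part of the original client):

```python
def backoff_delays(max_retries: int, base: float = 1.0,
                   factor: float = 2.0, cap: float = 30.0):
    """Exponential backoff schedule: base, base*factor, ... capped at `cap`."""
    return [min(base * factor ** i, cap) for i in range(max_retries)]

print(backoff_delays(5))
# → [1.0, 2.0, 4.0, 8.0, 16.0]
```

Adding a small random jitter to each delay is also worthwhile in production, so that thousands of clients dropped by the same outage do not all reconnect at the same instant.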

6.3 Heartbeats

# Server: send a heartbeat periodically
import time

async def event_generator():
    last_heartbeat = time.time()

    async for token in llm.astream(question):
        yield f"data: {token}\n\n"

        # Send a heartbeat every 30 seconds
        if time.time() - last_heartbeat > 30:
            yield ": heartbeat\n\n"  # comment line; clients ignore it
            last_heartbeat = time.time()
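One caveat with the generator above: heartbeats are only checked between tokens, so if the LLM stalls, the heartbeat stalls too. A sketch that decouples the two by pumping tokens into a queue and timing out on the read (`token_stream` stands in for `llm.astream(...)`):

```python
import asyncio

async def heartbeat_stream(token_stream, interval: float = 30.0):
    """Yield SSE frames from token_stream, inserting ': heartbeat'
    comment lines whenever no token arrives within `interval` seconds.
    A background task pumps tokens into a queue so the source stream
    is never cancelled by the timeout."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()

    async def pump():
        async for token in token_stream:
            await queue.put(token)
        await queue.put(DONE)

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield ": heartbeat\n\n"   # stream is quiet; keep proxies happy
                continue
            if item is DONE:
                break
            yield f"data: {item}\n\n"
    finally:
        task.cancel()
```

Whether this extra machinery is worth it depends on how long your model can plausibly go silent relative to your proxy's read timeout.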

// Client: detect a stale connection
let lastMessageTime = Date.now()

fetchEventSource(url, {
  onmessage(event) {
    lastMessageTime = Date.now()
    // handle the message
  }
})

// Reconnect if nothing has arrived for a while
setInterval(() => {
  if (Date.now() - lastMessageTime > 60000) {
    console.warn('Connection appears to be dead')
    // trigger a reconnect here
  }
}, 10000)

7. Performance Tuning

7.1 Connection Limiting

# Cap the number of concurrent streams
from fastapi import HTTPException

active_connections = 0
MAX_CONNECTIONS = 1000

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    global active_connections

    if active_connections >= MAX_CONNECTIONS:
        raise HTTPException(
            status_code=503,
            detail="Server busy, please try again later"
        )

    active_connections += 1

    async def event_generator():
        global active_connections
        try:
            ...  # produce events
        finally:
            # Decrement when the stream actually finishes or the client
            # disconnects; a try/finally around `return StreamingResponse`
            # would run before the response body is even consumed
            active_connections -= 1

    return StreamingResponse(event_generator())

7.2 Buffered Token Batching

# Send tokens in batches
async def event_generator():
    buffer = []
    buffer_size = 5  # flush every 5 tokens

    async for token in llm.astream(question):
        buffer.append(token)

        if len(buffer) >= buffer_size:
            # Flush the batch
            yield format_sse_message("token", {
                "tokens": buffer
            })
            buffer = []

    # Flush any remaining tokens
    if buffer:
        yield format_sse_message("token", {
            "tokens": buffer
        })

8. Pitfalls We Hit

8.1 Nginx Buffering

Symptom: SSE messages arrive late and all at once instead of in real time.

# Broken configuration
location /api/ {
    proxy_pass http://backend;
    # buffering is on by default
}

Fix: disable buffering.

location /api/ {
    proxy_pass http://backend;
    proxy_buffering off;
    # and have the backend send `X-Accel-Buffering: no` in its response
}

8.2 CORS

Symptom: SSE requests are blocked by the browser's CORS policy.

# Broken: no CORS middleware configured
app = FastAPI()

Fix: configure CORS explicitly.

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

8.3 Memory Leaks

Symptom: memory usage keeps climbing the longer the service runs.

# Broken: resources are never released
async def event_generator():
    async for chunk in llm.astream(question):
        yield chunk
    # LLM resources are never cleaned up

Fix: release resources promptly.

async def event_generator():
    try:
        async for chunk in llm.astream(question):
            yield chunk
    finally:
        # Runs on normal completion and on early client disconnect
        await llm.cleanup()
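Why `finally` is the right place: when the client disconnects, the framework closes the response generator, which raises `GeneratorExit` inside it, and the `finally` block still runs. The mechanism can be demonstrated without FastAPI:

```python
import asyncio

cleaned_up = False

async def stream():
    global cleaned_up
    try:
        for chunk in ["a", "b", "c", "d"]:
            yield chunk
    finally:
        # Runs on normal completion AND on early close (GeneratorExit)
        cleaned_up = True

async def consume_two_then_disconnect():
    gen = stream()
    out = [await gen.__anext__(), await gen.__anext__()]
    await gen.aclose()  # simulates the client dropping the connection
    return out

print(asyncio.run(consume_two_then_disconnect()), cleaned_up)
# → ['a', 'b'] True
```

Cleanup code placed *after* the loop, by contrast, is simply never reached when the generator is closed early.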

9. Summary

9.1 Which One to Choose

Choose SSE for:
✅ One-way server → client push
✅ Live notifications, log streaming
✅ AI chat, streaming generation
✅ Built-in auto-reconnect
✅ A simple implementation

Choose WebSocket for:
✅ Two-way real-time communication
✅ Online games
✅ Collaborative editing
✅ Video conferencing
✅ Binary transfer

9.2 Key Takeaways

SSE is simpler - built on plain HTTP
Auto-reconnect - comes built in
Proxy-friendly - no special configuration needed
Fast enough - easily meets AI chat requirements
Cheaper - lower development and maintenance cost

Next up: "The Advantages of Local Deployment: Protecting Data Privacy with Ollama + Weaviate"


About the author: Senior developer and entrepreneur focused on video communication technology. Author of the first Chinese-language Flutter book, 《Flutter技術入門與實戰》, as well as 《Dart語言實戰》 and 《WebRTC音視頻開發》. Years of R&D experience in video conferencing and remote education, with deep expertise in Android, iOS, and cross-platform development, and a lead developer on applications in healthcare, transportation, and banking.

Learning resources:

  • Project repository
  • Author's GitHub

Get in touch: questions and discussion are welcome in the comments 🚀