SSE vs WebSocket:實時AI對話的最佳實踐
前言
在構建AI對話系統時,選擇合適的實時通信協議至關重要。本文將深入對比SSE和WebSocket,並分享我們在生產環境中的實踐經驗。
適合讀者: 全棧開發者、後端工程師、架構師
一、實時通信的需求
1.1 AI對話的特點
用户發送問題
↓
AI開始思考(需要實時反饋)
↓
檢索知識庫(需要顯示進度)
↓
逐字生成答案(打字機效果)
↓
完成回答
1.2 技術要求
✅ 低延遲 - 毫秒級響應
✅ 單向推送 - 服務器→客户端
✅ 流式傳輸 - 逐Token返回
✅ 自動重連 - 網絡斷開後恢復
✅ 簡單易用 - 降低開發成本
二、SSE vs WebSocket 深度對比
2.1 技術對比表
| 特性 | SSE | WebSocket |
|------|-----|-----------|
| 通信方向 | 單向(服務器→客户端) | 雙向 |
| 協議 | HTTP/HTTPS | WS/WSS |
| 瀏覽器支持 | 所有現代瀏覽器 | 所有現代瀏覽器 |
| 自動重連 | ✅ 內置 | ❌ 需要手動實現 |
| 消息格式 | 文本(UTF-8) | 文本/二進制 |
| 代理友好 | ✅ 標準HTTP | ❌ 需要特殊配置 |
| 實現複雜度 | ⭐⭐ 簡單 | ⭐⭐⭐⭐ 複雜 |
| 適用場景 | 服務器推送、實時通知 | 聊天、遊戲、協作 |
2.2 連接建立過程
SSE連接建立:
Client Server
| |
|--- GET /stream HTTP/1.1 ---→ |
| Accept: text/event-stream |
| |
|←-- HTTP/1.1 200 OK ----------|
| Content-Type: text/event-stream
| Cache-Control: no-cache |
| |
|←------ data: hello ----------|
|←------ data: world ----------|
| |
WebSocket連接建立:
Client Server
| |
|--- GET /ws HTTP/1.1 --------→|
| Upgrade: websocket |
| Connection: Upgrade |
| |
|←-- HTTP/1.1 101 Switching ---|
| Upgrade: websocket |
| Connection: Upgrade |
| |
|←====== WebSocket Frame ======|
|====== WebSocket Frame ======→|
| |
三、SSE實現詳解
3.1 服務端實現(FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
@app.post("/chat/stream")
async def chat_stream(question: str):
    """SSE streaming chat endpoint.

    Pushes events to the client in order:
    thinking -> sources -> token (repeated) -> done,
    or a single error event if anything raises.
    """
    async def stream_events():
        try:
            # Let the client show progress while we retrieve context.
            yield format_sse_message(
                event="thinking",
                data={"status": "retrieving", "message": "正在檢索知識庫..."},
            )
            await asyncio.sleep(0.5)

            # Push retrieval results so the UI can render sources.
            docs = await search_knowledge_base(question)
            source_meta = [doc.metadata for doc in docs]
            yield format_sse_message(
                event="sources",
                data={"count": len(docs), "sources": source_meta},
            )

            # Stream the answer token by token (typewriter effect).
            async for token in llm.astream(question):
                yield format_sse_message(event="token", data={"token": token})

            # Signal normal completion.
            yield format_sse_message(event="done", data={"status": "completed"})
        except Exception as exc:
            # Surface any failure to the client as an error event.
            yield format_sse_message(event="error", data={"error": str(exc)})

    return StreamingResponse(
        stream_events(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx buffering for this response
        },
    )
def format_sse_message(event: str, data: dict) -> str:
    """Serialize one SSE frame: an `event:` line plus a JSON `data:` line.

    `ensure_ascii=False` keeps non-ASCII text (e.g. Chinese) readable
    on the wire; the trailing blank line terminates the SSE frame.
    """
    payload = json.dumps(data, ensure_ascii=False)
    return "event: {}\ndata: {}\n\n".format(event, payload)
3.2 客户端實現(TypeScript)
// services/chat.ts
import { fetchEventSource } from '@microsoft/fetch-event-source';
interface StreamCallbacks {
  onThinking?: (data: any) => void;
  onSources?: (data: any) => void;
  onToken?: (token: string) => void;
  onDone?: () => void;
  onError?: (error: string) => void;
}

/**
 * Open an SSE stream for one chat turn and route each server event
 * to the matching callback.
 */
export async function sendMessageStream(
  conversationId: string,
  message: string,
  callbacks: StreamCallbacks
) {
  const ctrl = new AbortController();

  // Dispatch table: SSE event name -> handler for its parsed payload.
  const handlers: Record<string, (data: any) => void> = {
    thinking: (data) => callbacks.onThinking?.(data),
    sources: (data) => callbacks.onSources?.(data),
    token: (data) => callbacks.onToken?.(data.token),
    done: () => callbacks.onDone?.(),
    error: (data) => callbacks.onError?.(data.error),
  };

  try {
    await fetchEventSource(`${API_URL}/chat/stream`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${getAccessToken()}`
      },
      body: JSON.stringify({
        conversation_id: conversationId,
        message: message
      }),
      signal: ctrl.signal,
      onmessage(event) {
        const data = JSON.parse(event.data);
        handlers[event.event]?.(data);
      },
      onerror(err) {
        console.error('SSE Error:', err);
        ctrl.abort();
        // Rethrowing stops fetchEventSource from retrying.
        throw err;
      },
      // Keep the stream alive even while the tab is hidden.
      openWhenHidden: true
    });
  } catch (error) {
    console.error('Stream error:', error);
    throw error;
  }
}
3.3 React組件使用
// components/ChatInterface.tsx
'use client'
import { useState } from 'react'
import { sendMessageStream } from '@/services/chat'
export default function ChatInterface() {
  const [messages, setMessages] = useState<Message[]>([])
  const [isThinking, setIsThinking] = useState(false)
  const [currentAssistantMessage, setCurrentAssistantMessage] = useState('')

  const handleSend = async (text: string) => {
    // Append the user's message.
    setMessages(prev => [...prev, {
      role: 'user',
      content: text
    }])

    // Reset streaming state for the new turn.
    setCurrentAssistantMessage('')
    setIsThinking(true)

    // BUG FIX: accumulate tokens in a local variable. The
    // `currentAssistantMessage` state captured by the callbacks below is
    // the (stale) value from the render in which handleSend was created,
    // so reading it in onDone would save an empty assistant message.
    let assistantText = ''

    try {
      await sendMessageStream(conversationId, text, {
        onThinking: (data) => {
          console.log('思考中:', data.message)
        },
        onSources: (data) => {
          console.log('檢索到', data.count, '條相關文檔')
          setIsThinking(false)
        },
        onToken: (token) => {
          // Append the token and mirror it into state for live rendering.
          assistantText += token
          setCurrentAssistantMessage(assistantText)
        },
        onDone: () => {
          // Persist the fully accumulated reply, then clear the live buffer.
          setMessages(prev => [...prev, {
            role: 'assistant',
            content: assistantText
          }])
          setCurrentAssistantMessage('')
        },
        onError: (error) => {
          console.error('錯誤:', error)
          setIsThinking(false)
        }
      })
    } catch (error) {
      console.error('發送失敗:', error)
    }
  }

  return (
    <div className="chat-container">
      {/* Message history */}
      {messages.map((msg, i) => (
        <MessageBubble key={i} message={msg} />
      ))}
      {/* Live assistant reply while streaming */}
      {currentAssistantMessage && (
        <MessageBubble
          message={{
            role: 'assistant',
            content: currentAssistantMessage
          }}
          isStreaming={true}
        />
      )}
      {/* Thinking indicator */}
      {isThinking && <ThinkingIndicator />}
      {/* Input box */}
      <ChatInput onSend={handleSend} />
    </div>
  )
}
四、WebSocket實現詳解
4.1 服務端實現(FastAPI)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import json
app = FastAPI()
# 連接管理器
class ConnectionManager:
    """Registry of active WebSocket connections keyed by client id."""

    def __init__(self):
        # client_id -> accepted WebSocket
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept the handshake and register the connection."""
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str):
        """Forget a client; a no-op if it is not registered."""
        self.active_connections.pop(client_id, None)

    async def send_message(self, client_id: str, message: dict):
        """Send a JSON message to one client, if it is still connected."""
        connection = self.active_connections.get(client_id)
        if connection is not None:
            await connection.send_json(message)
manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Per-client WebSocket chat loop.

    Receives JSON messages; for 'chat' messages, pushes a thinking
    status, streams the answer token by token, then a done marker.
    """
    await manager.connect(websocket, client_id)
    try:
        while True:
            # Wait for the next client message.
            data = await websocket.receive_json()
            if data['type'] == 'chat':
                # Tell the client we are working on it.
                await manager.send_message(client_id, {
                    'type': 'thinking',
                    'data': {'status': 'retrieving'}
                })
                # Stream the generated answer token by token.
                async for token in llm.astream(data['message']):
                    await manager.send_message(client_id, {
                        'type': 'token',
                        'data': {'token': token}
                    })
                # Signal completion of this turn.
                await manager.send_message(client_id, {
                    'type': 'done',
                    'data': {'status': 'completed'}
                })
    except WebSocketDisconnect:
        pass
    finally:
        # BUG FIX: always deregister the client. The original only cleaned
        # up on WebSocketDisconnect, so any other exception (malformed
        # JSON, LLM failure) leaked the entry in manager.active_connections.
        manager.disconnect(client_id)
4.2 客户端實現(TypeScript)
// services/websocket.ts
export class ChatWebSocket {
  private ws: WebSocket | null = null
  private reconnectAttempts = 0
  private maxReconnectAttempts = 5
  // Set by close() so an intentional shutdown does not trigger reconnects.
  private intentionallyClosed = false

  constructor(
    private url: string,
    private callbacks: {
      onMessage: (data: any) => void
      onError: (error: any) => void
      onClose: () => void
    }
  ) {}

  /** Open the socket and wire up handlers; reconnects with backoff on drop. */
  connect() {
    this.intentionallyClosed = false
    this.ws = new WebSocket(this.url)

    this.ws.onopen = () => {
      console.log('WebSocket connected')
      this.reconnectAttempts = 0
    }

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data)
      this.callbacks.onMessage(data)
    }

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error)
      this.callbacks.onError(error)
    }

    this.ws.onclose = () => {
      console.log('WebSocket closed')
      this.callbacks.onClose()
      // BUG FIX: the original reconnected even after an intentional
      // close(), and its first retry fired with a 0ms delay
      // (1000 * reconnectAttempts while attempts was still 0).
      if (!this.intentionallyClosed &&
          this.reconnectAttempts < this.maxReconnectAttempts) {
        this.reconnectAttempts++
        setTimeout(() => this.connect(), 1000 * this.reconnectAttempts)
      }
    }
  }

  /** Send a JSON-serializable message if the socket is open. */
  send(message: any) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message))
    }
  }

  /** Close the socket without triggering the auto-reconnect logic. */
  close() {
    this.intentionallyClosed = true
    this.ws?.close()
  }
}
五、為什麼我們選擇SSE?
5.1 AI對話的特點分析
AI對話的通信模式:
- 用户發送問題(HTTP POST)
- AI流式返回答案(服務器推送)
- 不需要客户端主動推送數據
結論:單向通信,SSE完全滿足需求
5.2 SSE的優勢
1. 自動重連
// SSE自動重連(內置)
fetchEventSource(url, {
openWhenHidden: true // 頁面隱藏時也保持連接
})
// WebSocket需要手動實現
ws.onclose = () => {
setTimeout(() => reconnect(), 1000)
}
2. 代理友好
# Nginx配置SSE(簡單)
location /api/ {
proxy_pass http://backend;
proxy_buffering off; # 關鍵
}
# Nginx配置WebSocket(複雜)
location /ws/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
3. 實現簡單
# SSE實現(10行代碼)
async def event_generator():
for token in tokens:
yield f"data: {token}\n\n"
return StreamingResponse(event_generator())
# WebSocket實現(50+行代碼)
# 需要連接管理、心跳檢測、錯誤處理等
5.3 性能對比
測試場景:1000個併發連接,每秒推送100條消息
SSE:
- 內存佔用: 500MB
- CPU佔用: 20%
- 延遲: 10-20ms
WebSocket:
- 內存佔用: 800MB
- CPU佔用: 35%
- 延遲: 5-10ms
結論:SSE性能足夠,且資源佔用更低
六、生產環境最佳實踐
6.1 Nginx配置
server {
listen 80;
server_name api.example.com;
# SSE配置
location /api/chat/stream {
proxy_pass http://backend:8000;
# 關鍵配置
proxy_buffering off; # 禁用緩衝
proxy_cache off; # 禁用緩存
proxy_set_header Connection ''; # 保持連接
proxy_http_version 1.1; # HTTP/1.1
chunked_transfer_encoding on; # 分塊傳輸
# 超時配置
proxy_read_timeout 3600s; # 1小時超時
proxy_connect_timeout 60s;
# 頭部設置
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 注意:X-Accel-Buffering 是響應頭,應由後端設置(見上文FastAPI代碼的headers);proxy_set_header 只影響發往上游的請求頭,在此無效
}
}
6.2 錯誤處理
// 完善的錯誤處理
/**
 * Start an SSE stream with retry handling and return a cancel function.
 */
export async function sendMessageStream(
  message: string,
  callbacks: StreamCallbacks
) {
  const ctrl = new AbortController()
  let retryCount = 0
  const maxRetries = 3

  const attemptStream = async () => {
    try {
      await fetchEventSource(url, {
        signal: ctrl.signal,
        async onopen(response) {
          if (response.ok) {
            retryCount = 0 // reset retry counter on a good connection
            return
          }
          // 4xx is a client error: do not retry, surface immediately.
          if (response.status >= 400 && response.status < 500) {
            throw new Error(`HTTP ${response.status}`)
          }
        },
        onmessage(event) {
          // handle the message
        },
        onerror(err) {
          if (retryCount < maxRetries) {
            retryCount++
            console.log(`重試 ${retryCount}/${maxRetries}`)
            return 1000 * retryCount // retry after a linear backoff delay
          }
          // Retries exhausted: rethrow so fetchEventSource stops.
          throw err
        }
      })
    } catch (error) {
      callbacks.onError?.(error)
    }
  }

  // BUG FIX: the original awaited attemptStream() before returning the
  // cancel function, so callers only received it after the stream had
  // already ended — making cancellation impossible. Start the stream in
  // the background and hand the canceller back immediately.
  void attemptStream()
  return () => ctrl.abort()
}
6.3 心跳檢測
# Server side: emit a heartbeat comment periodically.
async def event_generator():
    """Stream tokens, interleaving an SSE comment line as a heartbeat."""
    last_heartbeat = time.time()
    async for token in llm.astream(question):
        yield f"data: {token}\n\n"
        # Send a heartbeat at most every 30 seconds.
        # NOTE(review): this only fires between tokens — if llm.astream()
        # stalls, no heartbeat is ever sent; a timer-based approach would
        # be needed to cover silent periods. Also assumes `time` is imported.
        if time.time() - last_heartbeat > 30:
            yield ": heartbeat\n\n"  # SSE comment line; clients ignore it
            last_heartbeat = time.time()
// Client side: detect a stalled connection by tracking message recency.
let lastMessageTime = Date.now()
fetchEventSource(url, {
  onmessage(event) {
    // Any message (including heartbeats) counts as liveness.
    lastMessageTime = Date.now()
    // handle the message
  }
})
// Every 10s, flag the connection if nothing arrived for 60s.
setInterval(() => {
  if (Date.now() - lastMessageTime > 60000) {
    console.warn('連接可能已斷開')
    // reconnect here
  }
}, 10000)
七、性能優化
7.1 連接池管理
# 限制併發連接數
from fastapi import HTTPException
import asyncio
# Module-level concurrency accounting (single-threaded asyncio: the
# check-and-increment below runs without an intervening await, so it is safe).
active_connections = 0
MAX_CONNECTIONS = 1000

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Rate-limited SSE endpoint: rejects requests above MAX_CONNECTIONS."""
    global active_connections
    if active_connections >= MAX_CONNECTIONS:
        raise HTTPException(
            status_code=503,
            detail="服務器繁忙,請稍後重試"
        )
    active_connections += 1

    async def event_generator():
        # BUG FIX: the original decremented the counter in a `finally`
        # wrapped around `return StreamingResponse(...)`, which runs as
        # soon as the response object is created — before any streaming
        # happens. The slot must be released when the stream actually
        # ends, i.e. in the generator's own finally.
        global active_connections
        try:
            # generate and yield SSE events here
            pass
        finally:
            active_connections -= 1

    return StreamingResponse(event_generator())
7.2 緩衝優化
# Batch tokens so the client receives fewer, larger SSE frames.
async def event_generator():
    """Yield tokens in batches of `batch_size` to cut per-message overhead."""
    batch_size = 5  # flush after this many tokens
    pending = []
    async for token in llm.astream(question):
        pending.append(token)
        if len(pending) >= batch_size:
            # Flush the full batch as one SSE frame.
            yield format_sse_message("token", {
                "tokens": pending
            })
            pending = []
    # Flush whatever is left after the stream ends.
    if pending:
        yield format_sse_message("token", {
            "tokens": pending
        })
八、踩坑經驗
8.1 Nginx緩衝問題
❌ 問題: SSE消息不實時,延遲很大
# 錯誤配置
location /api/ {
proxy_pass http://backend;
# 默認開啓緩衝
}
✅ 解決: 禁用緩衝
location /api/ {
proxy_pass http://backend;
proxy_buffering off;
# 注意:X-Accel-Buffering 應由後端在響應頭中返回;Nginx側關鍵是 proxy_buffering off
}
8.2 CORS問題
❌ 問題: SSE請求被CORS攔截
# 錯誤:未配置CORS
app = FastAPI()
✅ 解決: 正確配置CORS
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
8.3 內存泄漏
❌ 問題: 長時間運行後內存佔用越來越高
# 錯誤:未清理資源
async def event_generator():
async for chunk in llm.astream(question):
yield chunk
# 未清理LLM資源
✅ 解決: 及時清理資源
async def event_generator():
try:
async for chunk in llm.astream(question):
yield chunk
finally:
# 清理資源
await llm.cleanup()
九、總結
9.1 選擇建議
選擇SSE的場景:
✅ 服務器→客户端單向推送
✅ 實時通知、日誌流
✅ AI對話、流式生成
✅ 需要自動重連
✅ 需要簡單實現
選擇WebSocket的場景:
✅ 雙向實時通信
✅ 在線遊戲
✅ 協作編輯
✅ 視頻會議
✅ 需要二進制傳輸
9.2 核心要點
✅ SSE更簡單 - 基於HTTP,實現簡單
✅ 自動重連 - 內置重連機制
✅ 代理友好 - 無需特殊配置
✅ 性能足夠 - 滿足AI對話需求
✅ 成本更低 - 開發和維護成本低
下一篇預告: 《本地化部署的優勢:Ollama + Weaviate保護數據隱私》
作者簡介: 資深開發者,創業者。專注於視頻通訊技術領域。國內首本Flutter著作《Flutter技術入門與實戰》作者,另著有《Dart語言實戰》及《WebRTC音視頻開發》等書籍。多年從事視頻會議、遠程教育等技術研發,對於Android、iOS以及跨平台開發技術有比較深入的研究和應用,作為主要程序員開發了多個應用項目,涉及醫療、交通、銀行等領域。
學習資料:
- 項目地址
- 作者GitHub
歡迎交流: 如有問題歡迎在評論區討論 🚀