動態

詳情 返回 返回

fastify-sse-v2搭配EventSource實現SSE中的AI流式回覆打字機效果 - 動態 詳情

本文不贅述具體概念,通過具體案例效果,學習sse (Server-Sent Events)的具體實現,以react框架為例

SSE具體應用場景

SSE(Server-Sent Events,服務器推送事件)是一種基於 HTTP 的單向實時通信協議,核心特點是服務器主動向客户端推送數據,客户端僅被動接收,無需頻繁輪詢,且天然支持斷線重連、事件標識等特性。其應用場景主要集中在 “服務器需主動向客户端推送實時數據,且客户端無需向服務器發送高頻請求” 的場景

具體場景

  • 服務器監控到系統異常如負載過高、數據庫連接失敗等,通過SSE將告警信息推送給運維人員的管理後台界面(物聯網設備故障也可以通過SSE進行告警信息的推送)
  • 審批類:業務系統中審批流程通過或者駁回時,向申請人推送審批結果通知
  • 消息推送類:用户收到新私信、點贊、評論、關注請求時,服務器通過 SSE 將通知推送給對應用户的客户端,實現實時提醒
  • 訂單狀態變更(如商家接單、快遞攬收、派送中)時,通過 SSE 推送狀態通知;商品降價、補貨時,向訂閲該商品的用户推送提醒
  • 等等...
總而言之,就是及時推送消息,無需用户手動刷新獲取最新數據

效果圖

  • 本文給到的效果示例
  • 對應線上演示效果地址:https://ashuai.site/reactExamples/sse
  • github倉庫:https://github.com/shuirongshuifu/react-examples

需求場景邏輯流程

  • 有一段文章,當用户點擊開始接收按鈕的時候
  • 前端需要使用new EventSource去向後端的sse接口建立鏈接
  • 鏈接建立以後,後端就會讀取逐字逐段地去掃描文章文本,然後開始發送message消息事件

    • 假設我們的文章是const article = 亮答曰:"自董卓以來,豪傑並起..."
    • 單純的message消息事件有些籠統,我們可以把消息事件自定義細分成為消息開始start事件、消息傳輸chunk事件、消息發送完畢end事件
    • 這樣控制能夠更好地讓前端區分,做對應的UI操作
  • 每個消息中,會帶着一段文章文本字符串,交給前端去處理

    • 實際上,如果是單純的文本回復需求,前端直接拿到chunk事件中的文本字符串即可使用
    • 但是,如果回覆的內容中除了純文本之外,還要回復鏈接、圖片、甚至代碼等類型的話
    • 剛剛提到的chunk事件就不夠用了,所以就可以新增一些事件類型
    • 比如chunk事件裏面存放純文本
    • imgChunk事件裏面存放圖片
    • linkChunk事件裏面存放鏈接等
    • 大家可以根據自己的業務需求,自定義很多的事件類型
    • 無論是什麼事件類型,前端都可以通過eventSource監聽到
  • 前端可以監聽到對應事件後,不斷地把信息渲染到UI視圖上(實現光標跟隨的打字機效果)

前端監聽對應自定義事件類型比如:

// 創建SSE連接
const eventSource = new EventSource('https://ashuai.site/fastify-api/sse/article/66')

// 監聽後端自定義的start事件
eventSource.addEventListener('start', (event) => {
    console.log('SSE開始:', event)
})

// 監聽後端自定義的chunk事件
eventSource.addEventListener('chunk', (event) => {
    console.log('接收到的數據:', event)
})

// 監聽後端自定義的end事件
eventSource.addEventListener('end', (event) => {
    console.log('SSE結束:', event)
})

// 監聽後端自定義的error事件
eventSource.addEventListener('error', (event) => {
    console.error('SSE錯誤:', event)
})

因為原生事件有些籠統,不便於前端更精細化控制

const eventSource = new EventSource('https://ashuai.site/fastify-api/sse/article/66')

eventSource.onmessage = (event) => {  }

eventSource.onerror = (event) => {  }

這個接口https://ashuai.site/fastify-api/sse/article/66大家可以直接使用,可以直接將其複製粘貼到地址欄並回車,或者使用curl命令

後端代碼

後端使用fastify框架,搭配fastify-sse-v2這個sse的包

Controller層

  • Controller負責接收用户發送的http的Request請求
  • 並處理請求參數與數據校驗
  • 校驗通過後,會調用Service層進行業務邏輯處理,
  • 處理Service層返回結果並構造響應Response異常處理
const createSseService = require('../services/sseService')
const ResponseUtils = require('../utils/response')
const { validateId } = require('../utils/validation')

// 根據ID獲取對應文章(SSE分段輸出)
const getArticleById = async (request, reply) => {
    try {
        const { id } = request.params

        // 參數校驗
        const idValidation = validateId(id)
        if (!idValidation.isValid) {
            return ResponseUtils.sendValidationError(reply, 'ID參數無效', idValidation.errors)
        }

        const sseService = createSseService()
        
        // 設置自定義響應頭
        reply.header('hello', 'world')

        // 定義處理每個數據塊的回調函數
        const handleChunk = async (chunk, isLast) => {
            reply.sse({
                event: chunk.type,
                data: JSON.stringify(chunk.data)
            })
        }

        // 調用service層的流式傳輸方法
        const result = await sseService.streamArticleWithSSE(
            idValidation.data,
            handleChunk,
            { chunkSize: 10, delayMs: 500 }
        )

        if (result === null) {
            return ResponseUtils.sendNotFound(reply, '文章不存在')
        }

    } catch (error) {
        request.log.error(error)
        // SSE錯誤處理
        reply.sse({
            event: 'error',
            data: JSON.stringify({
                code: 500,
                message: '查詢失敗',
                error: error.message
            })
        })
    }
}

Service層

  • Service層主要是處理核心業務邏輯
  • 協調數據訪問,就是處理數據
  • 比如調用DAO層(或Repository層) 來完成mysql數據查詢
  • 並進行數據處理與轉換等
  • 所以sse數據的核心加工處理在這一層

假設我們的文章是這個

const article = `亮答曰:"自董卓以來,豪傑並起,跨州連郡者不可勝數。曹操比於袁紹,則名微而眾寡。然操遂能克紹,以弱為強者,非惟天時,抑亦人謀也。
今操已擁百萬之眾,挾天子而令諸侯,此誠不可與爭鋒。孫權據有江東,已歷三世,國險而民附,賢能為之用,此可以為援而不可圖也。
荊州北據漢、沔,利盡南海,東連吳會,西通巴、蜀,此用武之國,而其主不能守,此殆天所以資將軍,將軍豈有意乎?
益州險塞,沃野千里,天府之土,高祖因之以成帝業。劉璋闇弱,張魯在北,民殷國富而不知存恤,智能之士思得明君。
將軍既帝室之胄,信義著於四海,總攬英雄,思賢如渴,若跨有荊、益,保其巖阻,西和諸戎,南撫夷越,外結好孫權,內修政理;
天下有變,則命一上將將荊州之軍以向宛、洛,將軍身率益州之眾出於秦川,百姓孰敢不簞食壺漿以迎將軍者乎?誠如是,則霸業可成,漢室可興矣。"`

對應Service層代碼

const createSseService = () => {

    // 根據前端參數ID獲取對應文章(這裏模擬從數據庫撈取數據)
    const getArticleById = (id) => {
        return { id, article, // 直接使用上述文章 }
    }

    /**
     * SSE流式傳輸文章
     * @param {number} id - 文章ID
     * @param {Function} onChunk - 處理每個數據塊的回調函數 (chunk, isLast) => void
     * @param {Object} options - 配置選項
     * @param {number} options.chunkSize - 每個分段的字符數,默認10
     * @param {number} options.delayMs - 每個分段之間的延遲時間(毫秒),默認500
     * @returns {Promise<boolean|null>} 成功返回true,文章不存在返回null
     */
    const streamArticleWithSSE = async (id, onChunk, options = {}) => {
        const { chunkSize = 10, delayMs = 500 } = options
        
        const articleData = getArticleById(id)
        const { article } = articleData

        // 發送開始事件
        await onChunk({
            type: 'start',
            data: {
                id: articleData.id,
                totalLength: article.length
            }
        }, false)

        // 等待延遲後開始發送內容
        await new Promise(resolve => setTimeout(resolve, delayMs))

        // 分段發送文章內容
        let currentIndex = 0
        const totalChunks = Math.ceil(article.length / chunkSize)

        while (currentIndex < article.length) {
            const chunk = article.slice(currentIndex, currentIndex + chunkSize)
            
            await onChunk({
                type: 'chunk',
                data: {
                    index: Math.floor(currentIndex / chunkSize),
                    content: chunk,
                    progress: Math.round(((currentIndex + chunkSize) / article.length) * 100)
                }
            }, false)
            
            currentIndex += chunkSize

            // 如果還有下一段,等待延遲
            if (currentIndex < article.length) {
                await new Promise(resolve => setTimeout(resolve, delayMs))
            }
        }

        // 發送結束事件
        await onChunk({
            type: 'end',
            data: {
                totalChunks,
                message: '文章發送完成'
            }
        }, true)

        return true
    }

    // 返回所有方法
    return { getArticleById, streamArticleWithSSE }
}

module.exports = createSseService

這樣的話,我們就有了一個sse的接口了

Nginx設置對應響應頭

注意,sse需要設置特殊的響應頭,這裏我們使用nginx代理,可以配置如下

# SSE專用配置,接口調用
location /fastify-api/sse/ {
    proxy_pass http://localhost:33333/fastify/sse/;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    proxy_set_header X-Forwarded-Host $server_name;

    # SSE特殊配置
    proxy_buffering off; #避免阻塞流式輸出
    proxy_cache off; #防止緩存干擾實時數據
    proxy_set_header Connection ""; #維持長連接

    # SSE超時
    proxy_connect_timeout 30s;
    proxy_send_timeout 30s;
    proxy_read_timeout 86400s;  # 24小時,適合SSE
}

前端

光標跟隨實現

  • 如果只是展示普通的文本,那麼可以創建一個元素
  • 將這個元素擺放在文字的最後
  • 通過動畫的方式,模擬出來光標閃爍的效果
  • 如下
<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Document</title>
    <style>
        .cursor {
            display: inline-block;
            width: 2px;
            height: 1.2em;
            background-color: #1677ff;
            margin-left: 2px;
            /* 光標元素的底部與父元素的文本底部對齊 */
            vertical-align: text-bottom;
            animation: blink 1s infinite;
        }

        @keyframes blink {

            0%,
            50% {
                opacity: 1;
            }

            51%,
            100% {
                opacity: 0;
            }
        }
    </style>
</head>

<body>
    <p>你好,這個世界 <span class="cursor"></span> </p>
</body>

</html>

效果圖

  • 如果要展示覆雜的帶層級的結構,比如代碼、鏈接等,就要通過js去動態控制
  • 找到最後一個文本接口、追加一個問題,並獲取文字的位置,再設置光標到文字為止

可以參考這個視頻的實現方案:https://www.douyin.com/video/7553576140888116516

使用Promise.resolve()的鏈式調用,實現隊列方案

案例效果中,因為是不斷地完成打字機渲染數據的效果,所以我們要確保上一次渲染完成後,再執行下一次的打字機渲染,這裏我們採用Promise.resolve()的鏈式調用,實現隊列方案(保證順序)

案例分析,假設我們要發送五個請求(通過id傳參的形式區分),下方的寫法,無法保證輸入的結果,對應的是參數 1 2 3 4 5的結果

<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <script src="https://cdn.bootcdn.net/ajax/libs/axios/1.3.0/axios.js"></script>
</head>

<body>
  <script>
    const base = 'https://ashuai.work/api/xyj?id='
    const ids = ['1', '2', '3', '4', '5']

    ids.forEach(async (id) => {
      const res = await axios.get(`${base}${id}`)
      console.log('res', res.data.data)
    })
  </script>
</body>

</html>

但是,如果我們使用使用Promise的鏈式調用,就能夠實現通過.then()實現入隊操作,而Promise會自動執.then的代碼做到自動出隊的效果,如下

<script>
    const base = 'https://ashuai.work/api/xyj?id='
    const ids = ['1', '2', '3', '4', '5']

    // 初始化一個"空的已完成Promise"作為起點
    let promiseChain = Promise.resolve()

    // 用forEach遍歷
    ids.forEach(id => {
        // 將新任務追加到Promise鏈的末尾(變量重新指向新創建的Promise)
        promiseChain = promiseChain.then(async () => {
            const res = await axios.get(`${base}${id}`)
            console.log('res', res.data.data)
            return res.data.data
        })
    })
</script>

或者使用for of也行,也能保證順序

const base = 'https://ashuai.work/api/xyj?id='
const ids = ['1', '2', '3', '4', '5']

async function fetchSequentially() {
  for (const id of ids) {
    const res = await axios.get(`${base}${id}`)
    console.log('res', res.data.data)
  }
  console.log('全部完成')
}

// 調用函數
fetchSequentially()

完整前端代碼

前端控制流程

用户點擊"開始接收"
  ↓
startSSE() - 重置狀態 + 建立連接
  ├─ articleText='', progress=0
  ├─ isStreaming=true (顯示光標█)
  └─ new EventSource(url)
  ↓
━━━━ 服務器: start 事件 ━━━━
  ↓
console.log('SSE開始')
  ↓
━━━━ 服務器: chunk 事件 (多次) ━━━━
  ↓
chunk1 到達 → Promise鏈
  └─ typeWriter("文本1") 
      └─ 逐字顯示 (50ms/字) ⏰
          └─ 完成後 setProgress(20)
  ↓
chunk2 到達 → Promise鏈 (等待chunk1完成)
  └─ typeWriter("文本2")
      └─ 逐字顯示 (50ms/字) ⏰
          └─ 完成後 setProgress(40)
  ↓
chunk3 到達 → Promise鏈 (等待chunk2完成)
  └─ typeWriter("文本3")
      └─ 逐字顯示 (50ms/字) ⏰
          └─ 完成後 setProgress(100)
  ↓
━━━━ 服務器: end 事件 ━━━━
  ↓
Promise鏈 (等待所有typeWriter完成)
  └─ setIsStreaming(false) (隱藏光標)
      └─ eventSource.close()
          └─ 完成 ✓

注意,這裏是使用Promise鏈式調用保證順序的

【Promise 鏈保證順序】

服務器快速發送:
  chunk1 → chunk2 → chunk3 → end

前端執行順序:
  typeWriter(chunk1) ✓
    ↓ 等待完成
  typeWriter(chunk2) ✓
    ↓ 等待完成
  typeWriter(chunk3) ✓
    ↓ 等待完成
  隱藏光標 ✓

核心: promiseChainRef 確保串行執行,不會亂序

React代碼

import React, { useState, useEffect, useRef } from 'react'
import { Button, Progress } from 'antd'
import './Sse.css'

export default function Sse() {
    const [articleText, setArticleText] = useState('') // 文章內容
    const currentTextRef = useRef('') // 存儲打字機效果中累積的文本內容
    const [progress, setProgress] = useState(0) // sse返回的進度
    const [isStreaming, setIsStreaming] = useState(false) // 是否正在接收
    const eventSourceRef = useRef(null) // sse鏈接實例
    const typeWriterTimerRef = useRef(null) // 定時器的id,用於清理定時器
    const promiseChainRef = useRef(Promise.resolve()) // Promise鏈

    // 清理函數
    useEffect(() => {
        return () => {
            // 組件卸載清理關閉sse連接和定時器
            if (eventSourceRef.current) {
                eventSourceRef.current.close()
            }
            if (typeWriterTimerRef.current) {
                clearInterval(typeWriterTimerRef.current)
            }
        }
    }, [])

    // 打字機效果返回Promise,確保順序執行
    const typeWriter = (text) => {
        return new Promise((resolve) => {
            let index = 0
            const interval = setInterval(() => {
                // 組件被卸載了,停止打字機效果
                if (!typeWriterTimerRef.current) {
                    clearInterval(interval)
                    return
                }
                // 定時器循環賦值刷新UI,直到賦值完成,再執行resolve
                if (index < text.length) {
                    currentTextRef.current += text[index]
                    setArticleText(currentTextRef.current)
                    index++
                } else {
                    clearInterval(interval)
                    typeWriterTimerRef.current = null
                    resolve()
                }
            }, 50)
            typeWriterTimerRef.current = interval
        })
    }

    // 開始SSE連接
    const startSSE = () => {
        if (eventSourceRef.current) {
            eventSourceRef.current.close()
        }

        // 重置狀態
        currentTextRef.current = ''
        setArticleText('')
        setProgress(0)
        setIsStreaming(true)
        promiseChainRef.current = Promise.resolve() // 重置Promise鏈

        // 創建SSE連接
        const eventSource = new EventSource('https://ashuai.site/fastify-api/sse/article/66')
        eventSourceRef.current = eventSource

        // 監聽後端自定義的start事件
        eventSource.addEventListener('start', (event) => {
            const data = JSON.parse(event.data)
            console.log('SSE開始:', data)
        })

        // 監聽後端自定義的chunk事件
        eventSource.addEventListener('chunk', (event) => {
            const data = JSON.parse(event.data)
            console.log('接收到的數據:', data)

            // 鏈式調用:等前一個完成後再執行,這樣能夠確保前一句話完成,再執行下一句話
            promiseChainRef.current = promiseChainRef.current
                .then(() => typeWriter(data.content))
                .then(() => setProgress(data.progress))
        })

        // 監聽後端自定義的end事件
        eventSource.addEventListener('end', (event) => {
            const data = JSON.parse(event.data)
            console.log('SSE結束:', data)

            // 等待所有Promise鏈完成後再設置isStreaming為false
            // 防止see結束了,文字還沒有打完,光標消失了
            promiseChainRef.current = promiseChainRef.current.then(() => {
                setIsStreaming(false)
            })

            // 關閉連接
            eventSource.close()
            eventSourceRef.current = null
        })

        // 監聽後端自定義的error事件
        eventSource.addEventListener('error', (event) => {
            console.error('SSE錯誤:', event)
            setIsStreaming(false)

            if (eventSourceRef.current) {
                eventSourceRef.current.close()
                eventSourceRef.current = null
            }
            // 也可以加上定時器重連機制
        })
    }

    // 取消SSE連接
    const cancelSSE = () => {
        if (eventSourceRef.current) {
            eventSourceRef.current.close()
            eventSourceRef.current = null
        }
        if (typeWriterTimerRef.current) {
            clearInterval(typeWriterTimerRef.current)
            typeWriterTimerRef.current = null
        }
        setIsStreaming(false)
        promiseChainRef.current = Promise.resolve()
    }

    return (
        <div style={{ width: '720px' }}>
            <div className="article-container">
                <div className="article-text">
                    {articleText}
                    {/* 當正在接收時,使用span元素模擬鼠標光標 */}
                    {isStreaming && <span className="cursor"></span>}
                </div>
            </div>
            <Progress percent={progress} />
            <Button
                type="primary"
                onClick={startSSE}
                disabled={isStreaming}
            >
                {isStreaming ? '正在接收...' : '開始接收'}
            </Button>
            <Button onClick={cancelSSE} disabled={!isStreaming}>{isStreaming ? '取消鏈接' : '未曾鏈接'}</Button>
        </div>
    )
}

Css代碼

.article-container {
    background: #f7fafc;
    border-radius: 8px;
    padding: 20px;
    border-left: 4px solid #1677ff;
    min-height: 240px;
}

.article-text {
    font-size: 16px;
    line-height: 1.8;
    color: #2d3748;
    white-space: pre-wrap;
    word-wrap: break-word;
}

.cursor {
    display: inline-block;
    width: 2px;
    height: 1.2em;
    background-color: #1677ff;
    margin-left: 2px;
    /* 光標元素的底部與父元素的文本底部對齊 */
    vertical-align: text-bottom;
    animation: blink 1s infinite;
}

@keyframes blink {

    0%,
    50% {
        opacity: 1;
    }

    51%,
    100% {
        opacity: 0;
    }
}

原生new EventSource的不足之處

  • 原生new EventSource主要有以下不足之處
  • 首先無法自定義請求頭,比如我想要在接口的請求頭中添加'Authorization': 'myToken'是不好控制的,畢竟可能業務中接口要有鑑權才行
  • 然後,僅支持get方法,post不能用,參數只能通過 URL 傳遞,比如上述筆者提供的接口,就是通過get請求的params拿到的參數,即 new EventSource('https://ashuai.site/fastify-api/sse/article/66') 這裏的參數比如是66或者67等
  • 當然 我們也可以把token拼接到params中去,比如new EventSource('https://ashuai.site/fastify-api/sse/article/66/user_token')
  • 或者也可以使用cookie去帶着token
  • 但是這畢竟不優雅,前人已經提供好了一些包解決方案供我們使用
  • 比如event-source-polyfill和fetch-event-source

event-source-polyfill

示例代碼

import { EventSourcePolyfill } from 'event-source-polyfill';

// 初始化配置項多多
const es = new EventSourcePolyfill('/api/sse', {
  method: 'POST', // 支持POST
  headers: {
    'Authorization': 'user_token',
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({ id: 66 }) // POST請求體
});

// 監聽服務器發送的消息
es.onmessage = (event) => { console.log('收到數據:', event.data) };

// 監聽連接打開
es.onopen = () => { console.log('連接已建立') };

// 監聽錯誤
es.onerror = (error) => { console.error('連接錯誤:', error) };

或者使用fetch-event-source

fetch-event-source

示例代碼

import { fetchEventSource } from '@microsoft/fetch-event-source';

fetchEventSource('/api/sse', {
  method: 'POST',
  headers: {
    'Authorization': 'user_token',
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({ id: 66 }),
  onopen: (response) => {
    if (!response.ok) {
      console.error('連接失敗:', response.status);
    } else {
      console.log('連接已建立');
    }
  },
  onmessage: (event) => {
    console.log('收到數據:', event.data);
  },
  onclose: () => {
    console.log('連接關閉');
  },
  onerror: (error) => {
    console.error('連接錯誤:', error);
    // 可以返回true表示重試連接
    return true;
  }
});
特性 event-source-polyfill fetch-event-source
底層依賴 模擬原生 EventSource 基於現代 Fetch API
兼容性 支持舊瀏覽器(如 IE9+) 依賴 Fetch,需兼容現代瀏覽器
靈活性 有限(接近原生 API) 更高(支持 Fetch 所有特性)
重試與控制 簡單重試邏輯 自定義重試策略、取消機制

筆者推薦fetch-event-source這個包,筆者在實際項目中,使用的也是這個,沒有用原生EventSource寫法

F12的Network中請求類型的區別

注意,如果是使用原生的EventSource,那麼F12的Network中請求type類型是eventssource

如果是使用fetch方式的話,那麼請求type類型則是fetch,我們打開ChatGpt發現其類型正是fetch

但是,無論哪種,後端的響應頭,都要返回Content-Type text/event-stream; charset=utf-8來告訴瀏覽器,這是一個事件流,這樣的話才能有如下圖的EventStream

筆者的返回EventStream

ChatGpt的返回EventStream

大家可以打開豆包、通義千文,大家會發現這兩家也是用的fetch技術,並沒有直接使用EventSource,不過deepseek特殊一些用的是xhr方式回覆消息,但同樣也是Content-Type text/event-stream; charset=utf-8

除此之外,還可以使用Fetch ReadableStream讀取Chunked流實現see功能如下

使用Fetch ReadableStream讀取Chunked分塊流實現see功能

核心:

rawReply.writeHead(200, {
    'Transfer-Encoding': 'chunked',  // 👈 關鍵!告訴瀏覽器用分塊方式接收
    'Content-Type': 'text/plain; charset=utf-8',
    ...
})

瀏覽器收到這個頭後會

  • 知道數據會分多次發送過來
  • 自動解析每個chunk前面的十六進制長度
  • 把解析後的純數據交給JavaScript
  • 持續讀取直到收到 0\r\n\r\n(結束標記)
這種方式稍微複雜一些,應對特殊的需求能用上 一般來説,fetch-event-source 夠用了

Fetch + ReadableStream

  • 是更底層、更靈活的流式數據接收方式
  • 可以接收任意格式的流式數據(JSON、純文本、二進制等)
  • 需要自己處理數據解析、緩衝區等
  • 可以實現 SSE 的效果,甚至更強大

效果圖

線上示例地址:https://ashuai.site/reactExamples/sse2

Controller層

// 根據ID獲取對應文章2(chunked傳輸)
const getArticleById2 = async (request, reply) => {
    try {
        const { id } = request.params

        // 參數校驗
        const idValidation = validateId(id)
        if (!idValidation.isValid) {
            return ResponseUtils.sendValidationError(reply, 'ID參數無效', idValidation.errors)
        }

        const sseService = createSseService()

        // 獲取原始響應對象
        const rawReply = reply.raw

        // 設置響應頭
        rawReply.writeHead(200, {
            'Content-Type': 'text/plain; charset=utf-8',
            'Transfer-Encoding': 'chunked',
            'Cache-Control': 'no-cache',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type, Authorization',
            'Connection': 'keep-alive'
        })
        let isConnectionClosed = false

        // 監聽連接關閉事件
        rawReply.on('close', () => {
            isConnectionClosed = true
        })
        rawReply.on('finish', () => {
            isConnectionClosed = true
        })

        // 定義寫入chunk的輔助函數
        const writeChunk = (data) => {
            if (isConnectionClosed) return
            
            const dataStr = JSON.stringify(data)
            const bufferChunk = Buffer.from(dataStr, 'utf8')
            rawReply.write(`${bufferChunk.length.toString(16)}\r\n`)
            rawReply.write(dataStr)
            rawReply.write('\r\n')
        }

        // 定義處理每個數據塊的回調函數
        const handleChunk = async (chunk, isLast) => {
            if (isConnectionClosed) return

            writeChunk(chunk)

            if (isLast) {
                rawReply.write('0\r\n\r\n')
                rawReply.end()
            }
        }

        // 調用service層的流式傳輸方法
        const result = await sseService.streamArticleWithChunked(
            idValidation.data,
            handleChunk,
            { chunkSize: 10, delayMs: 500 }
        )

        if (result === null) {
            return ResponseUtils.sendNotFound(reply, '文章不存在')
        }

    } catch (error) {
        request.log.error(error)
        // chunked傳輸錯誤處理
        const errorData = JSON.stringify({
            code: 500,
            message: '查詢失敗',
            error: error.message,
            type: 'error'
        })
        const errorChunk = Buffer.from(errorData, 'utf8')
        reply.raw.write(`${errorChunk.length.toString(16)}\r\n`)
        reply.raw.write(errorData)
        reply.raw.write('\r\n0\r\n\r\n')
        reply.raw.end()
    }
}

Service層

數據

const article2 = `臣本布衣,躬耕於南陽,苟全性命於亂世,不求聞達於諸侯。
先帝不以臣卑鄙,猥自枉屈,三顧臣於草廬之中,諮臣以當世之事,由是感激,遂許先帝以驅馳。
後值傾覆,受任於敗軍之際,奉命於危難之間,爾來二十有一年矣。
先帝知臣謹慎,故臨崩寄臣以大事也。受命以來,夙夜憂嘆,恐託付不效,以傷先帝之明;
故五月渡瀘,深入不毛。今南方已定,兵甲已足,當獎率三軍,北定中原,庶竭駑鈍,攘除奸兇,興復漢室,還於舊都。
此臣所以報先帝而忠陛下之職分也。至於斟酌損益,進盡忠言,則攸之、禕、允之任也。
願陛下託臣以討賊興復之效,不效,則治臣之罪,以告先帝之靈。若無興德之言,則責攸之、禕、允等之慢,以彰其咎;
陛下亦宜自謀,以諮諏善道,察納雅言,深追先帝遺詔。臣不勝受恩感激。今當遠離,臨表涕零,不知所言。`

業務邏輯處理


const createSseService = () => {

    // 根據ID獲取對應文章2(用於chunked傳輸)
    const getArticleById2 = (id) => {
        return {
            id,
            article: article2,
        }
    }

    /**
     * Chunked傳輸文章
     * @param {number} id - 文章ID
     * @param {Function} onChunk - 處理每個數據塊的回調函數 (chunk, isLast) => void
     * @param {Object} options - 配置選項
     * @param {number} options.chunkSize - 每個分段的字符數,默認10
     * @param {number} options.delayMs - 每個分段之間的延遲時間(毫秒),默認500
     * @returns {Promise<boolean|null>} 成功返回true,文章不存在返回null
     */
    const streamArticleWithChunked = async (id, onChunk, options = {}) => {
        const { chunkSize = 10, delayMs = 500 } = options
        
        const articleData = getArticleById2(id)
        const { article } = articleData

        // 發送開始標記
        await onChunk({
            id: articleData.id,
            totalLength: article.length,
            type: 'chunked'
        }, false)

        // 等待初始延遲
        await new Promise(resolve => setTimeout(resolve, 100))

        // 分段發送文章內容
        let currentIndex = 0
        const totalChunks = Math.ceil(article.length / chunkSize)

        while (currentIndex < article.length) {
            const chunk = article.slice(currentIndex, currentIndex + chunkSize)
            
            await onChunk({
                index: Math.floor(currentIndex / chunkSize),
                content: chunk,
                progress: Math.round(((currentIndex + chunkSize) / article.length) * 100),
                type: 'chunk'
            }, false)
            
            currentIndex += chunkSize

            // 如果還有下一段,等待延遲
            if (currentIndex < article.length) {
                await new Promise(resolve => setTimeout(resolve, delayMs))
            }
        }

        // 發送結束標記
        await onChunk({
            totalChunks,
            message: '文章發送完成',
            type: 'end'
        }, true)

        return true
    }

    // 返回所有方法
    return {
        getArticleById2,
        streamArticleWithChunked
    }
}

module.exports = createSseService

前端代碼

import React, { useState, useRef, useEffect } from 'react'
import { Button, Progress } from 'antd'
import './Sse2.css'

export default function Sse2() {
    const [articleText, setArticleText] = useState('') // 顯示在界面上的文章內容
    const [progress, setProgress] = useState(0) // 當前加載進度(0-100)
    const [isStreaming, setIsStreaming] = useState(false) // 標記是否正在接收數據流
    const currentTextRef = useRef('') // 打字機效果中實時累積的文本內容(緩存)
    const typeWriterTimerRef = useRef(null) // 打字機定時器的 ID,用於在需要時清理定時器
    const promiseChainRef = useRef(Promise.resolve()) // Promise 鏈,確保多個打字機效果按順序執行,不會亂序
    const abortControllerRef = useRef(null) // 用於中斷 fetch 請求的控制器

    useEffect(() => {
        return () => {
            if (abortControllerRef.current) {
                abortControllerRef.current.abort()
            }
            if (typeWriterTimerRef.current) {
                clearInterval(typeWriterTimerRef.current)
            }
        }
    }, [])

    const typeWriter = (text) => {
        return new Promise((resolve) => {
            let index = 0 // 當前要顯示的字符索引
            const interval = setInterval(() => {
                // 安全檢查:如果組件已卸載(定時器被清除),停止執行
                if (!typeWriterTimerRef.current) {
                    clearInterval(interval)
                    return
                }
                
                // 還有字符沒顯示完,繼續逐個添加
                if (index < text.length) {
                    currentTextRef.current += text[index] // 累積文本到 ref 中
                    setArticleText(currentTextRef.current) // 更新界面顯示
                    index++ // 移動到下一個字符
                } else {
                    // 所有字符都顯示完了,清理定時器並通知 Promise 完成
                    clearInterval(interval)
                    typeWriterTimerRef.current = null
                    resolve() // 告訴外部:這段文本顯示完了,可以顯示下一段了
                }
            }, 50) // 每 50ms 顯示一個字符
            
            typeWriterTimerRef.current = interval // 保存定時器 ID,方便後續清理
        })
    }

    /**
     * 處理從服務器接收到的分塊數據
     * @param {Object} data - 服務器返回的數據對象
     * 
     * 數據類型説明:
     * - type: 'chunk' -> 文章內容片段,需要用打字機效果顯示
     * - type: 'end' -> 傳輸結束標記
     * - type: 'error' -> 錯誤信息
     */
    const handleChunkedData = async (data) => {
        // 情況1:接收到一段文章內容
        if (data.type === 'chunk') {
            promiseChainRef.current = promiseChainRef.current
                .then(() => typeWriter(data.content))   // 先等上一段顯示完
                .then(() => setProgress(data.progress)) // 再更新進度條
            return
        }

        // 情況2:接收到結束標記
        if (data.type === 'end') {
            // 等待所有文本都顯示完成後,再標記傳輸結束
            // 這樣不會出現文本還沒顯示完,按鈕就變成可點擊的情況
            promiseChainRef.current = promiseChainRef.current.then(() => {
                setIsStreaming(false)
            })
            return
        }

        // 情況3:服務器返回了錯誤
        if (data.type === 'error') {
            throw new Error(data.message || '服務器返回錯誤')
        }
    }

    /**
     * 使用 fetch + ReadableStream 接收服務器的流式數據
     * 
     * 流式傳輸的優勢:
     * 1. 數據邊接收邊顯示,不用等所有數據到達後才顯示
     * 2. 類似於 ChatGPT 的效果,用户體驗更好
     * 3. 可以處理大量數據,不會一次性佔用太多內存
     */
    const startChunkedStream = async () => {
        // 如果之前有未完成的請求,先取消掉
        if (abortControllerRef.current) {
            abortControllerRef.current.abort()
        }

        // 創建新的取消控制器,用於取消這次請求
        abortControllerRef.current = new AbortController()

        // 重置所有狀態,準備接收新數據
        currentTextRef.current = ''      // 清空緩存的文本
        setArticleText('')               // 清空界面顯示
        setProgress(0)                   // 進度歸零
        setIsStreaming(true)             // 標記為正在接收
        promiseChainRef.current = Promise.resolve() // 重置 Promise 鏈

        try {
            // 向服務器發起請求
            const response = await fetch('https://ashuai.site/fastify-api/sse/article2/1', {
                signal: abortControllerRef.current.signal // 傳入取消控制器,可隨時中斷請求
            })

            // 檢查請求是否成功
            if (!response.ok) {
                throw new Error(`HTTP ${response.status}: ${response.statusText}`)
            }

            // 獲取響應體的讀取器(用於讀取流式數據)
            const reader = response.body.getReader()
            // 創建文本解碼器(將二進制數據轉換為文本)
            const decoder = new TextDecoder()
            // 緩衝區:用於存儲不完整的數據
            let buffer = ''

            // 循環讀取數據流
            while (true) {
                // 讀取一塊數據
                const { done, value } = await reader.read()

                // 如果數據讀取完畢,退出循環
                if (done) {
                    break
                }

                // 將二進制數據解碼成文本
                const chunk = decoder.decode(value, { stream: true })
                buffer += chunk // 累積到緩衝區

                // 按行分割數據(服務器每次發送一個完整的 JSON 對象,以換行符分隔)
                const lines = buffer.split('\n')
                
                // 處理每一行數據(除了最後一行,因為最後一行可能不完整)
                for (let i = 0; i < lines.length - 1; i++) {
                    const line = lines[i].trim()
                    // 忽略空行和 '0'('0' 是 chunked 編碼的結束標記)
                    if (line && line !== '0') {
                        try {
                            // 將 JSON 字符串解析為對象
                            const data = JSON.parse(line)
                            // 處理這條數據(顯示文本、更新進度等)
                            await handleChunkedData(data)
                        } catch (e) {
                            // 如果解析失敗,忽略這條數據,繼續處理下一條
                        }
                    }
                }

                // 將不完整的行保留在緩衝區中,等待下次接收數據時拼接
                const lastNewlineIndex = buffer.lastIndexOf('\n')
                if (lastNewlineIndex !== -1) {
                    buffer = buffer.substring(lastNewlineIndex + 1)
                }
            }

        } catch (error) {
            // 處理錯誤
            if (error.name === 'AbortError') {
                console.log('傳輸已取消')
            } else {
                console.error('Chunked傳輸錯誤:', error)
            }
            setIsStreaming(false)
        } finally {
            // 無論成功或失敗,都清空取消控制器
            abortControllerRef.current = null
        }
    }

    /**
     * 取消正在進行的流式傳輸
     * 需要清理三個東西:
     * 1. 網絡請求
     * 2. 打字機定時器
     * 3. Promise 鏈
     */
    const cancelStream = () => {
        // 1. 取消網絡請求
        if (abortControllerRef.current) {
            abortControllerRef.current.abort()
            abortControllerRef.current = null
        }
        
        // 2. 停止打字機效果
        if (typeWriterTimerRef.current) {
            clearInterval(typeWriterTimerRef.current)
            typeWriterTimerRef.current = null
        }
        
        // 3. 更新狀態
        setIsStreaming(false)
        
        // 4. 重置 Promise 鏈
        promiseChainRef.current = Promise.resolve()
    }

    return (
        <div style={{ width: '720px' }}>
            {/* 文章顯示區域 */}
            <div className="article-container">
                <div className="article-text">
                    {articleText}
                    {/* 當正在接收時,顯示一個閃爍的光標,模擬打字效果 */}
                    {isStreaming && <span className="cursor"></span>}
                </div>
            </div>
            <Progress percent={progress} />
            <Button
                type="primary"
                onClick={startChunkedStream}
                disabled={isStreaming}  // 正在接收時禁用,避免重複點擊
            >
                {isStreaming ? '正在接收...' : '開始接收'}
            </Button>
            
            <Button onClick={cancelStream} disabled={!isStreaming}>
                {isStreaming ? '取消傳輸' : '未在傳輸'}
            </Button>
        </div>
    )
}
A good memory is not as reliable as a written record. Write it down...
user avatar wszgrcy 頭像 54r9rxzy 頭像 moax 頭像 qifengliao_5e7f5b20ee3bd 頭像 assassin 頭像 humi 頭像 congrongdehanbaobao 頭像 zihuai1949 頭像 2021_60e2de6ea45f7 頭像 tuzixiansen_63d4d65909d62 頭像
點贊 10 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.