1. 前言
5 月的時候,React 的核心開發者 Dan 發表了一篇名為《Progressive JSON》 的文章,介紹了一種將 JSON 數據從服務器流式傳輸到客户端的技術,允許客户端在接收到全部數據之前就開始渲染部分數據。
這可以顯著提升用户體驗,尤其是處理大型數據集時。
讓我們以“獲取用户文章”這個場景為例。
這是一個完整的數據結構:
{
"user": {
"id": 1,
"name": "John Doe",
"posts": [
{ "id": 101, "title": "First Post", "content": "..." },
{ "id": 102, "title": "Second Post", "content": "..." }
]
}
}
假設我們能夠很快獲取用户信息,但文章數據還需要一段時間從數據庫獲取。
與其等待數據完全加載完畢,不如先發送一個佔位符表示文章字段:
{
"user": {
"id": 1,
"name": "John Doe",
"posts": "_$1"
}
}
客户端收到數據後,先將用户信息渲染出來。
然後,當文章數據準備完畢後,我們將文章數據作為一個單獨的 chunk 發送:
{
"_$1": [
{ "id": 101, "title": "First Post", "content": "..." },
{ "id": 102, "title": "Second Post", "content": "..." }
]
}
客户端收到數據後,最後將文章數據渲染出來。
要實現這樣一個功能,客户端需要具備處理這些佔位符的能力,並在最終數據到達時替換為實際數據。
如果要實現這樣一個單獨的功能需要多少代碼呢?
200 行就可以!
本篇文章和大家介紹下實現思路,供大家學習和思考使用。
2. 服務端實現
讓我們來看下服務器端實現。
首先是服務端函數。
function serve(res, data) {
res.setHeader("Content-Type", "application/x-ndjson; charset=utf-8");
res.setHeader("Transfer-Encoding", "chunked");
// 向客户端發送 chunks
res.write(JSON.stringify(...) + "\n");
res.write(JSON.stringify(...) + "\n");
// 當完成的時候
res.end();
}
這裏有 2 點值得注意:
- 我們使用了
application/x-ndjson內容類型。
NDJSON,全拼 Newline Delimited JSON,其實就是一種換行符分割的 JSON,其中每一行都是一個有效的 JSON 對象。這允許我們在單個響應中發送多個 JSON 對象,並以換行符分隔。
- 我們使用了
Transfer-Encoding: chunked響應頭。
使用該響應頭,可以通知客户端,響應將分塊發送。在調用 res.end()之前,請保持連接活躍狀態。
其次,我們需要對數據進行分塊。
實現方式也很簡單,遍歷數據對象,並用佔位符替代那些暫時沒有準備好的部分。
當遇到需要稍後發送的部分(一個 Promise)時,我們將其存儲到隊列中,並在準備就緒後,將其作為單獨的數據塊發送。
函數如下:
function normalize(value) {
function walk(node) {
if (isPromise(node)) {
const id = getId();
registerPromise(node, id);
return id;
}
if (Array.isArray(node)) {
return node.map((item) => walk(item));
}
if (node && typeof node === "object") {
const out = {};
for (const [key, val] of Object.entries(node)) {
out[key] = walk(val);
}
return out;
}
return node;
}
return walk(value);
}
函數遞歸遍歷數據對象。
當遇到 Promise 時,它會生成一個唯一的佔位符 ID,註冊該 Promise 以便稍後解析,並返回該佔位符。
對於數組和對象,它會遞歸處理它們的元素或屬性。原始值將按原樣返回。
這是註冊 Promise 的代碼:
let promises = [];
function registerPromise(promise, id) {
promises.push({ promise, id });
promise.then((value) => {
send(id, value);
}).catch((err) => {
console.error("Error resolving promise for path", err);
send(id, { error: "promise error", timeoutMs: TIMEOUT });
});
這是 send 的代碼,send函數負責將解析後的數據發送給客户端:
function send(id, value) {
res.write(JSON.stringify({ i: id, c: normalize(value) }) + "\n");
promises = promises.filter((p) => p.id !== id);
if (promises.length === 0) res.end();
}
該 send 函數會向響應中寫入一個新的數據塊,其中包括佔位符 ID 和 normalize 後的值。然後它會從隊列中移除已經 resolve 的 Promise。如果沒有其他要處理的 Promise,它就會結束響應,從而關閉與客户端的連接。
完整的實現代碼點擊這裏。
最後,我們舉一個從服務端發送的對象示例:
const data = {
user: {
id: 1,
name: "John Doe",
posts: fetchPostsFromDatabase(), // 返回一個 promise
},
};
async function fetchPostsFromDatabase() {
const posts = await database.query("SELECT * FROM posts WHERE userId = 1");
return posts.map((post) => ({
id: post.id,
title: post.title,
content: post.content,
comments: fetchCommentsForPost(post.id), // 返回一個 promise
}));
}
每篇文章還有一個評論字段(comments),該字段是一個 Promise 對象。意味着評論數據將在文章數據發送後,作為單獨的片段發送。
3. 客户端實現
那客户端該如何實現呢?
在客户端,我們處理傳入的數據塊,並將佔位符替換為實際數據。
我們可以使用 Fetch API 向服務器發送請求,並將響應讀取為流。每當遇到佔位符時,我們都會將其替換為一個 Promise,該 Promise 將在實際數據到達時解析。
核心邏輯如下:
try {
const res = await fetch(endpoint);
const reader = res.body.getReader();
const decoder = new TextDecoder();
async function process() {
let done = false;
while (!done) {
const { value, done: readerDone } = await reader.read();
done = readerDone;
if (value) {
try {
const chunk = JSON.parse(decoder.decode(value, { stream: true }));
chunk.c = walk(chunk.c);
if (promises.has(chunk.i)) {
promises.get(chunk.i)(chunk.c);
promises.delete(chunk.i);
}
} catch (e) {
console.error(`Error parsing chunk.`, e);
}
}
}
}
process();
} catch (e) {
console.error(e);
throw new Error(`Failed to fetch data from Streamson endpoint ${endpoint}`);
}
}
對流的處理,你可能感到陌生,可以拓展閲讀我的這篇文章:《如何用 Next.js v14 實現一個 Streaming 接口?》
process 函數逐塊讀取響應流。每個數據塊都被解析為 JSON,並調用 walk 函數將佔位符替換為 Promise。
如果數據塊包含先前註冊的佔位符 ID ,則相應的 Promise 會被解析為接收到的數據。關鍵在於 await reader.read(),它允許我們等待新數據到來。
walk函數用於將佔位符替換為 Promise:
function walk(node) {
if (isPromisePlaceholder(node)) {
return new Promise((done) => {
promises.set(node, done);
});
}
if (Array.isArray(node)) {
return node.map((item) => walk(item));
}
if (node && typeof node === "object") {
const out = {};
for (const [key, val] of Object.entries(node)) {
out[key] = walk(val);
}
return out;
}
return node;
}
function isPromisePlaceholder(val) {
return typeof val === "string" && val.match(/^_\$(\d)/);
}
類似於服務端的 normalize 函數。當遇到佔位符的時候,它會返回一個新的 Promise,該 Promise 將在實際數據到達時解析。對於數組和對象,它會遞歸處理它們的元素或屬性。原始值則直接返回。當然,ID 必須與服務器端生成的 ID 匹配。
完整的實現代碼點擊這裏。兩個文件加起來一共 155 行代碼。
4. NPM 包
本篇文章整理翻譯自 Streaming JSON in just 200 lines of JavaScript。
作者還將代碼整理成了一個 NPM 包:Streamson。
通過 npm 安裝:npm intall streamson
服務端上使用:
import { serve } from "streamson";
import express from "express";
const app = express();
const port = 5009;
app.get("/data", async (req, res) => {
const myData = {
title: "My Blog",
description: "A simple blog example using Streamson",
posts: getBlogPosts(), // this returns a Promise
};
serve(res, myData);
});
app.listen(port, () => {
console.log(`Example app listening on port ${port}`);
});
客户端是一個 1KB 的 JavaScript 文件,地址:https://unpkg.com/streamson@latest/dist/streamson.min.js
客户端使用如下:
const request = Streamson("/data");
const data = await request.get();
console.log(data.title); // "My Blog"
const posts = await request.get("posts");
console.log(posts); // Array of blog posts
5. 最後
作為準前端開發專家的你,第一時間獲取前端資訊、技術乾貨、AI 課程,那不得關注下我的公眾號「冴羽」。
流式傳輸 JSON 數據是一種提升 Web 應用感知性能的有效方法,尤其適用於處理大型數據集或動態生成數據。
通過在數據可用時立即發送部分數據,我們可以讓客户端更早地開始渲染內容,從而帶來更佳的用户體驗。