Stories

Detail Return Return

RxJS 全面解析 - Stories Detail

又名:RxJS “道” 與 “術”

前言

打開此文的小夥伴想必對 RxJS 已經有了或多或少的瞭解,如果沒有倒也無妨,因為下面會從零開始講起;如果你帶着幾個問題來翻閲,本人也希望此文可以帶你找到答案。
温馨提示:文章內容較長,建議收藏反覆觀看。

概覽

從我個人的學習 RxJS 的歷程來看,最開始是“照貓畫虎”能夠基本使用,隨後是研究部分操作符和使用場景,最後瞭解產生背景、設計思想以及實現原理。在這期間有過很多疑問,也曾從不同角度理解 RxJS,最終總結了認為比較系統的知識圖譜(下圖)。

image.png

深入理解 RxJS

大“道”——響應式編程

全面理解一個事物,追溯其歷史是一種好的方式,RxJS 的起源需要追溯到響應式編程(RP),後續產生了一系列基於響應式編程範式的語言擴展(Rx,RxJS 就是其中之一),請看歷史簡譜(左向右延續)。

image (1).png

何為響應式

響應式是學習 RxJS 必須要理解的概念,本人用了大量的文字來解釋,如果您已經深刻理解,可直接跳過。如果您是第一次接觸這個名詞,也不要先給自己心裏暗示它有多麼的高深和難以理解,也許你天天在使用。
一個例子

為了避免上來就接觸晦澀的概念,先來舉個例子:博客平台關注功能。話説你偶然瀏覽到阿杰的文章,覺得寫的很贊,於是你關注了他的博客賬號,以便不會錯過之後的乾貨,在以後的日子裏阿杰每發佈一篇文章博客平台都會給你推送一條消息,提醒你來給他點點贊,假設博客平台沒有關注的功能,那麼你需要想知道他的最新動態就只能打開他的個人主頁查看文章列表來確認,也許稍不留意就會錯過他的文章。這個例子出現了粉絲關注博主、博主發佈博客、平台自動推送給粉絲消息、給文章點贊,這就形成了響應式閉環,平台在觀察到博主粉絲只需要關注一下就能收到博主以後的動態,這就是響應式帶來的好處。

另一個例子

再舉一個貼近我們開發的例子:假設有一個更新某用户密碼的需求,A 同事負責調用更新邏輯並在更新後執行其他任務(比如提醒用户更新成功或失敗),B 同事負責具體更新密碼的邏輯,下圖描述了完成整個任務的流程:

image (2).png
實際情況更新邏輯比較複雜,有以下邏輯:

  1. 驗證一下用户信息的真實性
  2. 驗證密碼是否合法
  3. 最終把新的密碼入庫

上述的每個環節都有可能是異步耗時任務,比如用户的真實性是第三方平台驗證的,入庫的過程中網絡非常慢,再比如......等等,諸如此類的各種不確定性,這對於 B 同事做後續任務就有了一個關鍵性條件,確定/等待更新結果,這種情況有一種做法是:定期輪詢重試,B 每隔一段時間執行一次,直到確定 A 已經修改成功,再去執行後續操作。邏輯中定時 A 邏輯結束這種做法這種做法明顯有一個弊端是執行多次,對於 B 顯然不是好的做法,好的做法是:B 的更新邏輯執行完後通知 A,甚至 B 可以先把更新後的事準備好,讓 A 決定後續邏輯的執行時機。

image (3).png
流程如圖示:訂閲/執行更新邏輯、更新邏輯結束、將結果通知調用者、執行後續邏輯。這就是響應式的做法,它帶來的好處是:當更新結果發生變化時自動通知調用者,而不用輪詢重試

瞭解響應式宣言

相信你已經明白了響應式,並能發現生活/工作中到處可見,下面瞭解一下設計響應式模塊/系統遵循的原則:

  • 即時響應性:只要有可能,就要及時地做出響應。
  • 回彈性:執行過程中在出現失敗時依然保持即時響應性。
  • 彈性: 在不斷變化的工作負載之下依然保持即時響應性。
  • 消息驅動:反應式依賴異步的消息傳遞,從而確保了鬆耦合、隔離、位置透明的組件之間有着明確邊界。

image (4).png

響應式編程

下面我們正式的介紹響應式編程:
響應式編程,Reactive Programing,又稱反應式編程,簡稱 RP,是一種以傳播數據流(數據流概念戳 這裏  )的編程範式。

響應式編程反應式編程(英語:Reactive programming)是一種面向數據流和變化傳播的聲明式編程範式。這意味着可以在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。 —— 維基百科

優勢:

  1. 聲明式,方便地表達靜態或動態的數據流
  2. 自動化,自動將變化的值通過數據流進行傳播

核心思想: 從傳統的調用方“拉”數據的思維模式轉變為被調用方“推”數據的思維模式。

JS 異步編程史

眾所周知,JS 執行環境是單線程的,在事件監聽,異步的處理,響應式編程毋庸置疑是其中的一大主力。
Callback 時代

回調函數延續至今,JS 運用高階函數巧妙地將異步後的邏輯進行託管,以事件驅動的方式來解決異步編程,但它有一個“臭名昭著”的問題:回調嵌套,耦合度高。本來很簡單的邏輯但為了控制執行流程卻不得不寫大量的代碼,當時產生了一些知名的庫:async、bluebrid,它們封裝和處理了嵌套問題,暴露出更為簡單好用的 API,額外還可以優雅地處理流程控制相關場景,但所做的只是劃分了邏輯,依舊沒有解決代碼臃腫的問題。

Promise 時代

ES6 納入 Promise 之後可謂一大喜訊,因為它解決了回調嵌套的問題,雖然它只是回調的語法糖,但在處理流程和捕獲錯誤(外層處理)已經非常的優雅了,但它的弊端是:無法監聽和打斷 Promise 的狀態。這意味着一旦聲明它會立即執行並修改它的執行狀態,這源於它的實現。

Generator

Generator 是處於 Promise 和 Async/await 之間的產物,它給我們帶來了寫異步語法像寫同步一般,只需在函數前加 * 修飾,這樣就可以在函數內部使用一個  yield 關鍵字返回結果,類似於  await ,但它也並非完美,不然也不會有後面的 Async/await 了,它的主要問題是流程管理不方便(迭代器模式實現,主動調 next 執行器流轉游標)

Async/await

Async/await 是 Generator 語法糖,既保留了語法上的優勢,也解決了 Generator 每步要調一下 next 執行器的弊端,是現階段的最佳方案,就是得吐槽一下 Top-level await 到 ES2022 才出現。

其中 Generator 和 Async/await 在異步編程是以等待的方式處理。

ReactiveX

業界一致認為正統的響應式實現/擴展是 ReactiveX 系列。

ReactiveX,簡稱 Rx,是基於響應式的擴展,是各種語言實現的一個統稱,除了我們所知道的 RxJS,還有 RxJava、Rx.NET、RxKotlin、RxPHP.....它最早是由微軟提出並引入到 .NET 平台中,隨後 ES6 也引入了類似的技術。

它擴展了觀察者模式,以支持數據序列和/或事件,並添加了操作符,允許您以聲明的方式將序列組合在一起,同時抽象出諸如低級線程、同步、線程安全、併發數據結構和非阻塞I/O等問題

RxJS

RxJS 全稱 Reactive Extensions for JavaScript,翻譯過來是 Javascript 的響應式擴展,它是一個採用流來處理異步和事件的工具庫,簡單來説 Rx(JS) = Observables + Operator + Scheduler

擅長做的事
  • UI 事件:例如鼠標移動、按鈕單擊......
  • 狀態管理:例如屬性更改、集合更新等事件
  • IO 消息事件:服務監聽
  • 廣播/通知:消息總線(Event bus)
  • 網絡消息/事件:例如 HTTP、WebSockets API 或其他低延遲中間件

最大的優勢:異步事件的抽象,這意味着可以把很多事統一起來當作一種方式處理,從而讓問題變得更簡單,同時也降低了學習成本

注意:RxJS 擅長做異步的事,不代表不可以做同步或不擅長同步的事。

RxJS 在 Angular 中的應用

RxJS 在 Angular 中及其重要,很多核心模塊都是由 RxJS 實現的,比如:

  • 響應式表單
  • 路由
  • HttpClient(封裝的 ajax,類似於 axios)
  • async 管道符
  • 狀態管理

更多: https://angular.io/guide/observables-in-angular

RxJS 核心概念—— Observables

image (5).png
RxJS 中的 Observables 系列是圍繞觀察者模式來實現的,基本角色:

  1. Observable:被觀察者,用來產生消息/數據。
  2. Observer:觀察者,用來消費消息/數據。

    Observable

    Observeable 是觀察者模式中的被觀察者,它維護一段執行函數,提供了惰性執行的能力(subscribe)。

    核心函數

  3. constructor(_subscribe) : 創建 Observeable
  4. static create(_subscribe):靜態函數創建 Observeable
  5. pipe():管道
  6. subscribe():執行初始化傳入的 _subscribe

    RxJS 中 Observeable 是一等公民,將一切問題都轉化為 Observable 去處理。轉換的操作符有  from 、 fromEvent 、 of 、 timer 等等,更多戳 這裏。 

注意的是:只有  ObservableInput  或  SubscribableOrPromise  類型的值才可以轉化為 Observable。

基本使用

image (6).png

源碼實現

本人寫(抽取)了一套 RxJS Observable 源碼中的核心實現

image (7).png

Observable 與 Promise

用過兩者的同學可能會有疑問為什麼採用 Observable 而不直接用 Promise 或 Async/await,這兩者在業界也常常用來做對比。

它們關鍵性的不同點:
截屏2022-11-15 18.10.55.png
總的來説,Promise 可讀性更優,Observable 從使用場景更為全面

兩者的相互轉換

在既使用了 RxJS 又引用了用 Promise 封裝的庫時,兩者相互轉換是容易碰到的問題,RxJS 提供了兩者轉換的函數。

Promise 轉 Observable

from 或 fromPromise(棄用) 操作符

const observable$ = from(fetch('http://xxx.com/'));
Observable 轉 Promise
const promise = of(42).toPromise();
const errorPromise = throw(new Error('woops')).toPromise();
errorPromise.catch(err=> console.error);

Subscriber/Observer

Subscriber/Observer 是觀察者模式中的觀察者/消費者,它用來消費/執行 Observable 創建的函數。

核心能力

  1.  next (傳值)
  2.  error (錯誤處理)
  3.  complete (完成/終止)

image (8).png

實現

image (9).png

image (10).png
白話描述:

  1. 將 subscribe 傳進去一個 next 函數賦給 Observer 的 next 函數。
  2. 將 Observer 傳給 Observable 初始化的預加載函數 _subscribe。
  3. 執行 Observable 初始化的預加載函數

    工作流程

    image (11).png

    Subscription

    上面的 Observable 和 Observer 已經完成了觀察者模式的核心能力,但是引發的一個問題是,每次執行一個流創建一個 Observable,這可能會創建多個對象(尤其是大量使用操作符時,會創建多個 Observable 對象,這個我們後面再説),此時需要外部去銷燬此對象,不然會造成內存泄露。

為了解決這個問題,所以產生了一個 Subscription 的對象,Subscription 是表示可清理資源的對象,它是由 Observable 執行之後產生的。

核心能力

  1.  unsubcribe (取消訂閲)
  2.  add (分組或在取消訂閲之前插入一段邏輯)

image (12).png

注意:調用 unsubcribe 後(包含 add 傳入的其它 Subscription)不會再接收到它們的數據。

使用

image (13).png

實現

image (14).png

image (15).png
白話描述:

  1. 調用 Observable 的 subscribe 後會添加(add 方法)到 Subscription(這裏有個關係 Subscriber 繼承了 Subscription) 中,並把 Subscriber(也是 Subscription)返出去。
  2. 調用 Subscription 的 unsubscribe 方法。
  3. unsubscribe 把該對象置空回收。

    完整工作流程

    image (16).png

    Subject

    上述的 Observable 歸根到底就是一個惰性執行的過程,當遇到以下兩種情況就顯得偏弱:

1. 推送多條數據時,需要就要創建多個對象。

2. 做狀態管理或消息通訊,監聽數據變化並實時推送。

基於這兩個方面,所以產生了 Subject,Subject 是一個特殊的 Observable,更像一個 EventEmitter,它既可以是被觀察者/生產者也可以是觀察者/消費者。

優勢

  1. 減少開銷和提高性能
  2. 數據實時推送

    場景

    消息傳遞或廣播。

    與 Observable 的區別

    截屏2022-11-15 18.22.41.png
    重點解釋一下消費策略和消費時機兩塊:

冷數據流:可以訂閲任意時間的數據流。

熱數據流:只給已訂閲的消費者發送消息,定閲之前的消費者,不會收到消息。

用一個示例來演示:

image (17).png

工作原理

image (18).png

PS:忘記了該圖出自哪篇文章,畫的挺不錯的,這裏直接引用了,如有侵權,還望聯繫作者。

源碼實現

  • observers 訂閲者集合
  • _subscribe 添加訂閲者
  • next 函數將所有訂閲者推送相同的數據
    image (19).png

    其他 Subject

    截屏2022-11-15 18.25.34.png

    操作符(Operator)

    由於篇幅問題,本節並不會細化到講每個操作符

    理解操作符

    Operator 本質上是一個純函數 (pure function),它接收一個 Observable 作為輸入,並生成一個新的 Observable 作為輸出

export interface Operator<T, R> {
  call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
// 等價於
function Operator(subscriber: Subscriber<R>, source: any){}

遵循的小道

迭代器模式和集合的函數式編程模式以及管道思想(pipeable)

函數式編程

操作符的實現以及使用均依照函數式的編程範式,Functional Programing,簡稱 FP,函數式編程範式,它的思維就是一切用函數表達和解決問題,避免用命令式。

優點:

  • 鏈式調用/組合開發
  • 簡單易寫易讀(聲明式)
  • 可靠性(純函數不存在依賴)
  • 惰性求值(高階函數)
  • 易於測試

更多詳細看這篇 不完全指南🧭

pipe

管道,用來承載數據流的容器,相信大家一定用過 Lodash 的chain,原生 js 數組,NodeJS 開發者 也許還知道 async/bluebird 的 waterfall,Mongodb 的 pipe,它們都遵循管道思想,最直接的好處是鏈式調用,還可以用來劃分邏輯,在異步的場景中還可以做流程控制(串行、並行、競速等等)。

為什麼要有操作符?

遵循符合響應式宣言,單向線性的通訊或傳輸數據,pipe 可以降低耦合度以便於閲讀和維護,把複雜的問題分解成多個簡單的問題,最後再組合起來。

操作符與數據流

在 RxJS 的世界解決問題的方式是抽象為數據流,整個閉環是圍繞數據流進行的,所以我們再來理解一下數據流:流,可以把數據可以想像成現實中的水流,河流,流有上游、下游每個階段處理不同的事情,在這過程避免不了要操作流,比如合併、流程控制、頻率控制等等,所以操作符就扮演了此角色。

生命週期:創建流(create、new、創建類操作符)——> 執行流(subscribe) ——> 銷燬流(unsubscribe)

分類

image (20).png

工作原理

迭代器模式:當多個操作符時,組合成多個可迭代對象的集合,執行時依次調用 next 函數。
image (21).png

源碼實現

  1. 操作符傳入 pipe
  2. pipe 將操作符轉換成可迭代的 Array
  3. subscribe(執行流)時消費操作符邏輯

如圖
image (22).png
操作符轉換 Array 源碼

export function pipeFromArray(fns: Array<Function>): Function {
    if (fns.length === 0) {
        return (x: any) => x;
    }

    if (fns.length === 1) {
        return fns[0];
    }

    return (input: any) => {
        return fns.reduce((prev: any, fn: Function) => fn(prev), input);
    };
}

創建自定義操作符

方式一

const isEven = () => {
    return (source: Observable<any>) => {
        return new Observable<any>(observer => {
            const subscription = source.subscribe((x) => {
                observer.next(x % 2 === 0);
                observer.complete();
            })
            return () => subscription.unsubscribe();
        })
    }
}
new Observable(observer => {
    observer.next(7);
})
    .pipe(isEven())
    .subscribe(console.log);
// 執行結果:false

方式二:基於 lift

const odd = () => {
    const operator: Operator<any, any> = {
        call(subscriber: Subscriber<any>, source: any) {
            const subscription = source.subscribe((x: any) => subscriber.next(x % 2 !== 0));
            return () => {
                subscription.unsubscribe();
            };
        },
    }
    return operator;
}

new Observable(observer => {
    observer.next(7);
})
    .lift(odd())
    .subscribe(console.log)
// 執行結果 true
lift 源碼

image (23).png

閲讀彈珠/大理石圖

學會閲讀彈珠圖是快速理解 Rx 操作符的手段之一,有些操作符需要描述時間流逝以及序列,所以彈珠圖有很多的標識和符號,如下圖。

image (24).png
這裏有幾個用來理解大理石圖的網站:

  •  https://rxviz.com/ 
  •  https://rxmarbles.com/

    學習參考

  • Async.js
  • Lodash

    調度器(Scheduler)

何為調度器

也許你在使用操作符的過程中從未在意過它,但它在 Rx 起着至關重要的作用,在異步中如何調度異步任務是很複雜的事情(尤其是以線程為核心處理異步任務的語言),很慶幸的是我們用使用的 JS ,所以不需要過多的關注線程問題,更友好的是大多數操作符默認幫開發者選中了合適的調度模式(下文會講到),以至於我們從忽略了它,但無論如何我們都應該對調度器有基本的瞭解。

調度器, Scheduler  用來控制數據推送節奏的,RxJS 有自己的基準時鐘和一套的執行規則,來安排多個任務/數據該如何執行。

官方定義:

  • Scheduler 是一種數據結構
  • Scheduler 是一個執行環境
  • Scheduler 是一個虛擬時鐘

    種類/模式

    截屏2022-11-15 18.39.24.png

    示例

下面我們舉例略窺一下各個模式的表現。

null/undefined/sync

import { asapScheduler, asyncScheduler, from } from 'rxjs';
function syncSchedulerMain() {
    console.log('before');
    from([1, 2, 3]).subscribe(console.log)
    console.log('after');
}
syncSchedulerMain();
// 執行結果:
// before
// 1
// 2
// 3
// after

asap

function asyncSchedulerMain() {
    console.log('asyncScheduler: before');
    from([1, 2], asyncScheduler).subscribe(console.log)
    Promise.resolve('asyncScheduler: promise').then(console.log);
    console.log('asyncScheduler: after');
}
// 執行結果:
// asapScheduler: before
// asapScheduler: after
// 1
// 2
// asapScheduler: promise

async

function asapSchedulerMain() {
    console.log('asapScheduler: before');
    from([1, 2, 3], asapScheduler).subscribe(console.log)
    Promise.resolve('asapScheduler: promise').then(console.log);
    console.log('asapScheduler: after');
}
// 執行結果:
// asyncScheduler: before
// asyncScheduler: after
// asyncScheduler: promise
// 1
// 2

結果示,from 數據輸出順序是在 console.log(同步代碼)和 Promise.then 之後的

工作原理

Scheduler 工作原理可以類比 JS 中的調用棧和事件循環,從實現上  aspa 和  async 也的確交給事件循環來處理。 null /undefined 相當於調用棧, aspa 相當於事件循環中的微任務, async 相當於宏任務,可以肯定的是微任務執行時機的優先級比宏任務要高,所以從執行時機來看 null > aspa > async。 queue 運行模式根據 delay 的參數來決定,如果是 0,那麼就用同步的方式執行,如果大於 0,就以 async 模式執行。

image (25).png

使用原則/策略

RxJS Scheduler 的原則是:儘量減少併發運行。

  1. 對於返回有限和少量消息的 observable 的操作符,RxJS 不使用調度器,即  null  或  undefined  。
  2. 對於返回潛在大量的或無限數量的消息的操作符,使用  queue  調度器。
  3. 對於使用定時器的操作符,使用  aysnc  調度器

    支持調度器的操作符

 of 、 from 、 timer 、 interval 、 concat 、 merge 、 combineLatest ,更多戳 這裏。 
 bufferTime 、 debounceTime 、 delay 、 auditTime 、 sampleTime 、 throttleTime 、 timeInterval 、 timeout 、 timeoutWith 、 windowTime  這樣時間相關的操作符全部接收調度器作為最後的參數,並且默認的操作是在  Scheduler.async  調度器上。
OK,關於調度器我們先了解到這裏。

最後

至此,RxJS 內容已經講解完畢,文中概念較多,若大家都能夠理解,就可以對 RxJS 的認知拉到同一個維度,後續需要做的就是玩轉各種操作符,解決實際問題,學以致用才可達到真正的精通。

最後如果覺得文章不錯,點個贊再走吧!

附文中完整代碼與示例: https://github.com/aaaaaajie/simple-rxjs

推薦閲讀

  •  玩轉 RxJS 操作符 ——流程控制篇 
  •  玩轉 RxJS 操作符——回壓控制篇

    參考

  •  反應式宣言 
  •  RxJS 中文文檔 
  •  Reactive X 文檔 
  •  RxJS 入門指南 
  •  RxJS 給你絲滑般的編程體驗 
  •  Observable vs Subject 
  • 《RxJS 深入淺出》——程墨

Add a new Comments

Some HTML is okay.