作者: 互聯網大數據團隊- Chen Rui
本文簡要介紹了特徵拼接在實時推薦中的重要作用,並講述了vivo實時推薦系統中特徵拼接模塊的架構演進過程以及採用現有的“基於RocksDB的大狀態解決方案”的原因,重點敍述了該方案所遇到的一系列問題,包括TM Lost、RocksDB性能調優門檻高、TM初始化慢、狀態遠程存儲HDFS RPC飆高等,並給出了這些問題的現象以及解決方案。
1分鐘看圖掌握核心觀點👇
一、背景
在推薦系統中,樣本拼接是銜接在線服務與算法模型的重要一個環節,主要職責是樣本拼接和業務相關的ETL處理等,模塊位置如下圖紅框所示。
推薦系統通過學習埋點數據來達到個性化精準推薦的目的,因此需要知道服務端推薦下發的內容,是否有一系列的行為(曝光,點擊,播放,點贊,收藏,加購等等),把被推薦內容的埋點數據與當下的特徵拼接起來的過程,一般稱為樣本拼接,一個簡化的流程如下:
推薦的過程可以檢驗概括為以下幾點:
- 後台服務rank 推薦內容給app客户端,同時把內容對應的特徵快照保存起來;
- app接收到內容後,埋點日誌被上報到消息中間件;
- 樣本拼接負責將特徵與埋點日誌拼接起來,定義正負樣本,格式轉換;
- 模型接收樣本訓練,將使用最新的模型做推薦。
為了保證較高的拼接率和穩定性,我們的拼接架構也經過了長時間的迭代,這篇文章我將給大家介紹vivo特徵拼接架構的發展歷程、當前方案、當前方案遇到的問題和解決方案,以及未來的規劃和展望,希望能幫助到業內的同學。
二、拼接方案選型
2.1 小時粒度拼接
小時拼接是將埋點日誌和特徵快照都保存到Hive並以小時分區,每小時調度一個Spark任務來處理兩個表相應分區的數據做拼接,由於是小時拼接,實時性較低,Spark作業本身也依賴於上游Hive表小時分區生成,每個小時末尾的請求埋點有可能是落在當前小時,也有可能落在下個小時。舉個例子:19點50分下發了一個視頻,客户端在19:59分點擊了,但是視頻播放卻是在20點03分完成的,這個時候就會存在拼接不上的問題。
2.2 基於 Redis 的流式拼接
為了提升拼接率,且達到實時拼接,節點故障容災,完備監控等特性,Flink是一個很好的替代方案,也是最近幾年比較主流的實現。最初在實時推薦場景中,Kafka中的特徵快照通過Flink任務寫入到Redis,另一個Flink任務消費曝光埋點數據和點擊埋點數據並讀取存在Redis中的特徵快照數據做拼接,拼接後的數據作為拼接特徵被寫入到下游的Kafka中,提供給後續的算法做模型的訓練,架構圖如下:
經過一段時間實踐,以上的方案出現了兩個痛點:
- Redis中存儲了幾十T的數據,Redis的成本高;
- 業務數據流量會波動,經常需要DBA對Redis集羣進行擴容,涉及大量數據的遷移,運維成本高。
2.3 基於 RocksDB 大狀態流式拼接
為了解決基於Redis的作為中間數據的存儲存在的問題,我們採用Flink狀態來存儲特徵快照,整個架構中不再需要外部的Redis,由於我們需要存儲的數據量達幾十T,這裏我們選用適合大數據量存儲的RocksDB類型的狀態後端,調整後架構更加簡潔,如下圖所示:
流程如下:
- 首先將曝光流點點擊流以及特徵在Flink 任務中做union並做keyby;
- 在processElement方法中如果接收到曝光流就將數據保存到state中,如果接收到曝光流就將數據保存到state中,如果接收到特徵就去state中查詢相應的曝光和點擊數據;
- 如果能找到就發送到下游並將狀態數據清理掉,沒找到就將特徵保存到state中,並註冊一個定時器;
- 定時器觸發時去state中查詢相應的曝光和點擊數據,如果找到就發到下游,並將狀態數據清理掉。
由於RocksDB可以同時利用內存和磁盤來存儲數據,所以對於內存的使用量大幅下降,由於RocksDB是嵌入式的數據庫,每個TM上的RocksDB數據庫只存儲shuffe到該TM上的數據,無需再關注擴縮容的問題。當然隨着數據上漲,Flink流式拼接在實際的生產過程中也遇到了一系列的問題,為了保證業務的可用性,我們花了較長的時間對這些問題進行攻克,目前任務穩定性達到99.99% ,拼接率長期穩定在99%以上,對拼接效果提升較大。下面我將列舉我們遇到的問題和解決方案,希望能夠幫助到業內的其他團隊。
三、問題及解決方案
3.1 TM Lost問題
3.1.1 現象
在方案實施之初,我們發現這些特徵拼接的任務頻繁出現TM was Lost異常導致任務重啓,我們看了日誌,發現都是TM內存超出了YARN的內存限制被kill。
3.1.2 問題分析
那麼我們的疑問就來了,為啥這部分任務的內存很容易超出,超出的那部分內存又是誰在用呢?下面這張圖是來自Flink的官網,因為我們在平台使用Flink的時,我們只設置了總的內存,並沒有關注其他各個局部的內存,那麼這些部位的內存是如何分配的?為了搞清楚這個問題,有必要梳理一下每個模塊內存計算的邏輯。
圖片引用自Flink
Flink內存分配邏輯
一般在YARN上提交的任務是含有taskmanager.memory.process.size 參數的配置的,所以Flink在分配內存時,會以調用deriveProcessSpecWithTotalProcessMemory 方法分配。
通過配置參數獲得meatspace 的大小,通過jobmanager.memory.jvm-overhead.fraction 的比例計算overhead的內存,totalFlinkMemory通過總的進程的內存減去meatspace + overhead的內存得到。
通過配置中的參數獲取 frameworkHeapMemory-
Size、frameworkOffHeapMemorySize 、task-
OffHeapMemorySize 的大小。
通過managedmemory的配置獲取託管內存的值, 通過networkbuffer的配置獲取networkbuffer的值 。totalFlinkMemory 減去所有需要排除的內存,剩下的內存分配給堆。內存分配邏輯,以及每塊內存的設置方法如下圖:
到此TM的各個內存模塊的內存已經劃分完成。有上面的分析我們可以得出以下的結論:
totalProcessMemorySize = totalFlinkMemorySize + JvmMetaspaceSize + JvmOverheadSize
totalFlinkMemorySize = frameworkOffHeapMemorySize + taskOffHeapMemorySize + managedMemorySize + networkMemorySize + frameworkHeapMemorySize + taskHeapMemorySize
這裏重點將一下JVMOverhead,JVMOverhead並沒有具體的作用,是一個預留值,它是一個緩衝區,可以避免在Flink運行在容器中是因為短時時間的內存超出了容器的限制而被kill。
frameworkOffHeapMemorySize和taskOff-
HeapMemorySize 也是預留值,offheap在概念上的主要是指native內存。frameworkHeap-
MemorySize 也是預留值。由此可以看出雖然Flink官方將TM的內存劃分的較細緻,但是像JvmOverheadSize frameworkOffHeap-
MemorySize,taskOffHeapMemorySize,
frameworkHeapMemorySize 都只是邏輯上的預留,並沒有從操作系統層面實現隔離。
RocksDB內存分配邏輯
因為堆內存不足時一般會報out of memory的異常,所以到這一步我們推測應該是堆外內存溢出了,而堆外內存最大的一塊就是RocksDB使用的,而從Flink的官網的介紹可以知道託管內存就是給RocksDB使用的,下面我們再看一下託管內存是如何分配給RocksDB的。
cacheMemory = (1-(1/3)*(writeBufferRatio))* managedMemory
bufferMemory = (2/3)*(writeBufferRatio)* managedMemory
讀寫緩存總內存 = bufferMemory + cacheMemory = (1 +(1/3)*(writeBufferRatio))* managedMemory
由上面的代碼可以看出,managed memory 是通過一定的比例給RocksDB的各個部分來分配內存的,writeBufferRatio會影響讀緩存和寫緩存的大小,理論上讀寫緩存總內存有可能會超過managedMemory的大小。通過上面的公式可以看出讀寫緩存總內存最多超出managedMemory的1/3,這裏很容易想到,那麼我們在排查overhead的時候配置大於managedMemory的1/3不就能你面內存溢出了,但是在實踐中,我們這樣配置並並沒有完全的解決物理內存溢出的問題,下面關於RocksDB內存的資料,終於找到了是還有哪部分內存容易溢出了,是因為部分區域的內存難以限制導致的。
RocksDB 的內存佔用有 4 個部分:
- Block Cache: OS PageCache 之上的一層緩存,緩存未壓縮的數據 Block;
- Indexes and filter blocks: 索引及布隆過濾器,用於優化讀性能;
- MemTable: 類似寫緩存;
- Blocks pinned by Iterator: 觸發 RocksDB 遍歷操作(比如遍歷 RocksDBMapState 的所有 key)時,Iterator 在其生命週期內會阻止其引用到的 Block 和 MemTable 被釋放,導致額外的內存佔用。
前三個區域的內存都是可配置的,但 Iterator 鎖定的資源則要取決於應用業務使用模式,且沒有提供一個硬限制,因此 Flink 在計算 RocksDB StateBackend 內存時沒有將這部分納入考慮,其次是 RocksDB Block Cache 的一個 bug,它會導致 Cache 大小無法嚴格控制,有可能短時間內超出設置的內存容量,相當於軟限制,原來是迭代器的內存限制的不好,導致的內存溢出。
3.1.3 解決方案
我們在使用Flink 的RocksDB狀態後端時,是通過managed memory來控制RocksDB各個部分的內存的,所以managed memory內存越小分配給各個部分的內存也就越小,迭代器內存越不容易溢出。到此我們對Flink的RocksDB狀態後端的內存有了一定的認知:當性能可以滿足的情況下,Flink的Manaed memory應該越小越好。但是上滿形成的經驗很難高效的在業務上落地,原因是“Flink的Manaed memory應該越小越好”很難去確定。
於是我們聯想到了之前的JVMoverhead,在我們的實際實踐中過程中,我們是通過調大JVMoverhead,和jemalloc內存分配器來解決內存溢出問題的。在Flink1.12之後Flink on k8s的內存分配器已經默認改成了jemalloc,可以避免內存的分配過程中出現64M問題。
但是要注意:由於我們的Java版本是JAVA8小版本是192,在最新版本的jemalloc5.3上出現了死鎖的問題,後來我們採用jemalloc4.5 就沒有問題了。據瞭解業界有些公司使用的JAVA8小版本是256採用jemalloc5.3沒有遇到死鎖問題。
3.2 RocksDB 的性能監控問題
3.2.1 現象
Flink RocksDB大狀態的任務經常出現延遲,但是我們很難知道性能的瓶頸在哪塊,從而優化響應的環節。
3.2.2 解決方案
其實Flink提供了一系列對於RocksDB的性能的監控指標,我們只需要加上參數開啓即可,這裏我只結局我覺得最有參考意義的指標開啓的參數:
下面是相關指標的監控頁面:
3.3 任務出現延遲
3.3.1 現象
Flink RocksDB大狀態的任務經常出現延遲,調優參數高達近百個,如何系統性的調優,難度較大。
3.3.2 解決方案
要想對RocksDB的性能做優化,我們有必要先了解一下RocksDB的讀寫流程。
RocksDB的讀流程
- 獲取當前時刻的SuperVersion,SuperVersion是RocksDB內針對於所有SST文件列表以及內存中的MemTable和Immutable MemTable的一個版本;
- 獲取當前的序號來決定當前讀操作依賴的數據快照;
- 嘗試從第一步SuperVersion中引用的MemTable以及Immutable MemTable中獲取對應的值。首先會經過布隆過濾器,假如不存在則一定不存在,反之假如返回存在則不一定存在;
- 嘗試從Block Cache中讀取;
- 嘗試從SST文件中獲取。
RocksDB的寫流程
-
將寫入操作順序寫入WAL日誌中,接下來把數據寫到 MemTable中(採用SkipList結構實現)
MemTable達到一定大小後,將這個 MemTable 切換為不可更改的 immutable MemTable,並新開一個 MemTable 接收新的寫入請求;
- 這個 immutable MemTable進行持久化到磁盤,成為L0 層的 SSTable 文件;
-
每一層的所有文件總大小是有限制的,每下一層大十倍。一旦某一層的總大小超過閾值了,就選擇一個文件和下一層的文件合併。
注意: 所有下一層被影響到的文件都會參與 Compaction。合併之後,保證 L1 到 L6 層的每一層的數據都是在 key 上全局有序的,而 L0 層是可以有重疊的,寫流程的約束;
- 日誌文件用於崩潰恢復;
- 每個MemTable及SST文件中的Key都是有序的(字符順序的升序);
- 日誌文件中的Key是無序的;
- 刪除操作是標記刪除,是插入操作的一種,真正的刪除要在Compaction的時候實現;
- 無更新實現,記錄更新通過插入一條新記錄實現;
當任務出現延遲時,由於我們已經有了RocksDB性能指標的監控也瞭解RocksDB的原理,我們在做性能優化時就可以對症下藥了。
讀性能優化
當任務出現延遲且塊緩存命中率下降時,説明是讀的性能下降導致延遲,我們可以通過提升緩存命中率的方式來提升讀性能,RocksDB任務緩存命中率的優化思路如下:
- 託管內存小於TM內存20%,可以調大託管內存:state.backend.rocksdb.memory.managed 到 20%;
- Flink內部對RocksDB的優化已經沉澱了多組參數,建議使用配置:
state.backend.rocksdb.predefined-options =
SPINNING\_DISK\_OPTIMIZED\_HIGH\_MEM;
- Flink中使用state.backend.rocksdb.memory
.write-buffer-ratio參數來管理寫緩存,調小該參數,能夠提升讀緩存,該參數默認0.5;
- RocksDB 會有一寫索引和過濾器放在內存中,用這個參數開啓:state.backend.rocksdb
.memory.partitioned-index-filters 默認 false,並且可以調節索引和過濾器佔用的內存比例,參數是:state.backend.rocksdb.memory
.high-prio-pool-ratio默認為0.1。
寫性能優化
當任務延遲,如果出現等待flush的內存表的大小增加,或者等待合併的個數增加,因為等帶flush個數達到一定的個數時寫將會被阻塞,可以先關注一下磁盤io是否打滿,如果已經處於高位,建議提升任務的併發。如果此磁盤io處於低位,我們可以調整flush和compation的線程數來使寫的數據不再積壓。提升寫寫性能。Flink會將flush和compation的線程數通過一個參數統一管理,參數是:state.backend
.rocksdb.thread.num,默認值是1。
3.4 任務啓動慢的問題
3.4.1 現象
由於Flink任務在從狀態啓動時需要將存儲在遠程HDFS的狀態文件讀到本地,當TM較集中時單台機器的磁盤io很容易被打滿,導致某些sub task 長時間處於INITIALIZING的狀態。
3.4.2 解決方案
YARN參數的優化
YARN默認的yarn.scheduler.fair.assignmultiple參數為flase,即一次只分配一個container,但是CDH將這個參數設置成了true,yarn.scheduler.fair.max.assigr默認為-1,表示不限制,所以導致一次調度到單個節點上的container較多。我們的解決方案是將YARN配置中的yarn.scheduler.fair.assignmultiple參數設為false,一次只調度一個container,解決了TM分配較集中的問題。
Flink調度策略的優化
由於只是限制了每次分配TM的個數,還不能完全避免分配集中的問題,於是我們對Flink引擎內部做了優化,可以硬限制在某台機器上調度TM的個數,具體做法是,是當YARN返回給Flink ResourceManager container信息時,判斷container是否符合要求,如果不符合可以部分拒收,再次申請資源,該功能由參數開啓。
3.5 磁盤打滿的問題
3.5.1 現象
由於我們實時集羣的磁盤較小,大狀態任務的狀態達幾十上百T,頻繁出現磁盤使用率達到90%的告警。
3.5.2 解決方案
我們將大狀態的任務的Checkpoint數據存儲到磁盤資源較寬裕的離線的集羣,非大狀態的任務的Checkpoint數據存儲在實時集羣。
3.6 HDFS RPC 飆高問題
3.6.1 現象
在業務新上一批任務後,我們發現離線集羣HDFS的RPC有明顯的增加。
3.6.2 解決方案
由於我們默認只會保存最近的3個Checkpoint,所以對於增量Checkpoint而言,肯定會有文件的修改和刪除,據瞭解修改和刪除是對HDFS性能影響較大的操作。我們對比這一批任務任務在HDFS上的Checkpoint文件和之前的任務對比發現,文件數量大很多,但是每個文件小很多,於是我們調整了參數:state.backend.rocksdb.compaction.level.target-file-size-base參數為256MB,這個參數默認是64MB,參數的作用控制壓縮後的文件的大小。配置改參數後RPC迴歸正常。
效果如圖:
四、總結
4.1 遺留問題
4.1.1.RocksDB的調優的門檻較高
雖然我們在任務上使用了積累通用經驗進行優化,但是有些數據量較大的任務在流量高峯期依然容易出現延遲,RocksDB的參數有幾十個,要想把性能調優做到比較極致需要深入瞭解其原理,還有對業務特點有深入的瞭解,對於應用開發而言,門檻較高。
4.1.2.任務恢復慢
由於有些任務的狀態高達幾十T,在重啓任務或者異常重啓時要從Checkpoint恢復,需要從遠程的HDFS下載狀態到本地磁盤,單機的io很容易被打滿,雖然我們做了TM打散,但是有些單個TM恢復狀態就需要幾十分鐘,這對於特徵拼接任務來講是不可接受的。
4.1.3.SSD壽命消耗加速
我們的實時集羣磁盤使用的是單塊的SSD,SSD壽命是有限的,然而RcoksDB的寫放大的特點加速了SSD的壽命的消耗。
4.2 規劃
經過較長時間的實踐我們理解了樣本拼接的本質是將不同來源、不同更新頻率、不同規模的特徵(如基礎特徵、實時埋點特徵、歷史特徵)組合成完整樣本,而單一組件往往在 “延遲、存儲規模、更新頻率” 等維度存在短板,必須通過混合架構實現 “優勢互補”。
業界混合架構的案例
組件分工
拼接流程
① 實時日誌採集:用户點擊商品的日誌通過Kafka接入Flink實時作業;
② 實時數據存儲:將曝光流的數據存到RocksDB和HBase中,RocksDB的TTL設置成1小時;
③ 算子內實時拼接:Flink算子從RocksDB讀取用户最近1小時埋點特徵,從HBase讀取基礎特徵,初步拼接成“實時+基礎”特徵;
④ 歷史特徵融合:Flink作業將初步拼接結果寫入Paimon,與Paimon中存儲的“7天曆史特徵”融合,生成完整樣本;
⑤ 樣本分發:
- 實時推薦:完整樣本通過Flink寫入到HDFS提供給在線訓練服務使用;
- 離線訓練:Spark作業從Paimon讀取全量完整樣本,用於推薦模型的離線迭代。
下面是一個調用時序圖:
核心價值
- 低延遲:RocksDB 支撐算子內毫秒級拼接,滿足實時推薦的 “秒級響應” 需求;
- 大規模:HBase+Paimon 可支撐億級用户的PB級特徵存儲;
- 流批協同:同一套樣本既供實時推薦,又供離線訓練,實現流批架構統一;
- 易於擴展:Paimon動態列支持特徵迭代。
4.3 展望
近幾年大數據架構已經從計算-存儲緊密耦合的Map-Reduce時代,進入到了以Kubernetes容器化部署為標準的雲原生世界。未來Flink將引入基於遠程存儲的存算分離狀態管理架構,新架構主要為了解決以下問題:
- 容器化環境下計算節點受本地磁盤大小限制的問題;
- 由於RocksDB中LSM結構的週期性 Compaction 導致計算資源尖峯的問題;
- 大規模狀態快速擴縮容的挑戰。
我們也將持續關注Flink社區的發展,嘗試採用遠程存儲狀態後端來做為特徵拼接的解決方案。