博客 / 詳情

返回

結項報告完整版:Apache SeaTunnel 支持 Flink 引擎 Schema Evolution 功能

過去兩週,我們對開源之夏活動中表現優異的開發者們進行了簡單的採訪,初步粗略地瞭解了一下他們的開發過程和心得體會。今天,我們將通過同學們的完整結項報告,深入瞭解項目的開發技術細節,希望能夠幫助大家更好地瞭解 Apache SeaTunnel 項目的最新進展。

接下來是關於在 Flink 引擎上對 Schema Evolution 功能的支持這一項目的完整報告:

一. 已完成工作

根據原定方案(https://ycn2sw1zdz0c.feishu.cn/wiki/QTxYwPcytiG4bxku0vQcrvtlnlb)和時間規 劃,已完成在 Flink 引擎上對 Schema Evolution 功能的支持,目前 Sink 端已支持在 Flink 引擎 上進行流式變更的有: JdbcSinkWriter , JdbcExactlyOnceSinkWriter , ConsoleSinkWriter ,已 經測試完畢,目前沒有發現在 Schema Evolution 流程中有明顯 bug。

✅在 source 和 transform 之間動態插入算子,如果檢測到實現了 SupportSchemaEvolution 的類, 並且開啓了 schema evolution 的配置,則插入SchemaOperator。
✅實現 SchemaCoordiantor 協調器,接收 sink 端上報的刷寫信息,同時接收 SchemaOperator 上 報的 Schema 變更請求。
✅擴展 SchemaChangeEvent子類,支持 FlushEvent 事件流轉。
✅擴展 SupportSchemaEvolutionSinkWriter 方法,支持上報刷寫成功信息,處理 FlushEvent。
✅實現 SchemaOperator 算子,檢測被 source 端發出的變更事件並處理,支持變更事件透傳到下 遊。
✅重寫 SupportSchemaEvolutionSinkWriter 關於 Schema evolution方法,目前支持
JdbcSinkWriter,JdbcExactlyOnceSinkWriter ,ConsoleSinkWriter,測試完畢,符合預期。
✅擴展 FlinkRowCollector 的 collect 方法,支持變更事件的收集。
✅擴展 FlinkSinkWriter 方法,支持檢測變更事件並處理。
✅擴展 SchemaEvolution 錯誤碼和異常體系,變更出現異常時支持詳細異常信息上報。
✅變更任務出現異常後,自動拋出異常,交給重試機制處理。

二. 遇到的問題及解決方案

1. 事件流轉問題

在 source 端和 transform 中間插入一個 operator時,需要在內部判斷流轉過來的元素是否是事 件,如果是事件,就阻塞,等待刷寫變更之後再次流轉;否則就繼續向下流轉,有兩種方案:

  • 和 Zeta 引擎保持一致,創建類似 Record 的類 StreamElement ,但是從 source 到 transform 到 sink 端的所有鏈路,關於 SeatunnelRow 的都要修改為 StreamElement,入侵性極高,且非常危 險,影響面大。
  • 在 SeatunnelRow 中添加特殊標記,比如在 options 裏面添加一個選項,如果遇到事件,存儲到額 外信息裏面,這樣對鏈路入侵性不高,但是違反單一設計原則,按理來説 SeatunnelRow 不應該關 心事件,只負責數據流轉,之後如果架構升級可以重構,目前以實現功能為主,減小風險。

之後就可以在 SchemaOperator 算子裏面檢測到這個標記了:


但是這樣同樣會帶來一個問題,就是我們 new 了一個空行,會導致 sink 端的寫入報錯,所以需要 在 sink 端檢測:




這樣就能解決事件透傳的問題。

2. 多並行度問題

實際上在 Flink CDC 的實現中,增量快照階段,按照用户定義並行度開啓任務,讀取快照數據;進 入增量階段後,為了保證順序,只會保留一個任務來讀取,所以我們不需要給協調器多麼複雜的實現, 讓它正常接收 sink 端響應即可,也不用考慮多個分區重複上報以及順序問題:







關於 source 端明明是一個任務,但是 sink 可能是多個任務的問題,看了下 flink cdc 相關源碼, CDC source 確實是由一個任務來讀取 binlog,但之後數據通過 KeyGroupStreamPartitioner 按主鍵 hash分發,不同的 key 被髮送到不同的 sink 任務,每個 sink 任務處理分配給它的 key 範圍的數據:

具體實現裏面,會先檢查 sink 端和 input 的並行度是否相同,如果不同,會採取 primary key shuffle 的手段:

sink 配置了自定義並行度且不等於輸入並行度時, Flink 會進行特殊處理:

如果 sink 並行度與 input 並行度不同,會通過 primary key 進行 shuffle:

Flink 自己應該支持這種 sink 端的多並行度,但是我有幾個考慮的點:

  1. 如果真要實現這種機制, Shuffle 的實現對我來説有難度。
  2. 如果多個並行度同時收到變更命令,對於冪等性的數據庫來説,變更可能不受影響,但是像
    StarRocks 這種 OLAP 數據庫沒有冪等性,所以有困難,當然這種也有解決辦法,就是收到幾個分 區的刷寫完成響應之後,協調器收到 ack,讓協調器來變更,同樣也很麻煩,不如讓 source 和 sink 使用相同並行度,在一條算子鏈裏面,也不用 shuffle,但是還有一個點是,數據量大的情況下可能 影響性能。

所以,我目前檢測到 cdc 變更之後強制指定 sink 端並行度就是1,這樣也不會有上面的問題,之後 可以進行迭代來支持 sink 端的需求:


3. 刷寫與請求的執行順序問題

之前在 SchemaOperator 算子裏面處理變更事件的時候,我先發送了刷寫事件,之後才請求協調器 進行變更,這樣會有一個問題,協調器內部的 SchemaChangeState 還沒有進行初始化,所以之後協調 器遲遲獲取不到 State,先一步到的 FluEvent 也沒有被成功接收, 一直阻塞,之後任務超過了我設定的 超時時間,任務就失敗了。
分析日誌後發現:

  • 12:33:36,597 - FlushEvent 被處理, Sink 立即上報了 flush 成功
  • 12:33:36,597 - 協調器警告: "No schema change state found"
  • 12:33:36,598 - 協調器才創建 schema change state

再次查看我寫的代碼:

所以問題就比較明顯, Sink 的 flush 通知比協調器的 requestSchemaChange 更早到達,導致通知 被丟棄,我們只需要修改執行順序即可解決此問題:


4. FlushData 和 變更問題

之前我在實現的時候, FlushEvent 內部包裹着 SchemaChangeEvent,在 FlushData 的同時就把表 變更了,這樣有一個問題就是職責不清晰,比較混亂,之後就把職責分開,刷寫數據就只刷寫數據,之 後上報協調器,再次發送變更事件,真正進行變更:

5. 默認實現與接口職責問題

目前為了向前兼容,SupportSchemaEvolutionSinkWriter 中新增方法均被標記為 default,之後再 進行迭代。迭代完畢之後,即可取消 default 關鍵字:

6. 變更失敗後標記失敗 or 回滾問題

有一個問題是,假設説因為網絡問題或者其他問題,作業失敗了,那麼應該直接標記作業失敗,讓 Flink 自己從檢查點拉起作業,還是讓其直接回滾?

Flink CDC 的實現是直接標記失敗,之後從檢查點恢復,目前我採用的是標記失敗的策略,考慮的點 是,主動回滾開發相當麻煩,可能還需要 flink ck 進行適配,直接讓 schema 變更失敗時拋出異常,讓現 有的重試機制處理就行,而且也觀察到 SeaTunnel 這邊做了重試相關的機制, Flink自己有全局重試策略,no ,fixed-delay ,failure-rate(已實現,已測試)。

因為要拋出異常,直接拋出 RuntimeException 對開發者定位問題和用户都不是很友好,所以增強 了異常機制,實現了自己的異常類,錯誤碼和異常方法。
異常處理示例:


三. 測試用例與結果

關於 MySQLCDC to MySQL 場景測試報告如下:
✅add column 場景測試報告:https://ycn2sw1zdz0c.feishu.cn/wiki/XYotwQ7QeiJqsikiTEscBXwcn
✅drop column 場景測試報告: https://ycn2sw1zdz0c.feishu.cn/wiki/QU73wXqTpirfZmk6NDCc1i
1wnDf
✅modify column 場景測試報告: https://ycn2sw1zdz0c.feishu.cn/wiki/NXVwwTLf8iWUiFk6nGgcv GJGnmd
✅change column 場景測試報告: https://ycn2sw1zdz0c.feishu.cn/wiki/UoIvwdUcJiutXSkyvm1ceb
LcnUh

四. 後續工作計劃

  • 目前並不是所有支持 schema evolution 的 sink 端均實現了,後續支持SupportSchemaEvolutionSinkWriter 相關子類的實現。
  • 測試不同數據源之間的流轉情況,修復可能的小 bug。
  • 測試大量數據情況下是否會出現嚴重阻塞問題。
  • 測試高併發情況下是否有不一致性問題。
user avatar yingjundeqie 頭像 sfbc 頭像 dbkangaroo 頭像 wangmingx 頭像 Rocokingdom2024 頭像 hashdata 頭像 zhangxueliang 頭像
7 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.