過去兩週,我們對開源之夏活動中表現優異的開發者們進行了簡單的採訪,初步粗略地瞭解了一下他們的開發過程和心得體會。今天,我們將通過同學們的完整結項報告,深入瞭解項目的開發技術細節,希望能夠幫助大家更好地瞭解 Apache SeaTunnel 項目的最新進展。
接下來是關於Apache SeaTunnel支持metalake開發這一項目的完整報告:
一、項目背景
目前,Apache SeaTunnel 的任務配置中,數據源的用户名和密碼等敏感信息直接寫死在任務腳本中,這種方式 存在以下問題:
- 安全隱患:敏感信息暴露在腳本中,易導致數據源信息泄漏。
- 維護困難: 數據源配置信息發生變更時,需手動修改所有相關任務腳本,效率低下且易出錯。
為解決上述問題,本項目旨在通過集成metalake,實現數據源信息的集中存儲和管理。通過數據源 ID 映射機 制,用户可方便地更新和管理數據源配置。本項目的目標是支持主流數據目錄 Apache Gravitino,並通過預留 接口,方便擴展支持其他第三方數據目錄服務。
Apache Gravitino獲取數據源配置信息的REST API示例見於:https://gravitino.apache.org/docs/0.9.0- incubating/api/rest/load-catalog
代碼倉庫見於: https://github.com/apache/seatunnel
-
完成metalake配置信息適配
將metalake配置信息配置在seatunnel-env中,任務啓動後加載到任務配置腳本的env中。1.1 任務啓動時讀取
seatunnel-env中的配置項。
1.2 將配置集成到任務腳本的env中,確保任務能夠正確加載metalake配置。 -
完成source和sink的數據源配置信息改造
讀取env中是否開啓metalake標識,在source和sink中增加sourceId作為查詢metalake的唯一標識,獲取數據 源信息並替換source/sink配置項中的佔位符。2.1 在source和sink配置中增加sourceId配置項。
2.2 支持source/sink配置項中的佔位符替換,通過sourceId動態獲取數據源信息。 - 插件方式支持metalake並集成Apache Gravitino
定義metalake接口,支持根據唯一ID查詢數據源配置信息,並實現Apache Gravitino數據源信息轉換為 SeaTunnel配置項佔位符的功能。
3.1 定義metalake實現接口,提供數據源查詢功能。
3.2 支持Apache Gravitino集成,參考Gravitino REST API文檔。
3.3 支持擴展性,通過實現接口可支持其他數據目錄,如UnityCatalog或DataHub。
3.4 確保向後兼容,不影響存量任務的正常運行。
二、方案描述
-
完成metalake配置信息適配
1.1 任務啓動時讀取
seatunnel-env中配置項 - 實現目標:在任務啓動時,從
seatunnel-env.sh或者任務配置文件中讀取metalake相關的配置。 - 實現方法:在
seatunnel-env.sh文件中定義metalake配置項,例如:
METALAKE_ENABLED=true
METALAKE_TYPE=gravitino
METALAKE_URL=http://localhost:8090/api/metalakes/metalake_name/catalogs /
...
或者在任務配置文件中的env中配置
env{
metalake_enabled = true
metalake_type = "gravitino"
metalake_url =
"http://localhost:8090/api/metalakes/metalake_name/catalogs/" }
1.2 將配置集成到env中
- 實現目標:將讀取的metalake配置集成到任務的env中。
-
實現方法:
- 若用户在任務配置文件中配置env,那麼自然無需集成。
- 若在seatunnel-env.sh腳本中配置,也可通過System.getEnv()獲得,無需集成到env中
-
完成source和sink的數據源配置信息改造
2.1 source/sink增加sourceId配置項
- 實現目標:為source和sink添加sourceId字段,用於標識metalake中的數據源。
-
實現方法:
- 在任務腳本中指定sourceId即可。
- 任務腳本示例:
source {
type = "mysql"
sourceId = "mysql_datasource_001"
url = "jdbc:mysql://localhost:3306/db"
...
}
2.2 支持source/sink的配置項佔位符替換
- 實現目標:通過metalake動態獲取數據源信息,並替換配置中的佔位符。
-
實現方法:
- 在配置解析階段,檢查sourceId和metalakeEnabled。
- 如果啓用metalake且sourceId存在,則用户可將username和password等字段並設為佔位符,然 後通過metalake接口查詢數據源信息並佔位符替換。
-
步驟:
-
- 定義佔位符格式,例如${key}。
-
- 通過REST API查詢數據源信息。
- 通過REST API查詢數據源信息。
-
- 替換配置中的佔位符。
-
- 代碼示例:
-
插件方式支持metalake並支持Apache Gravitino集成
3.1 定義metalake實現接口
- 實現目標:定義一個通用接口,用於與metalake交互。
-
實現方法:
- 定義
MetalakeClient接口,包含查詢數據源信息的方法。 - 接口定義:
- 定義
3.2 支持Apache Gravitino集成
- 實現目標:實現與Apache Gravitino的集成,通過REST API獲取數據源信息。
-
實現方法:
- 創建
GravitinoClient類,實現MetalakeClient接口。 - 使用HTTP客户端發送請求到Gravitino API,並解析響應。
- 代碼示例:
3.3 支持可擴展
- 創建
- 實現目標:通過插件化設計,支持其他metalake實現。
-
實現方法:
- 使用工廠方法,根據metalakeType選擇合適的client。
- 代碼示例:
3.4 不影響存量任務,向後兼容
- 實現目標:確保新功能不破壞現有任務。
-
實現方法:
- 將metalakeEnabled設為可選配置,默認值為false。
- 僅在metalakeEnabled=true且sourceId存在時觸發metalake邏輯。
- 代碼示例:
三、時間規劃
四、項目進度
已完成工作
已完成項目所需功能的開發與測試,並經過修改後,已經合併了PR。
遇到的問題與解決方案
在編寫代碼時,我遇到的問題不多,並且要感謝liugddx老師的指導,在老師的指導下,我遇到的問題基本迎刃 而解。
還有一個問題就是該項目的test case較多,測試時間較長,並且合併PR前要通過所有的test case。然後由於網 絡等原因,這些test case不是很穩定,有時需要多次重試才能通過,這很考驗我的耐心。
測試用例
設計了一個簡單的任務配置腳本,並在source中使用了metalake。
併為此測試用例構建了相應的MySQL數據庫和Gravitino。在sink中使用了Assert connector,保證得到正確的結 果。該集成測試的test case代碼也已上傳github,並且通過了測試。
後續工作安排
後續可以考慮集成更多的metalake類型,使得該功能不侷限於Gravitino。