博客 / 詳情

返回

結項報告完整版 | Apache SeaTunnel支持metalake開發

過去兩週,我們對開源之夏活動中表現優異的開發者們進行了簡單的採訪,初步粗略地瞭解了一下他們的開發過程和心得體會。今天,我們將通過同學們的完整結項報告,深入瞭解項目的開發技術細節,希望能夠幫助大家更好地瞭解 Apache SeaTunnel 項目的最新進展。

接下來是關於Apache SeaTunnel支持metalake開發這一項目的完整報告:

一、項目背景

目前,Apache SeaTunnel 的任務配置中,數據源的用户名和密碼等敏感信息直接寫死在任務腳本中,這種方式 存在以下問題:

  1. 安全隱患:敏感信息暴露在腳本中,易導致數據源信息泄漏。
  2. 維護困難: 數據源配置信息發生變更時,需手動修改所有相關任務腳本,效率低下且易出錯。

為解決上述問題,本項目旨在通過集成metalake,實現數據源信息的集中存儲和管理。通過數據源 ID 映射機 制,用户可方便地更新和管理數據源配置。本項目的目標是支持主流數據目錄 Apache Gravitino,並通過預留 接口,方便擴展支持其他第三方數據目錄服務。

Apache Gravitino獲取數據源配置信息的REST API示例見於:https://gravitino.apache.org/docs/0.9.0- incubating/api/rest/load-catalog

代碼倉庫見於: https://github.com/apache/seatunnel

  1. 完成metalake配置信息適配
    將metalake配置信息配置在seatunnel-env中,任務啓動後加載到任務配置腳本的env中。

    1.1 任務啓動時讀取seatunnel-env中的配置項。
    1.2 將配置集成到任務腳本的env中,確保任務能夠正確加載metalake配置。

  2. 完成source和sink的數據源配置信息改造
    讀取env中是否開啓metalake標識,在source和sink中增加sourceId作為查詢metalake的唯一標識,獲取數據 源信息並替換source/sink配置項中的佔位符。

    2.1 在source和sink配置中增加sourceId配置項。
    2.2 支持source/sink配置項中的佔位符替換,通過sourceId動態獲取數據源信息。

  3. 插件方式支持metalake並集成Apache Gravitino
    定義metalake接口,支持根據唯一ID查詢數據源配置信息,並實現Apache Gravitino數據源信息轉換為 SeaTunnel配置項佔位符的功能。
    3.1 定義metalake實現接口,提供數據源查詢功能。
    3.2 支持Apache Gravitino集成,參考Gravitino REST API文檔。
    3.3 支持擴展性,通過實現接口可支持其他數據目錄,如UnityCatalog或DataHub。
    3.4 確保向後兼容,不影響存量任務的正常運行。

二、方案描述

  1. 完成metalake配置信息適配

    1.1 任務啓動時讀取seatunnel-env中配置項

  2. 實現目標:在任務啓動時,從seatunnel-env.sh或者任務配置文件中讀取metalake相關的配置。
  3. 實現方法:在seatunnel-env.sh文件中定義metalake配置項,例如:
METALAKE_ENABLED=true
METALAKE_TYPE=gravitino
METALAKE_URL=http://localhost:8090/api/metalakes/metalake_name/catalogs /
...

或者在任務配置文件中的env中配置

env{
metalake_enabled = true
metalake_type = "gravitino"
metalake_url =
"http://localhost:8090/api/metalakes/metalake_name/catalogs/" }

1.2 將配置集成到env中

  • 實現目標:將讀取的metalake配置集成到任務的env中。
  • 實現方法:

    • 若用户在任務配置文件中配置env,那麼自然無需集成。
    • 若在seatunnel-env.sh腳本中配置,也可通過System.getEnv()獲得,無需集成到env中
  1. 完成source和sink的數據源配置信息改造

    2.1 source/sink增加sourceId配置項

  2. 實現目標:為source和sink添加sourceId字段,用於標識metalake中的數據源。
  3. 實現方法:

    • 在任務腳本中指定sourceId即可。
    • 任務腳本示例:
source {
type = "mysql"
sourceId = "mysql_datasource_001"
url = "jdbc:mysql://localhost:3306/db"
...
}

2.2 支持source/sink的配置項佔位符替換

  • 實現目標:通過metalake動態獲取數據源信息,並替換配置中的佔位符。
  • 實現方法:

    • 在配置解析階段,檢查sourceId和metalakeEnabled。
    • 如果啓用metalake且sourceId存在,則用户可將username和password等字段並設為佔位符,然 後通過metalake接口查詢數據源信息並佔位符替換。
    • 步驟:

        1. 定義佔位符格式,例如${key}。
        1. 通過REST API查詢數據源信息。
        1. 替換配置中的佔位符。
    • 代碼示例:

  1. 插件方式支持metalake並支持Apache Gravitino集成

    3.1 定義metalake實現接口

    • 實現目標:定義一個通用接口,用於與metalake交互。
    • 實現方法:

      • 定義MetalakeClient接口,包含查詢數據源信息的方法。
      • 接口定義:
3.2 支持Apache Gravitino集成
  • 實現目標:實現與Apache Gravitino的集成,通過REST API獲取數據源信息。
  • 實現方法:

    • 創建GravitinoClient類,實現MetalakeClient接口。
    • 使用HTTP客户端發送請求到Gravitino API,並解析響應。
    • 代碼示例:

    3.3 支持可擴展

  • 實現目標:通過插件化設計,支持其他metalake實現。
  • 實現方法:

    • 使用工廠方法,根據metalakeType選擇合適的client。
    • 代碼示例:
3.4  不影響存量任務,向後兼容
  • 實現目標:確保新功能不破壞現有任務。
  • 實現方法:

    • 將metalakeEnabled設為可選配置,默認值為false。
    • 僅在metalakeEnabled=true且sourceId存在時觸發metalake邏輯。
    • 代碼示例:

三、時間規劃

四、項目進度

已完成工作

已完成項目所需功能的開發與測試,並經過修改後,已經合併了PR。

遇到的問題與解決方案

在編寫代碼時,我遇到的問題不多,並且要感謝liugddx老師的指導,在老師的指導下,我遇到的問題基本迎刃 而解。

還有一個問題就是該項目的test case較多,測試時間較長,並且合併PR前要通過所有的test case。然後由於網 絡等原因,這些test case不是很穩定,有時需要多次重試才能通過,這很考驗我的耐心。

測試用例

設計了一個簡單的任務配置腳本,並在source中使用了metalake。

併為此測試用例構建了相應的MySQL數據庫和Gravitino。在sink中使用了Assert connector,保證得到正確的結 果。該集成測試的test case代碼也已上傳github,並且通過了測試。

後續工作安排

後續可以考慮集成更多的metalake類型,使得該功能不侷限於Gravitino。

user avatar bigdatacoffe 頭像 b2proxy 頭像 huaweiclouddeveloper 頭像
3 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.