5.2 元數據
5.2.1 工具
數倉開源元數據項目對比(Apache Atlas、Apache Gravitino、DataHub、OpenMetadata、Amundsen、Metacat、Marquez)
5.2.1.1 起源與發展歷程
|
項目 |
起源與關鍵節點 |
發展特點 |
|
Apache Atlas |
2015 年由 Hortonworks 發起,2017 年成為 Apache 頂級項目,最初為 Hadoop 生態數據治理而生。 |
老牌項目,歷經 10 年迭代,功能成熟,早期綁定 Hadoop 生態,近年逐步適配雲原生。 |
|
Apache Gravitino |
2023 年由網易、星環等企業聯合貢獻至 Apache 孵化器,2025年5月畢業,定位 “統一元數據服務”。 |
新興項目,理念超前(聚焦元數據統一接入),目前處於功能快速完善階段。 |
|
DataHub |
2019 年 LinkedIn 開源,2022 年捐給 LF AI & Data 基金會,解決內部 “數據發現難” 問題。 |
互聯網企業驅動,強調實時性與用户體驗,LinkedIn 持續投入,版本迭代快(每 1-2 月一個版本)。 |
|
OpenMetadata |
2021 年由 Collibra 前核心團隊創立,2022 年加入 Linux 基金會,主打 “一體化數據治理”。 |
治理基因濃厚,集成元數據、質量、協作功能,開箱即用,適合中小團隊快速落地。 |
|
Amundsen |
2019 年 Lyft 開源,名字源自 “挪威探險家”,聚焦 “數據發現”(類似數據領域的 Google)。 |
輕量實用,核心解決分析師 “找數據” 問題,Lyft、Airbnb 等企業內部驗證,功能聚焦不冗餘。 |
|
Metacat |
2015 年 Netflix 開源,最初為解決多數據源(Hive、Redshift、S3)元數據統一訪問問題。 |
定位 “元數據中間層”,功能極簡,無治理 / 血緣,僅做聚合與轉發,維護成本低。 |
|
Marquez |
2018 年 WeWork 開源,後被 The Linux Foundation 接管,專注 “數據管道血緣” 跟蹤。 |
聚焦數據工程場景,深度綁定 Airflow,輕量專一,不擴展其他元數據類型(如 BI、ML)。 |
點擊圖片可查看完整電子表格
5.2.1.2 關鍵能力對比
|
能力維度 |
元數據類型 |
血緣管理 |
數據治理 |
搜索與發現 |
協作功能 |
實時性 |
|
Apache Atlas |
表、字段、分區、分類、權限實體 |
表級 / 字段級,支持 Hive/Spark 靜態血緣 |
標籤體系、數據分級、細粒度權限、合規審計 |
基礎全文檢索(Solr) |
無 |
批處理更新(T+1) |
|
Apache Gravitino |
表、列、視圖、函數、存儲位置 |
表級,跨引擎(Spark/Flink)聚合 |
基礎標籤、數據資產目錄 |
基礎搜索(基於存儲索引) |
無 |
近實時(依賴數據源推送) |
|
DataHub |
表、字段、BI 報表、ML 模型、儀表盤 |
表級 / 字段級,實時捕獲(Kafka 驅動) |
標籤、術語表、所有權管理 |
模糊匹配、過濾、排序(Elasticsearch) |
評論、通知、變更日誌 |
實時(事件驅動,毫秒級) |
|
OpenMetadata |
表、字段、管道、BI、ML 模型、SLA |
表級 / 字段級,自動解析 SQL/Spark |
數據質量(Profiler)、SLA 管理、權限 |
精準搜索、關聯推薦(Elasticsearch) |
評論、任務分配、團隊協作 |
準實時(定時同步 + 觸發式) |
|
Amundsen |
表、字段、查詢歷史、用户行為 |
表級,基於 SQL 解析 |
無 |
熱門表推薦、按使用量排序 |
表描述、使用文檔 |
準實時(定時同步) |
|
Metacat |
表、分區、存儲路徑 |
無 |
無 |
無(僅轉發查詢) |
無 |
實時(被動轉發) |
|
Marquez |
數據集、作業、運行實例 |
作業 - 數據集依賴(表級) |
無 |
基礎查詢 |
無 |
準實時(作業運行時同步) |
點擊圖片可查看完整電子表格
5.2.1.3 數據源支持
|
項目 |
核心支持數據源(數據庫 / 數倉) |
擴展支持(BI / 管道 / ML) |
適配特點 |
|
Apache Atlas |
Hive、HBase、Spark、Kafka、SQL Server |
Tableau(有限)、NiFi |
強適配 Hadoop 生態,對雲數倉(Snowflake)支持弱 |
|
Apache Gravitino |
Hive、Glue、Snowflake、BigQuery、PostgreSQL、Spark |
Flink、Trino |
兼顧傳統與雲原生,通過 “連接器” 擴展,覆蓋廣 |
|
DataHub |
Hive、BigQuery、Redshift、Snowflake、MySQL、PostgreSQL |
Tableau、Power BI、Airflow、TensorFlow |
互聯網場景全覆蓋,雲數倉 / BI 適配完善 |
|
OpenMetadata |
Hive、BigQuery、Redshift、Snowflake、MySQL、Oracle |
Tableau、Power BI、Airflow、Fivetran、Great Expectations |
連接器豐富(30+),開箱即用,無需二次開發 |
|
Amundsen |
Hive、Presto、Redshift、BigQuery |
Airflow、Jupyter |
聚焦分析師常用數據源,適配輕量簡潔 |
|
Metacat |
Hive、Redshift、S3、Athena、MySQL |
無(僅元數據聚合) |
多數據源統一接口,屏蔽底層差異 |
|
Marquez |
Hive、PostgreSQL、BigQuery |
Airflow、Spark、Flink |
僅支持數據管道相關數據源,適配極專一 |
點擊圖片可查看完整電子表格
5.2.1.4 技術棧對比
|
項目 |
後端語言 |
存儲數據庫 |
搜索組件 |
消息隊列 |
前端技術 |
部署複雜度 |
|
Apache Atlas |
Java |
HBase(主存儲) |
Solr |
Kafka(可選) |
AngularJS |
高(依賴 HBase/Solr) |
|
Apache Gravitino |
Java |
MySQL/PostgreSQL |
無(依賴存儲索引) |
無 |
無(僅 API) |
低(輕量服務) |
|
DataHub |
Java/Scala |
PostgreSQL |
Elasticsearch |
Kafka(核心) |
React |
中(需部署 Kafka/ES) |
|
OpenMetadata |
Java(Spring Boot) |
PostgreSQL |
Elasticsearch |
無(定時同步) |
React |
低(單服務 + 雙存儲) |
|
Amundsen |
Python(Flask) |
Neo4j(血緣)+ ES(搜索) |
Elasticsearch |
無 |
React |
中(多組件聯動) |
|
Metacat |
Java |
MySQL/PostgreSQL |
無 |
Kafka(事件通知) |
無(僅 API) |
低(單服務) |
|
Marquez |
Java(Dropwizard) |
PostgreSQL |
無 |
無 |
無(僅 API + 簡單 UI) |
低(單服務) |
點擊圖片可查看完整電子表格
5.2.1.5 社區活躍度(截至 2025 年 10 月)
|
項目 |
GitHub Stars |
貢獻者數量 |
近 6 月版本更新 |
Issue 響應速度 |
企業用户案例 |
|
Apache Atlas |
4.8k |
150+ |
1 個(minor) |
較慢(7-14 天) |
華為、騰訊、金融機構 |
|
Apache Gravitino |
1.2k |
50+ |
3 個(功能迭代) |
較快(3-7 天) |
網易、星環科技(初期) |
|
DataHub |
11.5k |
400+ |
6 個(含 major) |
快(1-3 天) |
LinkedIn、Uber、Spotify |
|
OpenMetadata |
8.2k |
200+ |
4 個(功能迭代) |
快(1-3 天) |
Netflix、Walmart、Adobe |
|
Amundsen |
4.2k |
100+ |
2 個(minor) |
中(3-7 天) |
Lyft、Airbnb、Square |
|
Metacat |
1.8k |
50+ |
1 個(維護性) |
慢(14 + 天) |
Netflix、Uber |
|
Marquez |
2.1k |
80+ |
2 個(minor) |
中(5-10 天) |
WeWork、Zalando |
點擊圖片可查看完整電子表格
5.2.1.6 總結與選型建議
核心差異總結
- 治理導向:Apache Atlas(企業級合規,重權限 / 分級)、OpenMetadata(一體化輕治理,含質量 / 協作);
- 實時與發現:DataHub(實時元數據更新,搜索體驗佳)、Amundsen(分析師友好,數據發現效率高);
- 輕量與專一:Metacat(元數據統一訪問層)、Marquez(數據管道血緣跟蹤);
- 未來潛力:Apache Gravitino(統一元數據標準,適配混合數據棧)。
場景化選型建議
- 大型企業級數據治理(金融 / 政務):選Apache Atlas。成熟穩定,支持細粒度權限與合規審計,適配複雜組織架構,但需接受部署複雜度與 Hadoop 生態依賴。
- 互聯網 / 電商(實時元數據 + 多場景):選DataHub。事件驅動架構支持實時更新,覆蓋表、BI、ML 全資產,社區活躍,適合快速迭代的業務團隊。
- 中小團隊 / 初創公司(快速落地治理):選OpenMetadata。一體化功能(元數據 + 質量 + 協作)開箱即用,部署簡單,文檔完善,無需專業數據治理團隊。
- 分析師主導(數據發現優先):選Amundsen。搜索體驗類似 Google,支持熱門表推薦與查詢歷史關聯,輕量化不冗餘,適合分析師自主找數據。
- 多數據源聚合(多雲 / 混合架構):選Metacat。作為元數據中間層屏蔽底層差異,無侵入性,適合已有多數據棧(Hive+Redshift+S3)的企業。
- 數據工程團隊(管道血緣跟蹤):選Marquez。深度集成 Airflow,專注作業 - 數據集依賴,輕量專一,適合 ETL 工程師跟蹤數據流向。
- 混合數據棧(Hadoop + 雲數倉):嘗試Apache Gravitino。理念先進,支持統一元數據接入,適合長期規劃,但需容忍部分功能待完善。
5.2.2 實施方法
任務與數據存儲訪問記錄關聯的解決方案
要建立 Hive、Spark、Flink 等線上任務與 OSS、StarRocks 的審計日誌訪問記錄之間的關聯,核心在於實現任務唯一標識在整個數據訪問鏈路中的傳遞與記錄。以下是具體實現方案:
一、任務唯一標識設計
設計全局唯一的任務標識符(Task ID),格式建議:
Plain Text
[業務線]-[任務類型]-[環境]-[序號]示例:
• recmd-hive-prod-001(推薦業務線 Hive 生產任務)
• user-spark-prod-002(用户業務線 Spark 生產任務)
• log-flink-prod-003(日誌業務線 Flink 生產任務)
Task ID 需滿足:
• 全局唯一性
• 包含業務屬性,便於分類統計
• 與任務調度系統(如 Airflow、DolphinScheduler)中的任務 ID 保持一致
二、任務標識在計算引擎中的傳遞
2.1 Hive 任務
通過hiveconf傳遞任務標識,並配置 Hive 在訪問數據時攜帶該標識:
bash
Bash
# 提交Hive SQL任務時指定Task ID
beeline -u "jdbc:hive2://prod-hive:10000/prod_db;principal=hive/prod-hive@BIGDATA.COM" \--hiveconf task.id=recmd-hive-prod-001 \--hiveconf ranger.audit.taskid=recmd-hive-prod-001 \-f在 Hive 配置中關聯任務標識:
xml
XML
<!-- hive-site.xml 配置 -->
<property>
<name>hive.semantic.analyzer.hook</name><value>com.bigdata.hook.TaskIdAuditHook</value>
</property>自定義 Hook 實現(傳遞 Task ID 到 Ranger 審計):
Java
public class TaskIdAuditHook implements SemanticAnalyzerHook
{
@Override
public ASTNode preAnalyze(ASTNode ast, Context context) {
// 從hiveconf獲取任務ID
String taskId = HiveConf.getVar(context.getConf(), HiveConf.ConfVars.HIVESESSIONSPECIFICPARAMS).get("task.id", "unknown");
// 設置Ranger審計上下文
if (taskId != null && !taskId.isEmpty()) {
RangerPerfTracer.logParam("taskId", taskId);
// 將Task ID存入ThreadLocal,供Ranger審計插件使用
AuditContext.setTaskId(taskId);
}
return ast;
}
}2.2 Spark 任務
通過 Spark 配置傳遞 Task ID,並在訪問數據時攜帶:
bash
Bash
spark-submit \--master yarn \
--deploy-mode cluster \--keytab /etc/keytabs/recmd-prod.keytab \--principal recmd-prod@BIGDATA.COM \--conf spark.app.name=recmd-spark-prod-002 \--conf spark.task.id=recmd-spark-prod-002 \--conf spark.hadoop.ranger.audit.taskid=recmd-spark-prod-002 \--class com.data.RecommendationEngine \
/opt/jobs/recommendation.jarSpark 讀取 OSS/StarRocks 時傳遞 Task ID:
scala
Scala
// 讀取OSS數據時攜帶Task ID
val df = spark.read
.option("taskId", spark.conf.get("spark.task.id")).parquet("oss://prod-bucket/recommendation/data/")
// 寫入StarRocks時攜帶Task ID
df.write
.format("starrocks").option("jdbc.url", "jdbc:mysql://starrocks-fe:9030").option("load-url", "starrocks-fe:8030").option("table.identifier", "prod_db.user_recmd").option("taskId", spark.conf.get("spark.task.id")).save()2.3 Flink 任務
通過 Flink 配置傳遞 Task ID,並在訪問外部系統時攜帶:
bash
Bash
./bin/flink run-application \
-t yarn-application \
-Dsecurity.kerberos.login.principal=recmd-prod@BIGDATA.COM \
-Dsecurity.kerberos.login.keytab=/etc/keytabs/recmd-prod.keytab \
-Dtask.id=recmd-flink-prod-003 \
-Dflink.ranger.audit.taskid=recmd-flink-prod-003 \
-c com.data.LogProcessor \
/opt/jobs/log_processor.jarFlink 消費 Kafka 並寫入 OSS 時傳遞 Task ID:
java
運行
Java
// 配置OSS訪問時的Task ID
Configuration conf = new Configuration();
conf.setString("fs.oss.task.id", getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get("task.id"));
// 使用帶Task ID的配置創建FileSystem
FileSystem ossFs = FileSystem.get(new URI("oss://prod-bucket/logs/"), conf);
// 寫入數據時攜帶Task ID
DataStream<String> logStream = ...;
logStream.addSink(new RichSinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
// 寫入邏輯中包含Task ID
String taskId = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap().get("task.id");
// ...
}});三、數據存儲審計日誌的任務標識記錄
3.1 OSS 訪問審計配置
配置 OSS 服務記錄訪問來源的 Task ID:
1. 通過 SDK 傳遞 Task ID:
java
Java
// OSS客户端配置Task ID
OSSClientBuilder builder = new OSSClientBuilder();
ClientConfiguration clientConfig = new ClientConfiguration();
clientConfig.setUserAgent("taskId:recmd-hive-prod-001");
// 包含Task ID
OSS ossClient = builder.build(endpoint, accessKeyId, accessKeySecret, clientConfig);1. 配置 OSS 訪問日誌格式:確保 OSS 訪問日誌包含user-agent字段(其中包含 Task ID)
Plain Text
[timestamp] [remote_ip] [user_agent] [request_id] [operation] [bucket] [object] [request_size] [response_size] [duration] [status_code]1. 日誌解析規則:從user_agent中提取 Task ID:
regex
Plain Text
taskId:([a-zA-Z0-9\-]+)3.2 StarRocks 訪問審計配置
配置 StarRocks 記錄查詢請求的 Task ID:
1. 修改 StarRocks FE 配置:
xml
XML
<!-- fe.conf 配置 -->
audit_log_level = INFO
audit_log_file = /var/log/starrocks/fe/audit.log
audit_log_include_variables = true # 記錄會話變量1. 查詢時傳遞 Task ID:在 SQL 語句前設置會話變量傳遞 Task ID:
sql
SQL
-- Hive/Spark/Flink訪問StarRocks時執行
SET @task_id = 'recmd-spark-prod-002';SELECT * FROM prod_db.user_profile WHERE dt = '2023-10-01';1. StarRocks 審計日誌格式:確保審計日誌包含user_vars字段,從中提取task_id
四、Ranger 審計日誌的增強配置
修改 Ranger 配置,確保審計日誌包含 Task ID:
xml
XML
<!-- ranger-admin-site.xml 配置 -->
<property>
<name>ranger.audit.log4j.appender.AUDITFILE.layout.ConversionPattern</name>
<value>{"eventTime":"%d{ISO8601}","reqUser":"%X{username}","accessType":"%X{accessType}",
"resource":"%X{resource}","result":"%X{result}","taskId":"%X{taskId}",
"clientIP":"%X{clientIP}","sessionId":"%X{sessionId}","requestData":"%X{requestData}"}%n</value>
</property>自定義 Ranger 審計插件,從計算引擎上下文提取 Task ID:
java
運行
Java
public class CustomRangerAuditHandler extends RangerDefaultAuditHandler {
@Overridepublic void logAudit(RangerAuditEvent auditEvent) {
// 從ThreadLocal獲取計算引擎設置的Task ID
String taskId = AuditContext.getTaskId();
if (taskId != null && !taskId.isEmpty()) {
auditEvent.setAdditionalInfo("taskId", taskId);
}
super.logAudit(auditEvent);
}
}
五、方案總結
通過以上方案,實現了線上任務與數據存儲訪問記錄的關聯,關鍵要點包括:
- 統一的任務標識:設計全局唯一的 Task ID,作為關聯的核心鍵值
- 全鏈路標識傳遞:在計算引擎(Hive/Spark/Flink)中傳遞 Task ID,並在訪問數據存儲時攜帶
- 審計日誌增強:配置 OSS、StarRocks 和 Ranger 記錄 Task ID,確保審計日誌包含關聯所需信息
該方案可實現任務級別的數據訪問審計,為權限優化、成本核算和安全審計提供精準的數據支持。