Stories

Detail Return Return

DMS Airflow:企業級數據工作流編排平台的專業實踐 - Stories Detail

本文作者:阿里雲數據庫技術專家 賈志威

DMS Airflow 是基於 Apache Airflow 構建的企業級數據工作流編排平台,通過深度集成阿里雲 DMS(Data Management Service)系統的各項能力,為數據團隊提供了強大的工作流調度、監控和管理能力。本文將從 Airflow 的高級編排能力、DMS 集成的特殊能力,以及 DMS Airflow 的使用示例三個方面,全面介紹 DMS Airflow 的技術架構與實踐應用。

一、Airflow 提供的高級編排能力

1.1 DAG(有向無環圖)定義

Airflow 的核心是 DAG(Directed Acyclic Graph),它定義了任務之間的依賴關係和執行順序。

核心特性:

Python 代碼定義:DAG 以 Python 代碼形式定義,支持版本控制和代碼審查
動態生成:支持根據配置或數據動態生成 DAG
模板化:支持 Jinja2 模板,實現參數化配置

示例:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
   
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'complex_etl_pipeline',
    default_args=default_args,
    description='複雜ETL數據管道',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)

# 定義任務
extract_task = BashOperator(
    task_id='extract_data',
    bash_command='python /scripts/extract.py --date {
   { ds }}',
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_function,
    op_kwargs={
   'date': '{
   { ds }}'},
    dag=dag
)

load_task = BashOperator(
    task_id='load_data',
    bash_command='python /scripts/load.py --date {
   { ds }}',
    dag=dag
)

# 定義依賴關係
extract_task >> transform_task >> load_task

1.2 任務依賴管理

Airflow 提供了靈活的任務依賴管理機制,支持複雜的任務編排場景。

依賴操作符:

和 <<:設置任務執行順序
set_upstream() 和 set_downstream():顯式設置上下游關係
cross_downstream():批量設置下游依賴
chain():鏈式依賴設置

複雜依賴示例:

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.helpers import chain, cross_downstream

# 分支任務
branch_task = DummyOperator(task_id='branch', dag=dag)

# 並行任務組
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)

# 合併任務
merge_task = DummyOperator(task_id='merge', dag=dag)

# 設置依賴:branch -> [task_a, task_b, task_c] -> merge
branch_task >> [task_a, task_b, task_c] >> merge_task

# 使用 chain 函數
chain(
    extract_task,
    [transform_task_1, transform_task_2],
    load_task
)

1.3 調度和時間觸發

Airflow 提供了強大的調度功能,支持多種時間觸發方式。

調度類型:

Cron 表達式:schedule_interval='0 0 *'(每天零點執行)
預設值:@daily、@hourly、@weekly 等
時間間隔:timedelta(hours=2)(每2小時執行)
None:手動觸發,不自動調度

時間模板變量:

{ { ds }}:執行日期(YYYY-MM-DD)
{ { ds_nodash }}:執行日期(YYYYMMDD)
{ { ts }}:執行時間戳
{ { yesterday_ds }}:前一天日期
{ { next_ds }}:下一次執行日期

示例:

dag = DAG(
    'scheduled_pipeline',
    schedule_interval='0 */6 * * *',  # 每6小時執行一次
    start_date=datetime(2024, 1, 1),
    catchup=True,  # 補跑歷史數據
    max_active_runs=1  # 最多同時運行1個實例
)

task = PythonOperator(
    task_id='process_data',
    python_callable=process_function,
    op_kwargs={
   
        'execution_date': '{
   { ds }}',
        'next_execution_date': '{
   { next_ds }}'
    },
    dag=dag
)

1.4 任務狀態管理

Airflow 提供了完善的任務狀態管理機制,支持任務重試、失敗處理和狀態轉換。

任務狀態:

None:未調度
Scheduled:已調度,等待執行
Queued:已排隊,等待資源
Running:正在執行
Success:執行成功
Failed:執行失敗
Skipped:跳過執行
Retry:重試中
Up for retry:等待重試

重試機制:

task = PythonOperator(
    task_id='unreliable_task',
    python_callable=unreliable_function,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,  # 指數退避
    max_retry_delay=timedelta(hours=1),
    dag=dag
)

1.5 數據感知調度(Dataset)

Airflow 2.4+ 引入了 Dataset 概念,支持基於數據可用性的調度。

核心概念:

Dataset:表示數據的抽象概念
Dataset Producer:產生數據的任務
Dataset Consumer:消費數據的任務
調度觸發:當 Dataset 更新時,自動觸發依賴的 DAG

示例:

from airflow import Dataset
from airflow.operators.python import PythonOperator

# 定義 Dataset
raw_data = Dataset("s3://bucket/raw-data/")
processed_data = Dataset("s3://bucket/processed-data/")

# Producer 任務
produce_task = PythonOperator(
    task_id='produce_data',
    outlets=[raw_data],  # 標記產生的數據集
    python_callable=produce_function,
    dag=dag
)

# Consumer 任務
consume_task = PythonOperator(
    task_id='consume_data',
    inlets=[raw_data],  # 依賴的數據集
    outlets=[processed_data],
    python_callable=consume_function,
    dag=another_dag  # 可以跨 DAG
)

1.6 動態任務生成

Airflow 支持在運行時動態生成任務,實現靈活的編排邏輯。

應用場景:

根據配置文件生成任務
根據數據庫查詢結果生成任務
根據文件列表生成處理任務

示例:

def generate_tasks():
    """根據配置動態生成任務"""
    configs = [
        {
   'table': 'users', 'database': 'db1'},
        {
   'table': 'orders', 'database': 'db1'},
        {
   'table': 'products', 'database': 'db2'}
    ]

    tasks = []
    for config in configs:
        task = PythonOperator(
            task_id=f"process_{config['table']}",
            python_callable=process_table,
            op_kwargs=config,
            dag=dag
        )
        tasks.append(task)

    return tasks

# 動態生成的任務
dynamic_tasks = generate_tasks()

1.7 任務組和子 DAG

Airflow 支持任務組(TaskGroup)和子 DAG(SubDAG),用於組織複雜的任務結構。

TaskGroup 示例:

from airflow.utils.task_group import TaskGroup

with TaskGroup('etl_group') as etl_group:
    extract_task = BashOperator(task_id='extract', ...)
    transform_task = PythonOperator(task_id='transform', ...)
    load_task = BashOperator(task_id='load', ...)

    extract_task >> transform_task >> load_task

# TaskGroup 可以像普通任務一樣使用
start_task >> etl_group >> end_task

1.8 XCom 數據傳遞

Airflow 的 XCom(Cross-Communication)機制支持任務間數據傳遞。

使用示例:


def extract_function(**context):
    data = {
   'records': 1000, 'size': '10MB'}
    return data

def transform_function(**context):
    # 獲取上游任務的數據
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract')
    records = data['records']
    # 處理數據
    processed = records * 2
    return processed

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_function,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_function,
    dag=dag
)

extract_task >> transform_task

二、DMS 集成的 Airflow 特殊能力

2.1 與 DMS 系統的深度集成

2.1.1 統一認證與授權

DMS Airflow 通過 DmsAuthManager 實現了與 DMS UC Center 的統一認證,用户無需單獨管理 Airflow 賬號,直接使用 DMS 賬號登錄。

核心優勢:

單點登錄:一次登錄,全平台訪問
權限統一:權限管理與 DMS 系統保持一致
角色映射:自動映射 DMS 角色到 Airflow 角色(Public、Viewer、User、Operator、Admin)

2.1.2 DMS 服務集成

DMS Airflow 通過內部代理機制,實現了與 DMS 各種服務的無縫集成。

集成服務:

DMS Enterprise API:SQL 執行、任務管理
AnalyticDB API:Spark 任務提交、資源管理
DTS API:數據同步任務控制
Notebook API:Notebook 資源管理
UC Center:用户認證和權限管理

2.2 企業級通知能力

DMS Airflow 提供了三種通知方式,滿足不同場景的告警需求。

2.2.1 多通道通知

DMS Notification:

直接集成到 DMS 系統通知中心
支持任務狀態、錯誤信息、執行結果等
與 DMS 工作流系統聯動

SLS Notification:

集中式日誌管理
支持日誌查詢和分析
可與日誌分析工具集成

CloudMonitor Notification:

實時監控指標
支持自定義告警規則
與雲監控告警系統集成

2.3 智能資源管理

2.3.1 自動擴縮容服務

DMS Airflow 的自動擴縮容服務基於任務負載動態調整 Worker 數量,實現資源的智能化管理。

核心特性:

負載監控:實時監控隊列中等待和執行的任務數量
智能計算:根據任務數量和 Worker 併發度計算目標 Worker 數
平滑處理:使用滑動窗口和 Kalman 濾波算法平滑負載波動
邊界約束:支持最小和最大 Worker 數量限制
K8s 集成:通過 API 調用調整 Kubernetes 副本數

配置示例:

# airflow.cfg
[scale]
queue_length = 15          # 滑動窗口長度
worker_num_min = 2         # 最小 Worker 數
worker_num_max = 20        # 最大 Worker 數
polling_interval = 30       # 輪詢間隔(秒)

2.3.2 資源組管理

DMS Airflow 支持 AnalyticDB 的資源組管理,可以指定任務在特定的資源組中執行,實現資源隔離和優先級控制。

資源組類型:

Interactive 資源組:交互式查詢,低延遲
Batch 資源組:批處理任務,高吞吐
Warehouse 資源組:數據倉庫查詢

2.4 DAG 動態刷新

DMS Airflow 提供了 DAG 刷新插件(dags_refresh_plugin),支持通過 API 觸發 DAG 文件重新加載,無需重啓 Airflow 服務。

核心特性:
API 觸發:通過 HTTP API 觸發刷新
安全認證:基於 POP 簽名算法的安全認證
批量刷新:支持批量刷新多個 DAG

使用場景:

代碼更新後快速生效
配置變更後立即應用
開發調試時的快速迭代

2.5 日誌優化

DMS Airflow 實現了日誌棧過濾(no_stack_filter),自動過濾異常堆棧信息,使日誌更加簡潔易讀。

優勢:

減少日誌體積
提高日誌可讀性
加快日誌傳輸速度

2.6 實例名稱到 Cluster ID 映射

DMS Airflow 支持通過 DMS 實例名稱(dblink)自動解析 AnalyticDB Cluster ID,簡化配置管理。

使用場景:

# 方式1:直接使用 cluster_id
spark_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_task',
    cluster_id='adb-cluster-001',
    resource_group='interactive-spark',
    sql='SELECT * FROM table',
    dag=dag
)

# 方式2:使用 instance 名稱(自動解析)
spark_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_task',
    instance='production-adb-dblink',  # DMS 中的 dblink 名稱
    resource_group='interactive-spark',
    sql='SELECT * FROM table',
    dag=dag
)

2.7 企業級監控與可觀測性

DMS Airflow 集成了多種監控和可觀測性工具,提供全方位的任務執行監控。

監控維度:

任務執行監控:任務狀態、執行時間、重試次數
資源使用監控:Worker 數量、隊列長度、資源組使用率
業務指標監控:通過 CloudMonitor 發送自定義業務指標
日誌分析:通過 SLS 進行集中日誌管理和分析

2.8 安全特性

DMS Airflow 實現了多層安全機制,確保系統安全可靠。

安全機制:

POP 簽名認證:API 調用使用 POP 簽名算法驗證
Token 管理:自動刷新 DMS Token,保證長期任務的穩定性
權限控制:基於角色的細粒度權限控制
連接加密:所有 API 調用通過加密通道傳輸

三、DMS Airflow 使用示例

3.1 SQL 任務執行示例

DMSSqlOperator 用於執行 DMS SQL 任務,支持異步執行和狀態監控。

核心特性:

異步執行,避免長時間阻塞
自動輪詢任務狀態
支持多條 SQL 語句順序執行
支持任務完成回調

使用示例:

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
from datetime import datetime

dag = DAG(
    'dms_sql_example',
    default_args={
   'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

sql_task = DMSSqlOperator(
    task_id='execute_sql',
    instance='production_db',
    database='analytics',
    sql='''
        SELECT COUNT(*) as total_records
        FROM user_behavior_log
        WHERE date = '{
   { ds }}'
    ''',
    polling_interval=10,
    callback=lambda result: print(f"SQL執行完成: {result}"),
    dag=dag
)

3.2 Spark 計算任務示例

DMSAnalyticDBSparkOperator 用於執行 AnalyticDB MySQL 3.0 Data Lakehouse 的 Spark 任務,支持兩種資源組類型:Job 資源組和 Warehouse 資源組。

核心特性:

支持 SparkWarehouse 和傳統 Spark Job 兩種執行引擎
自動識別資源組類型
支持 Spark 配置參數自定義
自動獲取 Spark Web UI 地址
支持執行時間限制

使用示例:

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import (
    DMSAnalyticDBSparkSqlOperator,
    DMSAnalyticDBSparkOperator
)
from datetime import datetime

dag = DAG(
    'spark_analysis_example',
    default_args={
   'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

# Spark SQL 執行(Warehouse模式)
spark_sql_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_sql_analysis',
    cluster_id='adb-cluster-001',
    resource_group='interactive-spark',
    sql='''
        SELECT 
            user_id,
            COUNT(*) as action_count,
            SUM(amount) as total_amount
        FROM user_events
        WHERE date = '{
   { ds }}'
        GROUP BY user_id
    ''',
    schema='analytics',
    conf={
   'spark.sql.shuffle.partitions': 200},
    execute_time_limit_in_seconds=3600,
    dag=dag
)

# Spark Job 執行(傳統模式)
spark_job_task = DMSAnalyticDBSparkOperator(
    task_id='spark_batch_job',
    cluster_id='adb-cluster-001',
    resource_group='batch-job',
    sql='your_spark_sql_here',
    app_type='SQL',
    app_name='daily_etl_job',
    dag=dag
)

3.3 數據同步任務示例

DTSLakeInjectionOperator 用於控制阿里雲 DTS(Data Transmission Service)數據同步任務,支持數據庫到數據湖的同步場景。

核心特性:
自動構建 DTS 任務
實時監控同步任務狀態
自動處理預檢查失敗場景
自動刷新 HMS Token

使用示例:

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
from datetime import datetime

dag = DAG(
    'dts_sync_example',
    default_args={
   'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

dts_task = DTSLakeInjectionOperator(
    task_id='sync_to_data_lake',
    source_instance='source_rds',
    source_database='production_db',
    target_instance='target_oss',
    bucket_name='data-lake-bucket',
    reserve={
   
        'table_filter': ['user_*', 'order_*'],
        'sync_mode': 'full'
    },
    db_list={
   
        'include': ['analytics', 'reporting']
    },
    polling_interval=10,
    dag=dag
)

3.4 Notebook 任務執行示例

DMSNotebookOperator 支持執行 Jupyter Notebook 文件,適合數據科學和機器學習工作流。

核心特性:

自動創建或獲取 Notebook 實例
支持運行時參數注入
實時獲取任務執行進度
支持任務超時配置
自動獲取並輸出 Notebook 執行日誌

使用示例:

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator
from datetime import datetime

dag = DAG(
    'notebook_example',
    default_args={
   'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily'
)

notebook_task = DMSNotebookOperator(
    task_id='run_ml_training',
    file_path='notebooks/model_training.ipynb',
    profile_name='ml-profile',
    cluster_name='ml-cluster',
    cluster_type='spark',
    spec='large',
    runtime_name='python3.9',
    run_params={
   
        'training_date': '{
   { ds }}',
        'model_version': 'v2.0'
    },
    timeout=7200,
    polling_interval=10,
    dag=dag
)

3.5 通知器使用示例

DMS Airflow 提供了三種通知器,滿足不同場景的告警需求。

3.5.1 基礎通知示例

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier
from airflow.providers.alibaba_dms.cloud.notifications.cloudmonitor_notification import CloudMonitorNotifier
from datetime import datetime

# 定義通知回調
def notify_on_failure(context):
    # SLS 通知
    sls_notifier = SLSNotifier(
        sls_conn_id='sls_default',
        project='airflow-logs',
        logstore='task-alerts',
        success=False,
        message=f"Task {context['task_instance'].task_id} failed"
    )
    sls_notifier.notify(context)

    # CloudMonitor 通知
    cms_notifier = CloudMonitorNotifier(
        cms_conn_id='cms_default',
        region='cn-hangzhou',
        metric_name='TaskFailure',
        event_name='TaskFailedEvent',
        success=False,
        message=f"Task {context['task_instance'].task_id} failed"
    )
    cms_notifier.notify(context)

dag = DAG(
    'example_with_notifications',
    default_args={
   
        'start_date': datetime(2024, 1, 1),
        'on_failure_callback': notify_on_failure
    },
    schedule_interval='@daily'
)

3.6 完整 ETL 工作流示例

以下是一個完整的 ETL 工作流示例,展示瞭如何組合使用多個 DMS Airflow 操作器:

from airflow import DAG
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier
from datetime import datetime, timedelta

default_args = {
   
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': lambda context: SLSNotifier(
        project='airflow-alerts',
        logstore='task-failures',
        success=False,
        message=f"DAG {context['dag'].dag_id} failed"
    ).notify(context)
}

dag = DAG(
    'complete_etl_pipeline',
    default_args=default_args,
    description='完整ETL數據管道',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)

# 步驟1:數據同步(從源庫同步到數據湖)
sync_task = DTSLakeInjectionOperator(
    task_id='sync_source_data',
    source_instance='production_rds',
    source_database='production_db',
    target_instance='data_lake_oss',
    bucket_name='raw-data-bucket',
    reserve={
   
        'table_filter': ['user_*', 'order_*'],
        'sync_mode': 'incremental'
    },
    polling_interval=10,
    dag=dag
)

# 步驟2:執行 SQL 驗證數據
validate_task = DMSSqlOperator(
    task_id='validate_data',
    instance='analytics_db',
    database='staging',
    sql='''
        SELECT 
            COUNT(*) as total_records,
            COUNT(DISTINCT user_id) as unique_users
        FROM raw_user_data
        WHERE date = '{
   { ds }}'
    ''',
    polling_interval=10,
    dag=dag
)

# 步驟3:Spark 數據處理和分析
spark_transform_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_data_transform',
    cluster_id='adb-cluster-001',
    resource_group='batch-processing',
    sql='''
        INSERT INTO analytics.user_daily_summary
        SELECT 
            user_id,
            date,
            COUNT(*) as event_count,
            SUM(amount) as total_amount,
            AVG(amount) as avg_amount
        FROM staging.raw_user_data
        WHERE date = '{
   { ds }}'
        GROUP BY user_id, date
    ''',
    schema='analytics',
    conf={
   'spark.sql.shuffle.partitions': 200},
    execute_time_limit_in_seconds=3600,
    dag=dag
)

# 步驟4:生成報表
report_task = DMSSqlOperator(
    task_id='generate_report',
    instance='analytics_db',
    database='analytics',
    sql='''
        INSERT INTO daily_reports
        SELECT 
            date,
            COUNT(DISTINCT user_id) as daily_active_users,
            SUM(total_amount) as daily_revenue
        FROM user_daily_summary
        WHERE date = '{
   { ds }}'
        GROUP BY date
    ''',
    polling_interval=10,
    dag=dag
)

# 定義依賴關係
sync_task >> validate_task >> spark_transform_task >> report_task

四、總結

DMS Airflow 作為企業級數據工作流編排平台,通過深度集成 DMS 系統的各項能力,為數據團隊提供了強大的工作流調度、監控和管理能力。

核心優勢總結:

無縫集成:與 DMS 系統的深度集成,實現統一的認證、授權和服務調用
豐富功能:提供 SQL、Spark、DTS、Notebook 等多種任務類型的支持
智能管理:自動擴縮容、資源組管理等智能化資源管理能力
企業級監控:多通道通知、集中日誌管理、自定義指標監控
安全可靠:多層安全機制,確保系統安全可靠

適用場景:

數據 ETL 工作流
數據分析和報表生成
機器學習模型訓練和部署
數據同步和遷移
定時任務調度

DMS Airflow 將持續演進,為數據團隊提供更加高效、穩定、易用的工作流編排能力。

附錄:相關資源

DMS Airflow 文檔:https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-i...
Apache Airflow 官方文檔:https://airflow.apache.org/docs/
歡迎釘釘搜索羣號: 96015019923 加入交流~

user avatar ting_61d6d9790dee8 Avatar didiaodekaishuiping Avatar seatunnel Avatar leeqvip Avatar
Favorites 4 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.