博客 / 詳情

返回

Apache SeaTunnel 如何將 CDC 數據流轉換為 Append-Only 模式?

4aea1ddbb5a97ca5e7ab7a20b5bea8f1

RowKindExtractor 是 Apache SeaTunnel 的一個轉換插件,它能將 CDC 數據流轉為 Append-Only 模式,並提取原始 RowKind 信息為新字段。本文將介紹 RowKindExtractor 的核心功能,其在 CDC 數據同步場景下的使用方法,以及配置選項、注意事項及多種應用示例。 

RowKindExtractor

RowKindExtractor 轉換插件用於將 CDC(Change Data Capture)數據流轉換為 Append-Only(僅追加)模式,同時將原始的 RowKind 信息提取為一個新的字段。

核心功能:

  • 將所有數據行的 RowKind 統一改為 +I(INSERT),實現 Append-Only 模式
  • 將原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
  • 支持短格式和完整格式兩種輸出方式

為什麼需要這個插件?

在 CDC 數據同步場景中,數據行帶有 RowKind 標記(+I、-U、+U、-D),表示不同的變更類型。但某些下游系統(如數據湖、分析系統)只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此時需要:

  1. 將所有數據轉換為 INSERT 類型(Append-Only)
  2. 將原始的變更類型保存為普通字段,供後續分析使用

轉換示例:

輸入(CDC 數據):
  RowKind: -D (DELETE)
  數據: id=1, name="test1", age=20

輸出(Append-Only 數據):
  RowKind: +I (INSERT)
  數據: id=1, name="test1", age=20, row_kind="DELETE"

 
 
典型應用場景

  • 將 CDC 數據寫入只支持 Append 的數據湖
  • 需要在數據倉庫中保留完整的變更歷史記錄
  • 需要對不同類型的變更進行統計分析

配置選項

custom_field_name [string]

指定新增字段的名稱,該字段用於存儲原始的 RowKind 信息。

默認值:row_kind

注意事項:

  • 字段名不能與原有字段重名,否則會報錯
  • 建議使用有意義的名稱,如 operation_type、change_type、cdc_op 等

示例:

custom_field_name = "operation_type"  # 使用自定義字段名

transform_type [enum]

指定 RowKind 字段值的輸出格式。

可選值:

默認值:SHORT

各值含義:

選擇建議:

  • SHORT 格式:節省存儲空間,適合對存儲敏感的場景
  • FULL 格式:可讀性更好,適合需要人工查看或分析的場景

示例:

transform_type = FULL  # 使用完整格式

完整示例

  • 示例 1:使用默認配置(SHORT 格式)

使用默認配置,將 CDC 數據轉換為 Append-Only 模式,RowKind 以短格式保存。

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.users"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    # 使用默認配置:
    # custom_field_name = "row_kind"
    # transform_type = SHORT
  }
}

sink {
  Console {
    plugin_input = "append_only_data"
  }
}

數據轉換過程:

輸入數據(CDC 格式):
  1. RowKind=+I, id=1, name="張三", age=25
  2. RowKind=-U, id=1, name="張三", age=25
  3. RowKind=+U, id=1, name="張三", age=26
  4. RowKind=-D, id=1, name="張三", age=26

輸出數據(Append-Only 格式):
  1. RowKind=+I, id=1, name="張三", age=25, row_kind="+I"
  2. RowKind=+I, id=1, name="張三", age=25, row_kind="-U"
  3. RowKind=+I, id=1, name="張三", age=26, row_kind="+U"
  4. RowKind=+I, id=1, name="張三", age=26, row_kind="-D"
  • 示例 2:使用 FULL 格式和自定義字段名

使用完整格式輸出 RowKind,並自定義字段名稱。

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MySQL-CDC {
    plugin_output = "cdc_source"
    server-id = 5652
    username = "root"
    password = "your_password"
    table-names = ["mydb.orders"]
    url = "jdbc:mysql://localhost:3306/mydb"
  }
}

transform {
  RowKindExtractor {
    plugin_input = "cdc_source"
    plugin_output = "append_only_data"
    custom_field_name = "operation_type"  # 自定義字段名
    transform_type = FULL                 # 使用完整格式
  }
}

sink {
  Iceberg {
    plugin_input = "append_only_data"
    catalog_name = "iceberg_catalog"
    database = "mydb"
    table = "orders_history"
    # Iceberg 表會包含 operation_type 字段,記錄每條數據的變更類型
  }
}
數據轉換過程:

輸入數據(CDC 格式):
  1. RowKind=+I, order_id=1001, amount=100.00
  2. RowKind=-U, order_id=1001, amount=100.00
  3. RowKind=+U, order_id=1001, amount=150.00
  4. RowKind=-D, order_id=1001, amount=150.00

輸出數據(Append-Only 格式,FULL 格式):
  1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT"
  2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE"
  3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER"
  4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
  • 示例 3:完整的測試示例(使用 FakeSource)

使用 FakeSource 生成測試數據,演示各種 RowKind 的轉換效果。

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake_cdc_data"
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_updated", 95]
      },
      {
        kind = UPDATE_BEFORE
        fields = [2, "B", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [2, "B_updated", 98]
      },
      {
        kind = DELETE
        fields = [1, "A_updated", 95]
      }
    ]
  }
}

transform {
  RowKindExtractor {
    plugin_input = "fake_cdc_data"
    plugin_output = "transformed_data"
    custom_field_name = "change_type"
    transform_type = FULL
  }
}

sink {
  Console {
    plugin_input = "transformed_data"
  }
}

預期輸出:

+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
user avatar u_13127751 頭像 u_2874575 頭像
2 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.