1. 什麼時候需要“混用”?

  1. 先用 Table 生態(Catalog、Connector、SQL 函數)做取數/清洗,再回到 DataStream 寫低階算子(自定義定時器、精細狀態)。
  2. 某些 無狀態歸一化(大小寫、正則清洗、簡單投影)交給 SQL;複雜業務/亂序處理交給 DataStream。
  3. 歷史補數(有界流)走 Batch 模式;實時(無界流)走 Streaming 模式,但同一套管道儘量複用。

跨 API 會有輕微結構轉換開銷(RowData ↔ Row)。絕大多數場景可以忽略,但對極致延遲敏感需關注。

2. 環境初始化與“誰來執行”

// 統一構建:先 DataStream,再橋接 Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  • DataStream 執行env.execute() 提交整套已構建的管道。
  • Table 執行TablePipeline.execute() / executeSql() / StatementSet.execute() 只提交“表內 source→sink”閉環。
  • 混合tEnv.toDataStream(...)toChangelogStream(...)物化其 Table 子管道並掛入 DataStream Builder;必須再 env.execute() 才會跑。

3. 基礎:DataStream ↔ Table(Insert-Only)

3.1 DataStream → Table(插入流)

DataStream<String> ds = env.fromElements("Alice","Bob","John");

// 自動派生列:f0
Table t = tEnv.fromDataStream(ds);

// 註冊成視圖,用 SQL 繼續處理
tEnv.createTemporaryView("InputTable", t);
Table upper = tEnv.sqlQuery("SELECT UPPER(f0) AS name FROM InputTable");

3.2 Table → DataStream(插入流)

// Row(默認)
DataStream<Row> out = tEnv.toDataStream(upper);
out.print();
env.execute();
// 輸出示例:+I[ALICE]、+I[BOB]...

注意toDataStream 僅支持 append-only 表(只 INSERT)。若含聚合/窗口等會產生更新,需改用 toChangelogStream

4. 高階:Changelog(變更流)處理

4.1 帶更新的 Table → 變更流

// name, SUM(score) 會產生更新
Table result = tEnv.sqlQuery(
  "SELECT name, SUM(score) AS s FROM InputTable GROUP BY name");

// 變更流:RowKind 標記 +I/-U/+U/-D
DataStream<Row> changelog = tEnv.toChangelogStream(result);

4.2 變更流 → Table

DataStream<Row> cdc = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", 12),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

Table t = tEnv.fromChangelogStream(cdc); // 默認接受全量 RowKind

優化:若上游保證 upsert 語義,可 fromChangelogStream(..., schema.withPrimaryKey(...), ChangelogMode.upsert()),減少 50% 的 UPDATE_BEFORE 流量;下游 toChangelogStream(..., upsert()) 同理。

5. 事件時間與水印的橋接

常見三種寫法:

  1. 基於字段生成行時間 + 策略
Table t = tEnv.fromDataStream(ds,
  Schema.newBuilder()
    .columnByExpression("rowtime","CAST(event_time AS TIMESTAMP_LTZ(3))")
    .watermark("rowtime","rowtime - INTERVAL '10' SECOND")
    .build());
  1. 繼承 DataStream 水印
Table t = tEnv.fromDataStream(ds,
  Schema.newBuilder()
    .columnByMetadata("rowtime","TIMESTAMP_LTZ(3)")
    .watermark("rowtime","SOURCE_WATERMARK()")
    .build());
  1. Table → DataStream 時保留時間戳
  • toDataStream(table):單一 rowtime 會寫入 DataStream 的 record timestamp,並繼續傳播水印
  • toChangelogStream(table, Schema.columnByMetadata("rowtime", ...)):也可把時間戳當 metadata 輸出,不佔物理列。

6. 批流一體:Batch Runtime Mode

有界數據(歷史補數)可走 BATCH,同一套邏輯在 STREAMING 下也能跑:

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 切 Streaming:env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Batch 模式要點

  • 源必須有界(如 datagen number-of-rows、Kafka 設定終止 offset)。
  • 表源目前需 insert-only
  • 操作符可能採用 阻塞交換、禁用 checkpoint,最終只產出 insert-only 結果(增量被摺疊)。

實戰策略:先寫 Streaming 版(最通用),再在有界場景切 BATCH 拿更優算子(如 sort-merge join)。

7. 配置與依賴(橋接模塊)

依賴(Java)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  <version>與集羣一致</version>
  <scope>provided</scope>
</dependency>

配置順序儘量在創建 tEnv 之前統一設置 env

env.setMaxParallelism(256);
env.getCheckpointConfig().setCheckpointingConsistencyMode(EXACTLY_ONCE);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Paris"));

提醒:Scala 隱式轉換雖好用(org.apache.flink.table.api.bridge.scala._),但易混淆執行邊界;新項目優先 Java

8. TypeInformation ↔ DataType 對照與坑

  • DataStream 的 TypeInformation → Table 的 DataType:自動推斷,但遇到 RAW/Generic(如 Row、不可反射類型)需顯式聲明
  • 常見修復:
  • ds.map(...).returns(TypeInformation)
  • Table 側 Schema.newBuilder().column("f0", DataTypes.of(MyPojo.class))
  • 結構化顯式聲明:DataTypes.STRUCTURED(...)

結論:看 schema 打印。若出現 RAW('User', ...),多半需要顯式類型或自定義序列化。

9. 在一個 Job 裏拼接多條“表內管道”與 DataStream

StreamStatementSet set = tEnv.createStatementSet();

// 純 Table 管道
Table src = tEnv.from(
  TableDescriptor.forConnector("datagen")
    .option("number-of-rows","3")
    .schema(Schema.newBuilder().column("v", DataTypes.INT()).build())
    .build());
set.add(src.insertInto(TableDescriptor.forConnector("print").build()));

// DataStream → Table → Sink
DataStream<Integer> ds = env.fromElements(1,2,3);
set.add(tEnv.fromDataStream(ds).insertInto(TableDescriptor.forConnector("print").build()));

// 把所有表內管道“掛”到 DataStream Builder
set.attachAsDataStream();

// 其他 DataStream 分支...
env.fromElements(4,5,6).sinkTo(new DiscardingSink<>());

// 統一提交
env.execute();

10. 典型“融合”食譜(可直接套)

10.1 SQL 預處理 → 自定義低階算子

  1. CREATE TABLE 讀 Kafka,SQL 做脱敏/清洗;
  2. toDataStream() 拿乾淨 Row;
  3. keyBy + KeyedProcessFunction 做複雜會話/定時器/狀態機;
  4. 最終 sink(JDBC/Elasticsearch)。

10.2 DataStream 產流 → Table 做時間區間 Join → 回 DataStream

  1. DS 造兩路(用户、訂單)並補上水印;
  2. createTemporaryView(..., Schema.watermark(...)) 註冊為臨時表;
  3. SQL 做 interval join
  4. toDataStream() 下游繼續自定義聚合/風控規則。

BATCH/STREAMING 兩種模式產出的最終表一致(批模式只會 insert)。

10.3 處理更新的聚合

  • toChangelogStream() 承接 SUM/COUNT 等會更新的表;
  • 若目標是 KV 外部存儲,優先 upsert 模式 + 主鍵,降低網絡與落庫寫放大。

11. 上線前檢查清單

邊界與語義

  • toDataStream 是否滿足 append-only?否則改 toChangelogStream
  • Rowtime/Watermark 是否按需 繼承 Source自行聲明

類型與 Schema

  • 避免 RAW/GenericTypeInfo:顯式聲明 DataType 或 TypeInformation。
  • Sink 與結果表 列名/類型/可空性 一致,DDL 校驗通過。

性能與穩定性


  • 多 Sink 使用 StatementSet 合併優化,減少重複掃描與 shuffle。
  • 監控:吞吐/延遲/背壓/水印推進/RowKind 統計/狀態大小

執行與提交

  • Table 子管道轉 DataStream 後,記得最終 env.execute()
  • 純 Table 閉環,使用 pipeline.execute()executeSql();與 DS 無關。

12. 可改造示例:統一管道,批流同源

// 1) 選擇運行模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // or BATCH
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 2) 兩路 DS + 水印 → 註冊為表
DataStream<Row> users = ...
DataStream<Row> orders = ...
tEnv.createTemporaryView("UserTable", users,
  Schema.newBuilder()
    .column("ts", DataTypes.TIMESTAMP_LTZ(3))
    .column("uid", DataTypes.INT())
    .column("name", DataTypes.STRING())
    .watermark("ts", "ts - INTERVAL '1' SECOND")
    .build());

tEnv.createTemporaryView("OrderTable", orders,
  Schema.newBuilder()
    .column("ts", DataTypes.TIMESTAMP_LTZ(3))
    .column("uid", DataTypes.INT())
    .column("amount", DataTypes.INT())
    .watermark("ts", "ts - INTERVAL '1' SECOND")
    .build());

// 3) SQL:時間區間 join(模式無關,結果一致)
Table joined = tEnv.sqlQuery(
 "SELECT U.name, O.amount " +
 "FROM UserTable U, OrderTable O " +
 "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");

// 4) 回到 DataStream,繼續自定義處理
DataStream<Row> out = tEnv.toDataStream(joined);
out.keyBy(r -> r.<String>getFieldAs("name"))
   .process(new MyDedup()).print();

env.execute();

結語

  • 把 Table 當入口與加速器,把 DataStream 當控制面與精細算子;二者自由互轉,讓你兼得開發效率可控性能
  • 牢牢記住三件事:
  1. append-only → toDataStream;有更新 → toChangelogStream
  2. 時間與水印要麼繼承 Source,要麼在 Schema 聲明
  3. Table → DataStream 後,最終得 env.execute()