1. 什麼時候需要“混用”?
- 先用 Table 生態(Catalog、Connector、SQL 函數)做取數/清洗,再回到 DataStream 寫低階算子(自定義定時器、精細狀態)。
- 某些 無狀態歸一化(大小寫、正則清洗、簡單投影)交給 SQL;複雜業務/亂序處理交給 DataStream。
- 歷史補數(有界流)走 Batch 模式;實時(無界流)走 Streaming 模式,但同一套管道儘量複用。
跨 API 會有輕微結構轉換開銷(RowData ↔ Row)。絕大多數場景可以忽略,但對極致延遲敏感需關注。
2. 環境初始化與“誰來執行”
// 統一構建:先 DataStream,再橋接 Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- DataStream 執行:
env.execute()提交整套已構建的管道。 - Table 執行:
TablePipeline.execute()/executeSql()/StatementSet.execute()只提交“表內 source→sink”閉環。 - 混合:
tEnv.toDataStream(...)或toChangelogStream(...)會物化其 Table 子管道並掛入 DataStream Builder;必須再env.execute()才會跑。
3. 基礎:DataStream ↔ Table(Insert-Only)
3.1 DataStream → Table(插入流)
DataStream<String> ds = env.fromElements("Alice","Bob","John");
// 自動派生列:f0
Table t = tEnv.fromDataStream(ds);
// 註冊成視圖,用 SQL 繼續處理
tEnv.createTemporaryView("InputTable", t);
Table upper = tEnv.sqlQuery("SELECT UPPER(f0) AS name FROM InputTable");
3.2 Table → DataStream(插入流)
// Row(默認)
DataStream<Row> out = tEnv.toDataStream(upper);
out.print();
env.execute();
// 輸出示例:+I[ALICE]、+I[BOB]...
注意:
toDataStream僅支持 append-only 表(只 INSERT)。若含聚合/窗口等會產生更新,需改用toChangelogStream。
4. 高階:Changelog(變更流)處理
4.1 帶更新的 Table → 變更流
// name, SUM(score) 會產生更新
Table result = tEnv.sqlQuery(
"SELECT name, SUM(score) AS s FROM InputTable GROUP BY name");
// 變更流:RowKind 標記 +I/-U/+U/-D
DataStream<Row> changelog = tEnv.toChangelogStream(result);
4.2 變更流 → Table
DataStream<Row> cdc = env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
Table t = tEnv.fromChangelogStream(cdc); // 默認接受全量 RowKind
優化:若上游保證 upsert 語義,可
fromChangelogStream(..., schema.withPrimaryKey(...), ChangelogMode.upsert()),減少 50% 的 UPDATE_BEFORE 流量;下游toChangelogStream(..., upsert())同理。
5. 事件時間與水印的橋接
常見三種寫法:
- 基於字段生成行時間 + 策略
Table t = tEnv.fromDataStream(ds,
Schema.newBuilder()
.columnByExpression("rowtime","CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime","rowtime - INTERVAL '10' SECOND")
.build());
- 繼承 DataStream 水印
Table t = tEnv.fromDataStream(ds,
Schema.newBuilder()
.columnByMetadata("rowtime","TIMESTAMP_LTZ(3)")
.watermark("rowtime","SOURCE_WATERMARK()")
.build());
- Table → DataStream 時保留時間戳
toDataStream(table):單一 rowtime 會寫入 DataStream 的 record timestamp,並繼續傳播水印。toChangelogStream(table, Schema.columnByMetadata("rowtime", ...)):也可把時間戳當 metadata 輸出,不佔物理列。
6. 批流一體:Batch Runtime Mode
有界數據(歷史補數)可走 BATCH,同一套邏輯在 STREAMING 下也能跑:
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 切 Streaming:env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Batch 模式要點:
- 源必須有界(如 datagen
number-of-rows、Kafka 設定終止 offset)。 - 表源目前需 insert-only。
- 操作符可能採用 阻塞交換、禁用 checkpoint,最終只產出 insert-only 結果(增量被摺疊)。
實戰策略:先寫 Streaming 版(最通用),再在有界場景切 BATCH 拿更優算子(如 sort-merge join)。
7. 配置與依賴(橋接模塊)
依賴(Java):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>與集羣一致</version>
<scope>provided</scope>
</dependency>
配置順序:儘量在創建 tEnv 之前統一設置 env。
env.setMaxParallelism(256);
env.getCheckpointConfig().setCheckpointingConsistencyMode(EXACTLY_ONCE);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Paris"));
提醒:Scala 隱式轉換雖好用(
org.apache.flink.table.api.bridge.scala._),但易混淆執行邊界;新項目優先 Java。
8. TypeInformation ↔ DataType 對照與坑
- DataStream 的 TypeInformation → Table 的 DataType:自動推斷,但遇到 RAW/Generic(如
Row、不可反射類型)需顯式聲明。 - 常見修復:
ds.map(...).returns(TypeInformation)- Table 側
Schema.newBuilder().column("f0", DataTypes.of(MyPojo.class)) - 結構化顯式聲明:
DataTypes.STRUCTURED(...)。
結論:看 schema 打印。若出現
RAW('User', ...),多半需要顯式類型或自定義序列化。
9. 在一個 Job 裏拼接多條“表內管道”與 DataStream
StreamStatementSet set = tEnv.createStatementSet();
// 純 Table 管道
Table src = tEnv.from(
TableDescriptor.forConnector("datagen")
.option("number-of-rows","3")
.schema(Schema.newBuilder().column("v", DataTypes.INT()).build())
.build());
set.add(src.insertInto(TableDescriptor.forConnector("print").build()));
// DataStream → Table → Sink
DataStream<Integer> ds = env.fromElements(1,2,3);
set.add(tEnv.fromDataStream(ds).insertInto(TableDescriptor.forConnector("print").build()));
// 把所有表內管道“掛”到 DataStream Builder
set.attachAsDataStream();
// 其他 DataStream 分支...
env.fromElements(4,5,6).sinkTo(new DiscardingSink<>());
// 統一提交
env.execute();
10. 典型“融合”食譜(可直接套)
10.1 SQL 預處理 → 自定義低階算子
CREATE TABLE讀 Kafka,SQL 做脱敏/清洗;toDataStream()拿乾淨 Row;keyBy+KeyedProcessFunction做複雜會話/定時器/狀態機;- 最終 sink(JDBC/Elasticsearch)。
10.2 DataStream 產流 → Table 做時間區間 Join → 回 DataStream
- DS 造兩路(用户、訂單)並補上水印;
createTemporaryView(..., Schema.watermark(...))註冊為臨時表;- SQL 做 interval join;
toDataStream()下游繼續自定義聚合/風控規則。
BATCH/STREAMING 兩種模式產出的最終表一致(批模式只會 insert)。
10.3 處理更新的聚合
- 用
toChangelogStream()承接 SUM/COUNT 等會更新的表; - 若目標是 KV 外部存儲,優先 upsert 模式 + 主鍵,降低網絡與落庫寫放大。
11. 上線前檢查清單
邊界與語義
toDataStream是否滿足 append-only?否則改toChangelogStream。- Rowtime/Watermark 是否按需 繼承 Source 或 自行聲明?
-
類型與 Schema
- 避免
RAW/GenericTypeInfo:顯式聲明 DataType 或 TypeInformation。 - Sink 與結果表 列名/類型/可空性 一致,DDL 校驗通過。
性能與穩定性
-
- 多 Sink 使用 StatementSet 合併優化,減少重複掃描與 shuffle。
- 監控:吞吐/延遲/背壓/水印推進/RowKind 統計/狀態大小。
執行與提交
- Table 子管道轉 DataStream 後,記得最終
env.execute()。 - 純 Table 閉環,使用
pipeline.execute()或executeSql();與 DS 無關。
12. 可改造示例:統一管道,批流同源
// 1) 選擇運行模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // or BATCH
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 2) 兩路 DS + 水印 → 註冊為表
DataStream<Row> users = ...
DataStream<Row> orders = ...
tEnv.createTemporaryView("UserTable", users,
Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.column("uid", DataTypes.INT())
.column("name", DataTypes.STRING())
.watermark("ts", "ts - INTERVAL '1' SECOND")
.build());
tEnv.createTemporaryView("OrderTable", orders,
Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.column("uid", DataTypes.INT())
.column("amount", DataTypes.INT())
.watermark("ts", "ts - INTERVAL '1' SECOND")
.build());
// 3) SQL:時間區間 join(模式無關,結果一致)
Table joined = tEnv.sqlQuery(
"SELECT U.name, O.amount " +
"FROM UserTable U, OrderTable O " +
"WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");
// 4) 回到 DataStream,繼續自定義處理
DataStream<Row> out = tEnv.toDataStream(joined);
out.keyBy(r -> r.<String>getFieldAs("name"))
.process(new MyDedup()).print();
env.execute();
結語
- 把 Table 當入口與加速器,把 DataStream 當控制面與精細算子;二者自由互轉,讓你兼得開發效率與可控性能。
- 牢牢記住三件事:
- append-only → toDataStream;有更新 → toChangelogStream
- 時間與水印要麼繼承 Source,要麼在 Schema 聲明
- Table → DataStream 後,最終得
env.execute()