1. 項目結構與最小閉環
1.1 建立 TableEnvironment(入口)
import org.apache.flink.table.api.*;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // 或 .inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
如果需要與 DataStream 互操作,使用
StreamTableEnvironment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 支持流表互轉
1.2 註冊 Source / Sink(Catalog 中的表)
- 臨時表(temporary):僅在當前會話內存在,內存持久。
- 永久表(permanent):依賴外部 Catalog(如 Hive Metastore),跨會話可見。
示例:使用內置 datagen 構造源表;用 blackhole 作為下游(丟棄)。
import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
tEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build());
tEnv.executeSql(
"CREATE TEMPORARY TABLE SinkTable " +
"WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS)"
);
1.3 查詢並輸出(Table API / SQL 任你選)
Table API:
Table t = tEnv.from("SourceTable"); // 掃描
TableResult tr = t.insertInto("SinkTable").execute(); // 直寫 Sink
SQL:
Table t2 = tEnv.sqlQuery("SELECT * FROM SourceTable");
t2.insertInto("SinkTable").execute();
以上完成了從源到匯的“最小閉環”。
2. Catalog 與標識符(Identifier)的那些事
Table 的三段式標識為:catalog.database.table。你可以設置“當前 catalog / database”,省略前兩段:
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
// 註冊到當前命名空間
tEnv.createTemporaryView("exampleView", tEnv.from("SourceTable"));
// 指定數據庫
tEnv.createTemporaryView("other_database.exampleView", tEnv.from("SourceTable"));
// 標識中包含點號時用反引號轉義
tEnv.createTemporaryView("`example.View`", tEnv.from("SourceTable"));
// 完整三段式
tEnv.createTemporaryView("other_catalog.other_database.exampleView",
tEnv.from("SourceTable"));
2.1 Temporary Shadowing(影子覆蓋)
同名臨時表會覆蓋永久表(shadowing)。這對A/B 驗證、脱敏聯調很有用:先在同名臨時表上驗證,確認後刪除臨時表即可切回生產表。
3. 創建表:Virtual(視圖)與 Connector(外部系統)
3.1 視圖(Virtual Table / VIEW)
Table API/SQL 的查詢結果是邏輯計劃,可註冊為臨時視圖:
Table proj = tEnv.from("X").select($("a"), $("b")); // 邏輯查詢
tEnv.createTemporaryView("projectedTable", proj);
注意:視圖不會物化、也不會共享執行,多處引用會分別內聯執行。
3.2 Connector 表(外部系統)
既可用 Table API,也可用 SQL DDL:
final TableDescriptor source = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build();
tEnv.createTable("SourceTableA", source); // 永久表(依 catalog)
tEnv.createTemporaryTable("SourceTableB", source); // 臨時表
// SQL DDL
tEnv.executeSql("CREATE TEMPORARY TABLE MyTable (...) WITH (...)");
4. 查詢 Table:Table API 與 SQL 混搭
4.1 Table API(強類型、鏈式)
Table orders = tEnv.from("Orders");
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName"))
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
4.2 SQL(字符串、標準兼容)
Table revenue = tEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders WHERE cCountry = 'FRANCE' GROUP BY cID, cName"
);
4.3 混搭方式
- 在 SQL 結果返回的
Table上繼續走 Table API。 - 反之,把 Table API 的結果註冊為視圖,讓 SQL 的
FROM去引用。
5. 落地與執行:Sink、Pipeline、StatementSet
5.1 寫出到 Sink(文件系統示例)
final Schema schema = Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build();
tEnv.createTemporaryTable(
"CsvSinkTable",
TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.forFormat("csv").option("field-delimiter", "|").build())
.build()
);
Table result = ...; // 你的查詢結果
TablePipeline pipeline = result.insertInto("CsvSinkTable");
pipeline.printExplain(); // 打印計劃
pipeline.execute(); // 執行
5.2 多 Sink(StatementSet 合併優化)
StatementSet stmt = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmt.add(table1.insertInto("MySink1"));
Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmt.add(table2.insertInto("MySink2"));
String plan = stmt.explain(); // 查看合併後的 DAG
stmt.execute(); // 一次性優化並執行
6. 何時“被翻譯並執行”?
- 翻譯為 DataStream 程序的時機:
TableEnvironment.executeSql(...)TablePipeline.execute()Table.execute()(本地 collect)StatementSet.execute()
- Table ↔ DataStream 轉換後,執行由
StreamExecutionEnvironment.execute()驅動(針對流環境)。
7. 查詢優化(Calcite 加持)
Flink 基於/擴展 Calcite 做規則與基於代價的優化,包括但不限於:
- 子查詢去相關(decorrelation)
- 投影/分區裁剪、過濾下推
- 子計劃複用/去重
- 子查詢改寫:
IN/EXISTS→ left semi-join;NOT IN/NOT EXISTS→ left anti-join - 可選 Join 重排序(
table.optimizer.join-reorder-enabled)
高階用法:通過
CalciteConfig自定義優化規則:tEnv.getConfig().setPlannerConfig(...)。
8. 計劃解釋(Explain):看懂三張“圖”
你可以在 Table / StatementSet 上直接 explain(),或用 SQL 的 EXPLAIN:
- 未優化的邏輯計劃(AST)
- 優化後的邏輯計劃
- 物理執行計劃
System.out.println(table.explain()); // 單個表
System.out.println(stmtSet.explain()); // 多 Sink
9. DataStream 集成(一點提示)
StreamTableEnvironment支持fromDataStream/toDataStream互轉;- 轉回 DataStream 後就是“普通”流作業,執行由
env.execute()觸發; - 注意:不同 TableEnvironment 下的表不能互聯(不能直接 join/union)。
10. 工程實踐清單(上線前自查)
命名與 Catalog
- 明確
catalog.database.table,在多 catalog 環境中顯式指定,避免歧義 -
Schema 與兼容性
LIKE ... (EXCLUDING OPTIONS)複用 schema 時,確認 connector 必填項- Sink 與結果表 schema 完全一致(類型、順序、可空性)
性能與穩定性
-
-
- 大作業建議加
explain()與 Web UI 檢查並行度、鏈路與 shuffle
可觀測與排障
語言與 API
11. 參考片段:從源 → 查詢 → 多 Sink → Explain → 執行(可直接改造)
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 1) 註冊兩個文件源與兩個文件 Sink(略:見上文 Csv/Filesystem 示例)
// 2) 查詢
Table s1 = tEnv.from("MySource1").where($("word").like("F%"));
Table s2 = tEnv.from("MySource2");
Table unioned = s1.unionAll(s2);
// 3) 輸出(多 Sink 合併執行)
StatementSet stmt = tEnv.createStatementSet();
stmt.add(s1.insertInto("MySink1"));
stmt.add(unioned.insertInto("MySink2"));
// 4) 打印計劃並執行
System.out.println(stmt.explain());
stmt.execute();
12. 結語
- 記住閉環:TableEnvironment → 註冊表/視圖 → Table API/SQL 查詢 → Sink → Explain → Execute。
- 善用 臨時表 + 影子覆蓋 做聯調與灰度;用 StatementSet 做多寫合併;用 Explain 看穿優化與物理計劃。
- 新項目優先 Java Table API/SQL,提前規避 Scala API 的棄用風險。