1. 項目結構與最小閉環

1.1 建立 TableEnvironment(入口)

import org.apache.flink.table.api.*;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()   // 或 .inBatchMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

如果需要與 DataStream 互操作,使用 StreamTableEnvironment

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 支持流表互轉

1.2 註冊 Source / Sink(Catalog 中的表)

  • 臨時表(temporary):僅在當前會話內存在,內存持久。
  • 永久表(permanent):依賴外部 Catalog(如 Hive Metastore),跨會話可見。

示例:使用內置 datagen 構造源表;用 blackhole 作為下游(丟棄)。

import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;

tEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
    .build());

tEnv.executeSql(
    "CREATE TEMPORARY TABLE SinkTable " +
    "WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS)"
);

1.3 查詢並輸出(Table API / SQL 任你選)

Table API:

Table t = tEnv.from("SourceTable");  // 掃描
TableResult tr = t.insertInto("SinkTable").execute(); // 直寫 Sink

SQL:

Table t2 = tEnv.sqlQuery("SELECT * FROM SourceTable");
t2.insertInto("SinkTable").execute();

以上完成了從源到匯的“最小閉環”。

2. Catalog 與標識符(Identifier)的那些事

Table 的三段式標識為:catalog.database.table。你可以設置“當前 catalog / database”,省略前兩段:

tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");

// 註冊到當前命名空間
tEnv.createTemporaryView("exampleView", tEnv.from("SourceTable"));

// 指定數據庫
tEnv.createTemporaryView("other_database.exampleView", tEnv.from("SourceTable"));

// 標識中包含點號時用反引號轉義
tEnv.createTemporaryView("`example.View`", tEnv.from("SourceTable"));

// 完整三段式
tEnv.createTemporaryView("other_catalog.other_database.exampleView",
                         tEnv.from("SourceTable"));

2.1 Temporary Shadowing(影子覆蓋)

同名臨時表會覆蓋永久表(shadowing)。這對A/B 驗證、脱敏聯調很有用:先在同名臨時表上驗證,確認後刪除臨時表即可切回生產表。

3. 創建表:Virtual(視圖)與 Connector(外部系統)

3.1 視圖(Virtual Table / VIEW)

Table API/SQL 的查詢結果是邏輯計劃,可註冊為臨時視圖

Table proj = tEnv.from("X").select($("a"), $("b")); // 邏輯查詢
tEnv.createTemporaryView("projectedTable", proj);

注意:視圖不會物化、也不會共享執行,多處引用會分別內聯執行

3.2 Connector 表(外部系統)

既可用 Table API,也可用 SQL DDL:

final TableDescriptor source = TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
    .option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
    .build();

tEnv.createTable("SourceTableA", source);           // 永久表(依 catalog)
tEnv.createTemporaryTable("SourceTableB", source);  // 臨時表

// SQL DDL
tEnv.executeSql("CREATE TEMPORARY TABLE MyTable (...) WITH (...)");

4. 查詢 Table:Table API 與 SQL 混搭

4.1 Table API(強類型、鏈式)

Table orders = tEnv.from("Orders");

Table revenue = orders
  .filter($("cCountry").isEqual("FRANCE"))
  .groupBy($("cID"), $("cName"))
  .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

4.2 SQL(字符串、標準兼容)

Table revenue = tEnv.sqlQuery(
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders WHERE cCountry = 'FRANCE' GROUP BY cID, cName"
);

4.3 混搭方式

  • 在 SQL 結果返回的 Table 上繼續走 Table API。
  • 反之,把 Table API 的結果註冊為視圖,讓 SQL 的 FROM 去引用。

5. 落地與執行:Sink、Pipeline、StatementSet

5.1 寫出到 Sink(文件系統示例)

final Schema schema = Schema.newBuilder()
    .column("a", DataTypes.INT())
    .column("b", DataTypes.STRING())
    .column("c", DataTypes.BIGINT())
    .build();

tEnv.createTemporaryTable(
    "CsvSinkTable",
    TableDescriptor.forConnector("filesystem")
        .schema(schema)
        .option("path", "/path/to/file")
        .format(FormatDescriptor.forFormat("csv").option("field-delimiter", "|").build())
        .build()
);

Table result = ...; // 你的查詢結果

TablePipeline pipeline = result.insertInto("CsvSinkTable");
pipeline.printExplain();   // 打印計劃
pipeline.execute();        // 執行

5.2 多 Sink(StatementSet 合併優化)

StatementSet stmt = tEnv.createStatementSet();

Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmt.add(table1.insertInto("MySink1"));

Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmt.add(table2.insertInto("MySink2"));

String plan = stmt.explain(); // 查看合併後的 DAG
stmt.execute();               // 一次性優化並執行

6. 何時“被翻譯並執行”?

  • 翻譯為 DataStream 程序的時機:
  • TableEnvironment.executeSql(...)
  • TablePipeline.execute()
  • Table.execute()(本地 collect)
  • StatementSet.execute()
  • Table ↔ DataStream 轉換後,執行由 StreamExecutionEnvironment.execute() 驅動(針對流環境)。

7. 查詢優化(Calcite 加持)

Flink 基於/擴展 Calcite 做規則與基於代價的優化,包括但不限於:

  • 子查詢去相關(decorrelation)
  • 投影/分區裁剪過濾下推
  • 子計劃複用/去重
  • 子查詢改寫:IN/EXISTS → left semi-join;NOT IN/NOT EXISTS → left anti-join
  • 可選 Join 重排序(table.optimizer.join-reorder-enabled

高階用法:通過 CalciteConfig 自定義優化規則:tEnv.getConfig().setPlannerConfig(...)

8. 計劃解釋(Explain):看懂三張“圖”

你可以在 Table / StatementSet 上直接 explain(),或用 SQL 的 EXPLAIN

  • 未優化的邏輯計劃(AST)
  • 優化後的邏輯計劃
  • 物理執行計劃
System.out.println(table.explain());  // 單個表
System.out.println(stmtSet.explain()); // 多 Sink

9. DataStream 集成(一點提示)

  • StreamTableEnvironment 支持 fromDataStream / toDataStream 互轉;
  • 轉回 DataStream 後就是“普通”流作業,執行由 env.execute() 觸發;
  • 注意:不同 TableEnvironment 下的表不能互聯(不能直接 join/union)。

10. 工程實踐清單(上線前自查)

命名與 Catalog

  • 明確 catalog.database.table,在多 catalog 環境中顯式指定,避免歧義

Schema 與兼容性

  • LIKE ... (EXCLUDING OPTIONS) 複用 schema 時,確認 connector 必填項
  • Sink 與結果表 schema 完全一致(類型、順序、可空性)

性能與穩定性



  • 大作業建議加 explain() 與 Web UI 檢查並行度、鏈路與 shuffle

可觀測與排障



語言與 API


11. 參考片段:從源 → 查詢 → 多 Sink → Explain → 執行(可直接改造)

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 1) 註冊兩個文件源與兩個文件 Sink(略:見上文 Csv/Filesystem 示例)

// 2) 查詢
Table s1 = tEnv.from("MySource1").where($("word").like("F%"));
Table s2 = tEnv.from("MySource2");
Table unioned = s1.unionAll(s2);

// 3) 輸出(多 Sink 合併執行)
StatementSet stmt = tEnv.createStatementSet();
stmt.add(s1.insertInto("MySink1"));
stmt.add(unioned.insertInto("MySink2"));

// 4) 打印計劃並執行
System.out.println(stmt.explain());
stmt.execute();

12. 結語

  • 記住閉環:TableEnvironment → 註冊表/視圖 → Table API/SQL 查詢 → Sink → Explain → Execute
  • 善用 臨時表 + 影子覆蓋 做聯調與灰度;用 StatementSet 做多寫合併;用 Explain 看穿優化與物理計劃。
  • 新項目優先 Java Table API/SQL,提前規避 Scala API 的棄用風險。