1. 為什麼需要“上下文(Context)”?

算子名稱、並行度這種“靜態配置”可以在構建拓撲時設置;但當前處理的 Key、任務並行度、執行模式、度量上報、定時器/水位線等運行時信息,只能在ProcessFunction 執行期間獲取。
DataStream API 通過統一入口 Runtime Context 把執行引擎的能力暴露給算子。

1.1 Runtime Context 能拿到什麼?

按職能可分為兩類:

NonPartitionedContext(非分區上下文) — 與具體分區/Key 無關

  • JobInfo:Job 名稱、執行模式等
  • TaskInfo:並行度、子任務索引等
  • MetricGroup:度量註冊/上報
  • Watermark Manager:水位線相關

典型時機:open() 初始化、close() 清理

PartitionedContext(分區上下文) — 與當前分區/Key 強相關

  • State Manager:狀態訪問(getState*)、當前 Key(getCurrentKey()
  • ProcessingTime Manager:處理時間定時器、當前處理時間

典型時機:processRecord()、定時器回調觸發

1.2 例子:在 open() 裏讀取並行度和執行模式

new OneInputStreamProcessFunction<String, String>() {
  private transient int parallelism;
  private transient ExecutionMode executionMode;

  @Override
  public void open(NonPartitionedContext<String> ctx) throws Exception {
    parallelism   = ctx.getTaskInfo().getParallelism();
    executionMode = ctx.getJobInfo().getExecutionMode();
  }
}

2. 狀態處理三步走:聲明 → 註冊 → 使用

原則先聲明(define & declare),後使用(get & update)。寫一個有狀態的 ProcessFunction 通常分 3 步:

  1. 定義 State:用 StateDeclaration 定義狀態的名稱RedistributionMode
  2. 註冊 State:在 ProcessFunction#usesStates()顯式聲明所有要用的狀態
  3. 獲取/更新 State:在處理邏輯裏通過 StateManager 獲取並讀寫

2.1 完整示例

private static class StatefulFunction implements OneInputStreamProcessFunction<Long, Long> {
  // Step 1: 定義一個 ListState
  static final StateDeclaration.ListStateDeclaration<Long> LIST_STATE_DECL =
      StateDeclarations.listStateBuilder("example-list-state", TypeDescriptors.LONG).build();

  // Step 2: 在 usesStates 中註冊
  @Override
  public Set<StateDeclaration> usesStates() {
    return Collections.singleton(LIST_STATE_DECL);
  }

  // Step 3: 獲取並更新
  @Override
  public void processRecord(Long record, Collector<Long> out, RuntimeContext ctx) throws Exception {
    ListState<Long> state = ctx.getStateManager().getState(LIST_STATE_DECL);
    state.update(Collections.singletonList(record));
  }
}

3. 定義 State:名字 + 重分配(Redistribution)

StateDeclaration 需要兩類信息:

  • Name:狀態唯一標識
  • RedistributionMode:並行度/分區變化(尤其是 Non-Keyed)時,狀態如何在分區間重新分佈
  • NONE:不支持重分配(並行度變化將不安全)
  • REDISTRIBUTABLE:該狀態可以安全重分配,具體策略由狀態定義決定
  • IDENTICAL:保證不同分區中的狀態恆等(一致),因此無需考慮重分配

經驗法則:

  • Keyed 流:狀態天然綁定到 Key/分區,一般不需要 Redistribute
  • Non-Keyed 流:分區隨並行度變動而變化,強烈建議使用 REDISTRIBUTABLEIDENTICAL 的聲明設計

3.1 支持的狀態類型(對應 StateDeclaration

  • ValueState:單值,可 update(T) / value()
  • ListState:列表,可 add/addAll/get/update(List<T>)
  • ReducingState:基於 ReduceFunction 的聚合單值
  • AggregatingState<IN,OUT>:聚合單值,輸入類型可與聚合結果不同,基於 AggregateFunction
  • MapState<UK,UV>:KV 映射,put/putAll/get/entries/keys/values/isEmpty
  • BroadcastState<K,V>:廣播流上的 KV 狀態(所有並行實例接收相同元素)

3.2 快速構造:StateDeclarationsTypeDescriptors

// ValueState<Long>
ValueStateDeclaration<Long> v = StateDeclarations.valueState("example-value-state", TypeDescriptors.LONG);

// MapState<Long, String>
MapStateDeclaration<Long, String> m =
    StateDeclarations.mapState("example-map-state", TypeDescriptors.LONG, TypeDescriptors.STRING);

// ReducingState<Long>,聚合函數為 sum
ReducingStateDeclaration<Long> r =
    StateDeclarations.reducingState("example-reducing-state", TypeDescriptors.LONG, Long::sum);
  • TypeDescriptors 提供常用類型:INT/LONG/BOOLEAN/STRING/LIST/MAP/...
  • 自定義類型:實現 TypeDescriptor 描述自己的序列化信息

4. 聲明 State:usesStates() 必須覆蓋

@Override
public Set<StateDeclaration> usesStates() { 
  return Set.of( /* 所有將被使用的聲明 */ );
}
  • 不在 usesStates() 中聲明的狀態一律不可用
  • Flink 會在作業編譯期做合法性校驗(詳見 §6),非法聲明會直接報錯,更早暴露問題、避免運行期炸鍋

5. 獲取與更新:StateManager

StateManager 是狀態入口,常用方法:

  • 當前 Key<K> K getCurrentKey()(非 Keyed 流調用會拋 UnsupportedOperationException
  • 獲取狀態
  • getState(...):未註冊/不可用將拋異常
  • getStateOptional(...):返回 Optional,適合“存在即用”場景

示例要點:

@Override
public void processRecord(Event e, Collector<Result> out, RuntimeContext ctx) throws Exception {
  StateManager sm = ctx.getStateManager();
  String userId = sm.getCurrentKey(); // 僅在 Keyed 流合法

  ValueState<UserAgg> agg = sm.getState(USER_AGG_DECL);
  UserAgg cur = agg.value();
  agg.update(update(cur, e));
}

6. 合法性矩陣:哪些輸入/狀態組合會被拒絕?

不同 輸入流類型(Global、Keyed、NonKeyed、Broadcast)對狀態聲明與訪問的允許程度不同。文檔給出了兩張表(單輸入 & 雙輸入),這裏提煉核心規則:

  • Broadcast 輸入:只允許使用 BroadcastState不允許按 Key 訪問狀態(沒有 Key 的概念)
  • Global 輸入:不具備 Key 語義,不能訪問依賴 Key 的狀態(如 getCurrentKey()、Keyed State)
  • Non-Keyed 輸入:可以使用 Non-Keyed 維度的狀態(取決於聲明的 Redistribute 能力);不允許讀取當前 Key
  • Keyed 輸入:可以訪問當前 Key,並使用各類 Keyed State(Value/List/Map/Reducing/Aggregating)

雙輸入(TwoInput):每個輸入邊各自遵循上述規則,以輸入邊維度判定合法性。例如:

  • TwoInputBroadcastStreamProcessFunction:廣播邊能用 BroadcastState;另一個(Keyed/Non-Keyed)邊按其自身規則取用狀態
  • TwoInputNonBroadcastStreamProcessFunction:兩邊均不為 Broadcast,則分別按其 Keyed/NonKeyed/Global 規則校驗

編譯期保護

  • Flink 會在編譯期檢查 usesStates() 聲明的合法性
  • 若聲明瞭與輸入類型不兼容的狀態,作業提交前就會報錯

7. ProcessingTime / Watermark:定時與時間語義補充

  • ProcessingTime Manager
  • 讀取當前處理時間
  • 註冊/刪除處理時間定時器(常用於“每 X 分鐘清理”類邏輯)
  • Watermark Manager
  • 管理水位線推進
  • 驅動事件時間相關定時器(如果你使用事件時間擴展 API)

選型建議:

  • 業務時效以消息時間為準 → 用事件時間 + 水位線/事件定時
  • 更關注“處理節點的牆鍾” → 用處理時間定時

8. 典型設計範式

8.1 Keyed 計數器(ValueState)

static final ValueStateDeclaration<Long> COUNT_DECL =
    StateDeclarations.valueState("cnt", TypeDescriptors.LONG);

@Override public Set<StateDeclaration> usesStates() { return Set.of(COUNT_DECL); }

@Override
public void processRecord(Event e, Collector<Out> out, RuntimeContext ctx) throws Exception {
  ValueState<Long> cnt = ctx.getStateManager().getState(COUNT_DECL);
  long v = Optional.ofNullable(cnt.value()).orElse(0L) + 1;
  cnt.update(v);
  if (v % 1000 == 0) out.collect(new Out(ctx.getStateManager().getCurrentKey(), v));
}

8.2 Non-Keyed 聚合(ReducingState,支持重分配)

static final ReducingStateDeclaration<Long> SUM_DECL =
    StateDeclarations.reducingState("sum", TypeDescriptors.LONG, Long::sum);
// 建議把該聲明設計為 REDISTRIBUTABLE(構建器中配置),以支持並行度變更時的安全遷移

8.3 廣播規則 + 事件主流

  • 廣播邊:BroadcastState<RuleId, Rule> 存規則
  • 事件邊:Keyed 處理,按用户/設備應用規則
  • 合法性:
  • 廣播邊只能訪問 BroadcastState
  • 事件邊可訪問 Keyed State 與 getCurrentKey()

9. 工程化建議與避坑清單

  • 聲明即契約:所有狀態都必須在 usesStates() 中顯式聲明;否則不可獲取
  • Non-Keyed 狀態要考慮並行度變化:優先選 REDISTRIBUTABLE,或保證 IDENTICAL
  • Keyed vs Non-Keyed 分清語義邊界:不要在 Non-Keyed 中調用 getCurrentKey()
  • Broadcast 只配合使用:廣播輸入只用 BroadcastState,不可直接“轉成”其它流
  • 類型描述一致性TypeDescriptors 與業務類的序列化保持一致,避免反序列化失敗
  • 狀態增長控制:必要時配置 TTL/定期清理;長列表/Map 建議分片或外溢
  • 編譯期報錯要重視:非法狀態聲明/訪問,在提交前就會失敗,修掉才是王道
  • 度量與命名:用 MetricGroup 暴露關鍵指標;對每個處理節點 withName,便於排障
  • 時間語義對齊:處理時間定時器 ≠ 事件時間定時器,選錯會出現“遲到/早到”異常

10. 一頁速記(Cheat Sheet)

  • Context
  • NonPartitionedContextopen/close 場景(Job/Task/Metric/Watermark)
  • PartitionedContextprocessRecord/定時器回調(State/CurrentKey/ProcessingTime)
  • State 三步:定義(聲明對象)→ 註冊(usesStates())→ 獲取/更新(StateManager
  • Redistribution:Keyed 通常不需;Non-Keyed 選 REDISTRIBUTABLE/IDENTICAL
  • 合法性:按輸入流類型(Global/Keyed/NonKeyed/Broadcast)判定;非法即編譯期報錯
  • Broadcast 限制:只能用 BroadcastState,且需與另一輸入一同處理