1. 為什麼需要“上下文(Context)”?
算子名稱、並行度這種“靜態配置”可以在構建拓撲時設置;但當前處理的 Key、任務並行度、執行模式、度量上報、定時器/水位線等運行時信息,只能在ProcessFunction 執行期間獲取。
DataStream API 通過統一入口 Runtime Context 把執行引擎的能力暴露給算子。
1.1 Runtime Context 能拿到什麼?
按職能可分為兩類:
NonPartitionedContext(非分區上下文) — 與具體分區/Key 無關
- JobInfo:Job 名稱、執行模式等
- TaskInfo:並行度、子任務索引等
- MetricGroup:度量註冊/上報
- Watermark Manager:水位線相關
典型時機:
open()初始化、close()清理
PartitionedContext(分區上下文) — 與當前分區/Key 強相關
- State Manager:狀態訪問(
getState*)、當前 Key(getCurrentKey()) - ProcessingTime Manager:處理時間定時器、當前處理時間
典型時機:
processRecord()、定時器回調觸發
1.2 例子:在 open() 裏讀取並行度和執行模式
new OneInputStreamProcessFunction<String, String>() {
private transient int parallelism;
private transient ExecutionMode executionMode;
@Override
public void open(NonPartitionedContext<String> ctx) throws Exception {
parallelism = ctx.getTaskInfo().getParallelism();
executionMode = ctx.getJobInfo().getExecutionMode();
}
}
2. 狀態處理三步走:聲明 → 註冊 → 使用
原則:先聲明(define & declare),後使用(get & update)。寫一個有狀態的 ProcessFunction 通常分 3 步:
- 定義 State:用
StateDeclaration定義狀態的名稱與RedistributionMode - 註冊 State:在
ProcessFunction#usesStates()中顯式聲明所有要用的狀態 - 獲取/更新 State:在處理邏輯裏通過
StateManager獲取並讀寫
2.1 完整示例
private static class StatefulFunction implements OneInputStreamProcessFunction<Long, Long> {
// Step 1: 定義一個 ListState
static final StateDeclaration.ListStateDeclaration<Long> LIST_STATE_DECL =
StateDeclarations.listStateBuilder("example-list-state", TypeDescriptors.LONG).build();
// Step 2: 在 usesStates 中註冊
@Override
public Set<StateDeclaration> usesStates() {
return Collections.singleton(LIST_STATE_DECL);
}
// Step 3: 獲取並更新
@Override
public void processRecord(Long record, Collector<Long> out, RuntimeContext ctx) throws Exception {
ListState<Long> state = ctx.getStateManager().getState(LIST_STATE_DECL);
state.update(Collections.singletonList(record));
}
}
3. 定義 State:名字 + 重分配(Redistribution)
StateDeclaration 需要兩類信息:
- Name:狀態唯一標識
- RedistributionMode:並行度/分區變化(尤其是 Non-Keyed)時,狀態如何在分區間重新分佈
NONE:不支持重分配(並行度變化將不安全)REDISTRIBUTABLE:該狀態可以安全重分配,具體策略由狀態定義決定IDENTICAL:保證不同分區中的狀態恆等(一致),因此無需考慮重分配
經驗法則:
- Keyed 流:狀態天然綁定到 Key/分區,一般不需要 Redistribute
- Non-Keyed 流:分區隨並行度變動而變化,強烈建議使用
REDISTRIBUTABLE或IDENTICAL的聲明設計
3.1 支持的狀態類型(對應 StateDeclaration)
- ValueState:單值,可
update(T)/value() - ListState:列表,可
add/addAll/get/update(List<T>) - ReducingState:基於
ReduceFunction的聚合單值 - AggregatingState<IN,OUT>:聚合單值,輸入類型可與聚合結果不同,基於
AggregateFunction - MapState<UK,UV>:KV 映射,
put/putAll/get/entries/keys/values/isEmpty - BroadcastState<K,V>:廣播流上的 KV 狀態(所有並行實例接收相同元素)
3.2 快速構造:StateDeclarations 與 TypeDescriptors
// ValueState<Long>
ValueStateDeclaration<Long> v = StateDeclarations.valueState("example-value-state", TypeDescriptors.LONG);
// MapState<Long, String>
MapStateDeclaration<Long, String> m =
StateDeclarations.mapState("example-map-state", TypeDescriptors.LONG, TypeDescriptors.STRING);
// ReducingState<Long>,聚合函數為 sum
ReducingStateDeclaration<Long> r =
StateDeclarations.reducingState("example-reducing-state", TypeDescriptors.LONG, Long::sum);
TypeDescriptors提供常用類型:INT/LONG/BOOLEAN/STRING/LIST/MAP/...- 自定義類型:實現
TypeDescriptor描述自己的序列化信息
4. 聲明 State:usesStates() 必須覆蓋
@Override
public Set<StateDeclaration> usesStates() {
return Set.of( /* 所有將被使用的聲明 */ );
}
- 不在
usesStates()中聲明的狀態一律不可用 - Flink 會在作業編譯期做合法性校驗(詳見 §6),非法聲明會直接報錯,更早暴露問題、避免運行期炸鍋
5. 獲取與更新:StateManager
StateManager 是狀態入口,常用方法:
- 當前 Key:
<K> K getCurrentKey()(非 Keyed 流調用會拋UnsupportedOperationException) - 獲取狀態:
getState(...):未註冊/不可用將拋異常getStateOptional(...):返回Optional,適合“存在即用”場景
示例要點:
@Override
public void processRecord(Event e, Collector<Result> out, RuntimeContext ctx) throws Exception {
StateManager sm = ctx.getStateManager();
String userId = sm.getCurrentKey(); // 僅在 Keyed 流合法
ValueState<UserAgg> agg = sm.getState(USER_AGG_DECL);
UserAgg cur = agg.value();
agg.update(update(cur, e));
}
6. 合法性矩陣:哪些輸入/狀態組合會被拒絕?
不同 輸入流類型(Global、Keyed、NonKeyed、Broadcast)對狀態聲明與訪問的允許程度不同。文檔給出了兩張表(單輸入 & 雙輸入),這裏提煉核心規則:
- Broadcast 輸入:只允許使用 BroadcastState;不允許按 Key 訪問狀態(沒有 Key 的概念)
- Global 輸入:不具備 Key 語義,不能訪問依賴 Key 的狀態(如
getCurrentKey()、Keyed State) - Non-Keyed 輸入:可以使用 Non-Keyed 維度的狀態(取決於聲明的 Redistribute 能力);不允許讀取當前 Key
- Keyed 輸入:可以訪問當前 Key,並使用各類 Keyed State(Value/List/Map/Reducing/Aggregating)
雙輸入(TwoInput):每個輸入邊各自遵循上述規則,以輸入邊維度判定合法性。例如:
TwoInputBroadcastStreamProcessFunction:廣播邊能用BroadcastState;另一個(Keyed/Non-Keyed)邊按其自身規則取用狀態TwoInputNonBroadcastStreamProcessFunction:兩邊均不為 Broadcast,則分別按其Keyed/NonKeyed/Global規則校驗
編譯期保護:
- Flink 會在編譯期檢查
usesStates()聲明的合法性; - 若聲明瞭與輸入類型不兼容的狀態,作業提交前就會報錯。
7. ProcessingTime / Watermark:定時與時間語義補充
- ProcessingTime Manager:
- 讀取當前處理時間
- 註冊/刪除處理時間定時器(常用於“每 X 分鐘清理”類邏輯)
- Watermark Manager:
- 管理水位線推進
- 驅動事件時間相關定時器(如果你使用事件時間擴展 API)
選型建議:
- 業務時效以消息時間為準 → 用事件時間 + 水位線/事件定時
- 更關注“處理節點的牆鍾” → 用處理時間定時
8. 典型設計範式
8.1 Keyed 計數器(ValueState)
static final ValueStateDeclaration<Long> COUNT_DECL =
StateDeclarations.valueState("cnt", TypeDescriptors.LONG);
@Override public Set<StateDeclaration> usesStates() { return Set.of(COUNT_DECL); }
@Override
public void processRecord(Event e, Collector<Out> out, RuntimeContext ctx) throws Exception {
ValueState<Long> cnt = ctx.getStateManager().getState(COUNT_DECL);
long v = Optional.ofNullable(cnt.value()).orElse(0L) + 1;
cnt.update(v);
if (v % 1000 == 0) out.collect(new Out(ctx.getStateManager().getCurrentKey(), v));
}
8.2 Non-Keyed 聚合(ReducingState,支持重分配)
static final ReducingStateDeclaration<Long> SUM_DECL =
StateDeclarations.reducingState("sum", TypeDescriptors.LONG, Long::sum);
// 建議把該聲明設計為 REDISTRIBUTABLE(構建器中配置),以支持並行度變更時的安全遷移
8.3 廣播規則 + 事件主流
- 廣播邊:
BroadcastState<RuleId, Rule>存規則 - 事件邊:Keyed 處理,按用户/設備應用規則
- 合法性:
- 廣播邊只能訪問
BroadcastState - 事件邊可訪問 Keyed State 與
getCurrentKey()
9. 工程化建議與避坑清單
- 聲明即契約:所有狀態都必須在
usesStates()中顯式聲明;否則不可獲取 - Non-Keyed 狀態要考慮並行度變化:優先選
REDISTRIBUTABLE,或保證IDENTICAL - Keyed vs Non-Keyed 分清語義邊界:不要在 Non-Keyed 中調用
getCurrentKey() - Broadcast 只配合使用:廣播輸入只用
BroadcastState,不可直接“轉成”其它流 - 類型描述一致性:
TypeDescriptors與業務類的序列化保持一致,避免反序列化失敗 - 狀態增長控制:必要時配置 TTL/定期清理;長列表/Map 建議分片或外溢
- 編譯期報錯要重視:非法狀態聲明/訪問,在提交前就會失敗,修掉才是王道
- 度量與命名:用
MetricGroup暴露關鍵指標;對每個處理節點withName,便於排障 - 時間語義對齊:處理時間定時器 ≠ 事件時間定時器,選錯會出現“遲到/早到”異常
10. 一頁速記(Cheat Sheet)
- Context:
NonPartitionedContext:open/close場景(Job/Task/Metric/Watermark)PartitionedContext:processRecord/定時器回調(State/CurrentKey/ProcessingTime)
- State 三步:定義(聲明對象)→ 註冊(
usesStates())→ 獲取/更新(StateManager) - Redistribution:Keyed 通常不需;Non-Keyed 選
REDISTRIBUTABLE/IDENTICAL - 合法性:按輸入流類型(Global/Keyed/NonKeyed/Broadcast)判定;非法即編譯期報錯
- Broadcast 限制:只能用
BroadcastState,且需與另一輸入一同處理