Flink State vs. Checkpoint: Differences and Connections (with Source Code)
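- 1. Essential difference: runtime vs. persistence
- 1.1 State: the runtime "working memory"
- 1.2 Checkpoint: the "saved snapshot" of state
- 2. An analogy
- 3. How the two relate at the source-code level
- 3.1 CheckpointableKeyedStateBackend: the bridge between the two
- 3.2 StateHandle: checkpoint metadata
- 3.3 HeapKeyedStateBackend: a concrete implementation
- 4. The full lifecycle
- 4.1 During normal operation
- 4.2 When a checkpoint is triggered
- 4.3 During failure recovery
- 5. Comparison table of key differences
- 6. Cooperation mechanisms in the source code
- 6.1 Checkpoint options
- 6.2 Different types of checkpoints
- 7. Hands-on example: the complete flow
- 8. Summary of the core connections
- 8.1 Dependency
- 8.2 Cooperation
- 8.3 Performance trade-offs
- 8.4 Unified abstraction
- 9. Key takeaways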
1. Essential difference: runtime vs. persistence
1.1 State: the runtime "working memory"
package org.apache.flink.api.common.state;

import org.apache.flink.annotation.PublicEvolving;

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the key of
 * the current element. That way, the system can handle stream and state partitioning consistently
 * together.
 */
@PublicEvolving
public interface State {
    /** Removes the value mapped under the current key. */
    void clear();
}
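Characteristics of State:
- Location: stored in TaskManager memory or on local disk (RocksDB)
- Purpose: the "working memory" an operator uses while processing data
- Access: frequent and real-time, at microsecond-level latency
- Lifecycle: exists for as long as the job is running
- Mutability: may be updated on every record processed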
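The characteristics above can be illustrated with a small toy model. This is a sketch in plain Java, not Flink's implementation: `ToyKeyedStateDemo`, `ToyValueState`, and `countEvent` are hypothetical names, and the only thing borrowed from the source is the idea that the runtime sets the current key and that `clear()` removes the value for that key only.

```java
import java.util.HashMap;
import java.util.Map;

class ToyKeyedStateDemo {
    /** Toy keyed value state (hypothetical, NOT a Flink class). */
    static final class ToyValueState<K, V> {
        private final Map<K, V> table = new HashMap<>(); // lives in worker memory
        private K currentKey;                             // set by the "runtime" per record

        void setCurrentKey(K key) { this.currentKey = key; }

        V value() { return table.get(currentKey); }       // read value of the current key

        void update(V v) { table.put(currentKey, v); }    // may happen on every record

        void clear() { table.remove(currentKey); }        // like State#clear(): current key only
    }

    /** Keeps a running count per key, as a keyed operator might do with its state. */
    static long countEvent(ToyValueState<String, Long> state, String key) {
        state.setCurrentKey(key);
        Long old = state.value();
        long updated = (old == null ? 0L : old) + 1L;
        state.update(updated);
        return updated;
    }

    public static void main(String[] args) {
        ToyValueState<String, Long> counts = new ToyValueState<>();
        for (String user : new String[] {"alice", "bob", "alice"}) {
            System.out.println(user + " -> " + countEvent(counts, user));
        }
        counts.setCurrentKey("alice");
        counts.clear(); // removes only alice's entry; bob's count is untouched
        counts.setCurrentKey("bob");
        System.out.println("bob after clearing alice: " + counts.value());
    }
}
```

Every call to `countEvent` reads and rewrites the in-memory table, which is why this kind of access has to be cheap and local.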
1.2 Checkpoint: the "saved snapshot" of state
@Internal
public interface Snapshotable<S extends StateObject> {
    /**
     * Operation that writes a snapshot into a stream that is provided by the given {@link
     * CheckpointStreamFactory} and returns a @{@link RunnableFuture} that gives a state handle to
     * the snapshot. It is up to the implementation if the operation is performed synchronous or
     * asynchronous. In the later case, the returned Runnable must be executed first before
     * obtaining the handle.
     *
     * @param checkpointId The ID of the checkpoint.
     * @param timestamp The timestamp of the checkpoint.
     * @param streamFactory The factory that we can use for writing our state to streams.
     * @param checkpointOptions Options for how to perform this checkpoint.
     * @return A runnable future that will yield a {@link StateObject}.
     */
    @Nonnull
    RunnableFuture<S> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception;
}
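Characteristics of a Checkpoint:
- Location: durable storage (HDFS, S3, OSS, etc.)
- Purpose: a "restore point" for fault-tolerant recovery
- Access: infrequent and periodic (for example, once a minute)
- Lifecycle: independent of the running job; used during failure recovery
- Immutability: once completed, it never changes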
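To make the `RunnableFuture` contract from the Javadoc more concrete, here is a minimal sketch using only `java.util.concurrent`. It is a toy under stated assumptions, not Flink code: `ToySnapshotDemo`, `put`, `get`, and `runAndGet` are hypothetical, the "durable storage" is just an unmodifiable map, and the checkpoint ID, timestamp, stream factory, and options of the real signature are left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class ToySnapshotDemo {
    // Live, mutable working state (stand-in for a state backend's table).
    private final Map<String, Long> liveState = new HashMap<>();

    void put(String key, long value) { liveState.put(key, value); }

    Long get(String key) { return liveState.get(key); }

    /**
     * Synchronous part: copy the state as of "now".
     * Asynchronous part: the returned future "writes" the copy when it is run
     * (here it only wraps the copy as an unmodifiable map).
     */
    RunnableFuture<Map<String, Long>> snapshot() {
        Map<String, Long> copy = new HashMap<>(liveState);
        return new FutureTask<Map<String, Long>>(() -> Collections.unmodifiableMap(copy));
    }

    /** Runs the future first, then obtains its result; wraps checked exceptions. */
    static Map<String, Long> runAndGet(RunnableFuture<Map<String, Long>> future) {
        future.run();
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ToySnapshotDemo backend = new ToySnapshotDemo();
        backend.put("alice", 2L);
        RunnableFuture<Map<String, Long>> pending = backend.snapshot();
        backend.put("alice", 3L); // processing continues while the snapshot is pending
        Map<String, Long> checkpoint = runAndGet(pending);
        System.out.println("checkpoint=" + checkpoint + ", live alice=" + backend.get("alice"));
    }
}
```

The update made after `snapshot()` returns is visible in the live state but not in the completed checkpoint, which matches the mutable versus immutable contrast in the two lists.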
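2. An analogy
State = the Word document you are currently editing (in memory)
↓ every so often
Checkpoint = a copy of the document saved to disk (on the hard drive)
↓ if the program crashes
Recovery = restore from the most recent save (reload into memory)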
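The edit, save, crash, and reload cycle in the analogy can be sketched as a few lines of plain Java. Everything here is hypothetical (`ToyRecoveryDemo` and its methods are illustration only), and the sketch deliberately stops at reloading the saved copy; in Flink the updates made after the last checkpoint are normally re-derived by replaying input from the source, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;

class ToyRecoveryDemo {
    private Map<String, Integer> state = new HashMap<>();          // "the open document"
    private Map<String, Integer> lastCheckpoint = new HashMap<>(); // "the saved copy"

    void update(String key, int value) { state.put(key, value); }

    // Save: take an independent copy of the current state.
    void checkpoint() { lastCheckpoint = new HashMap<>(state); }

    // Crash: whatever was only in memory is lost.
    void crash() { state = new HashMap<>(); }

    // Recovery: reload the most recent saved copy into memory.
    void recover() { state = new HashMap<>(lastCheckpoint); }

    Integer get(String key) { return state.get(key); }

    public static void main(String[] args) {
        ToyRecoveryDemo job = new ToyRecoveryDemo();
        job.update("pages", 10);
        job.checkpoint();          // saved with pages = 10
        job.update("pages", 12);   // edits made after the last save
        job.crash();
        job.recover();
        System.out.println("pages after recovery: " + job.get("pages"));
    }
}
```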
3. How the two relate at the source-code level
3.1 CheckpointableKeyedStateBackend: the bridge between the two
/**
 * Interface that combines both, the {@link KeyedStateBackend} interface, which encapsulates methods
 * responsible for keyed state management and the {@link Snapshotable} which tells the system how to
 * snapshot the underlying state.
 *
 * <p><b>NOTE:</b> State backends that need to be notified of completed checkpoints can additionally
 * implement the {@link CheckpointListener} interface.
 *
 * @param <K> Type of the key by which state is keyed.
 */
public interface CheckpointableKeyedStateBackend<K>
        extends KeyedStateBackend<K>, Snapshotable<SnapshotResult<KeyedStateHandle>>, Closeable {

    /** Returns the key groups which this state backend is responsible for. */
    KeyGroupRange getKeyGroupRange();

    /**
     * Returns a {@link SavepointResources} that can be used by {@link SavepointSnapshotStrategy} to
     * write out a savepoint in the common/unified format.
     */
    @Nonnull
    SavepointResources<K> savepoint() throws Exception;
}
Design rationale:
- KeyedStateBackend: manages reads and writes of runtime state
- Snapshotable: provides the ability to snapshot state
- CheckpointableKeyedStateBackend: has both capabilities at once
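The composition described above can be sketched with three toy interfaces. This is only an illustration of the design idea, assuming heavily simplified contracts: `ToyKeyedBackend`, `ToySnapshotable`, `ToyCheckpointableBackend`, and `HeapBackend` are hypothetical names, and their methods are not the signatures of the real Flink interfaces.

```java
import java.util.HashMap;
import java.util.Map;

class ToyBackendDemo {
    /** Runtime side: read and write state for the current key (simplified). */
    interface ToyKeyedBackend<K> {
        void setCurrentKey(K key);
        Long get();
        void put(Long value);
    }

    /** Checkpoint side: produce a point-in-time copy of the state. */
    interface ToySnapshotable<S> {
        S snapshot();
    }

    /** Combined contract, in the spirit of CheckpointableKeyedStateBackend. */
    interface ToyCheckpointableBackend<K>
            extends ToyKeyedBackend<K>, ToySnapshotable<Map<K, Long>> {}

    /** One object serves both the operator (get/put) and the checkpointing path (snapshot). */
    static final class HeapBackend<K> implements ToyCheckpointableBackend<K> {
        private final Map<K, Long> table = new HashMap<>();
        private K currentKey;

        public void setCurrentKey(K key) { currentKey = key; }
        public Long get() { return table.get(currentKey); }
        public void put(Long value) { table.put(currentKey, value); }
        public Map<K, Long> snapshot() { return new HashMap<>(table); }
    }

    public static void main(String[] args) {
        ToyCheckpointableBackend<String> backend = new HeapBackend<>();
        backend.setCurrentKey("alice");
        backend.put(7L);
        Map<String, Long> snap = backend.snapshot();
        backend.put(8L);
        System.out.println("live=" + backend.get() + ", snapshot=" + snap);
    }
}
```

Because the combined interface extends both contracts, code that processes records and code that takes checkpoints can share the same backend instance without knowing about each other.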
3.2 StateHandle: checkpoint metadata
/**
 * Base for the handles of the checkpointed states in keyed streams. When recovering from failures,
 * the handle will be passed to all tasks whose key group ranges overlap with it.
 */
public interface KeyedStateHandle extends CompositeStateHandle {

    /** Returns the range of the key groups contained in the state. */
    KeyGroupRange getKeyGroupRange();

    /**
     * Returns a state over a range that is the intersection between this handle's key-group range
     * and the provided key-group range.
     *
     * @param keyGroupRange The key group range to intersect with, will return null if the
     *     intersection of this handle's key-group and the provided key-group is empty.
     */
    @Nullable
    KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);

    /**
     * Returns a unique state handle id to distinguish with other keyed state handles.
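The key-group intersection described in the `getIntersection` Javadoc of section 3.2 can be illustrated with a small range type. This is a sketch under assumptions, not Flink's `KeyGroupRange`: `ToyKeyGroupRange` is hypothetical, both bounds are treated as inclusive, and an empty overlap is reported as `null` to mirror the `@Nullable` return in the quoted interface.

```java
class ToyKeyGroupRange {
    final int start; // inclusive (assumption of this sketch)
    final int end;   // inclusive (assumption of this sketch)

    ToyKeyGroupRange(int start, int end) {
        if (start > end) {
            throw new IllegalArgumentException("start must not exceed end");
        }
        this.start = start;
        this.end = end;
    }

    /** Returns the overlap of the two ranges, or null if they do not overlap. */
    ToyKeyGroupRange intersect(ToyKeyGroupRange other) {
        int s = Math.max(start, other.start);
        int e = Math.min(end, other.end);
        return s <= e ? new ToyKeyGroupRange(s, e) : null;
    }

    @Override
    public String toString() { return "[" + start + ", " + end + "]"; }

    public static void main(String[] args) {
        // A handle written by a task that owned key groups 0..63 at checkpoint time.
        ToyKeyGroupRange handleRange = new ToyKeyGroupRange(0, 63);
        // After recovery with a different parallelism, tasks own different ranges.
        ToyKeyGroupRange taskA = new ToyKeyGroupRange(32, 95);
        ToyKeyGroupRange taskB = new ToyKeyGroupRange(64, 127);
        System.out.println("task A reads " + handleRange.intersect(taskA));
        System.out.println("task B reads " + handleRange.intersect(taskB));
    }
}
```

Only tasks whose range overlaps the handle receive a non-null slice of it, which is how a handle ends up being passed to every task whose key groups overlap with it during recovery.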