Flink State vs. Checkpoint: Differences and Connections (with Source Code)

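  • 1. The Essential Difference: Runtime vs. Persistence
  • 1.1 State: The Runtime "Working Memory"
  • 1.2 Checkpoint: A "Snapshot Archive" of State
  • 2. An Analogy
  • 3. How They Relate in the Source Code
  • 3.1 CheckpointableKeyedStateBackend: The Bridge Between the Two
  • 3.2 StateHandle: Checkpoint Metadata
  • 3.3 HeapKeyedStateBackend: A Concrete Implementation
  • 4. The Complete Lifecycle
  • 4.1 During Normal Operation
  • 4.2 When a Checkpoint Is Triggered
  • 4.3 During Failure Recovery
  • 5. Comparison Table of Key Differences
  • 6. How the Two Cooperate in the Source Code
  • 6.1 Checkpoint Options
  • 6.2 Different Types of Checkpoint
  • 7. Practical Example: The Full Flow
  • 8. Summary of the Core Connections
  • 8.1 Dependency
  • 8.2 Cooperation
  • 8.3 Performance Trade-offs
  • 8.4 Unified Abstraction
  • 9. Key Takeaways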

1. The Essential Difference: Runtime vs. Persistence

1.1 State: The Runtime "Working Memory"

package org.apache.flink.api.common.state;

import org.apache.flink.annotation.PublicEvolving;

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the key of
 * the current element. That way, the system can handle stream and state partitioning consistently
 * together.
 */
@PublicEvolving
public interface State {

    /** Removes the value mapped under the current key. */
    void clear();
}

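Characteristics of State:

  • Location: stored in TaskManager memory or on local disk (RocksDB)
  • Purpose: the operator's "working memory" while it processes data
  • Access: frequent and real-time, typically at microsecond scale
  • Lifecycle: exists for as long as the job is running
  • Mutability: may be updated with every record processed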
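To make the "working memory" idea concrete, here is a minimal plain-Java sketch of keyed state. It does not use Flink: ToyKeyedValueState and ToyCountPerKey are hypothetical names invented for this illustration, and setCurrentKey stands in for what the runtime does before each record reaches user code.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a keyed value state. Hypothetical; not Flink's ValueState. */
class ToyKeyedValueState<K, V> {
    // One value per key, held in the task's own memory.
    private final Map<K, V> valuesByKey = new HashMap<>();
    // Key of the record being processed; set by the runtime, not by user code.
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    V value() {
        return valuesByKey.get(currentKey);
    }

    void update(V newValue) {
        valuesByKey.put(currentKey, newValue);
    }

    /** Mirrors State#clear(): removes the value mapped under the current key. */
    void clear() {
        valuesByKey.remove(currentKey);
    }
}

/** Toy operator that counts records per key. Hypothetical name. */
class ToyCountPerKey {
    private final ToyKeyedValueState<String, Long> count = new ToyKeyedValueState<>();

    /** Called once per record; state is read and updated on every call. */
    long process(String key) {
        count.setCurrentKey(key); // assumption: the runtime does this in a real job
        Long previous = count.value();
        long next = (previous == null) ? 1L : previous + 1L;
        count.update(next);
        return next;
    }

    /** Drops the counter for a single key and leaves other keys untouched. */
    void reset(String key) {
        count.setCurrentKey(key);
        count.clear();
    }
}
```

Every call to process reads and writes the map, which is why this kind of data has to live close to the operator rather than in remote storage.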

1.2 Checkpoint: A "Snapshot Archive" of State

@Internal
public interface Snapshotable<S extends StateObject> {
  
  /**
  * Operation that writes a snapshot into a stream that is provided by the given {@link
  * CheckpointStreamFactory} and returns a @{@link RunnableFuture} that gives a state handle to
  * the snapshot. It is up to the implementation if the operation is performed synchronous or
  * asynchronous. In the later case, the returned Runnable must be executed first before
  * obtaining the handle.
  *
  * @param checkpointId The ID of the checkpoint.
  * @param timestamp The timestamp of the checkpoint.
  * @param streamFactory The factory that we can use for writing our state to streams.
  * @param checkpointOptions Options for how to perform this checkpoint.
  * @return A runnable future that will yield a {@link StateObject}.
  */
  @Nonnull
  RunnableFuture<S> snapshot(
    long checkpointId,
    long timestamp,
    @Nonnull CheckpointStreamFactory streamFactory,
    @Nonnull CheckpointOptions checkpointOptions)
    throws Exception;
}

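Characteristics of a Checkpoint:

  • Location: durable storage (HDFS, S3, OSS, etc.)
  • Purpose: a "restore point" for fault-tolerant recovery
  • Access: infrequent and periodic (for example, once a minute)
  • Lifecycle: independent of the running job; used during failure recovery
  • Immutability: never changes once completed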
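The snapshot method above returns a RunnableFuture, and its Javadoc notes that the future may have to be executed before the handle is available. The following plain-Java sketch imitates that shape under simplifying assumptions: ToySnapshotBackend, ToyCompletedSnapshot, and runAndGet are hypothetical names, the "write" is only an in-memory copy, and nothing here is Flink's actual implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Immutable result of a finished snapshot. Hypothetical type. */
final class ToyCompletedSnapshot {
    final long checkpointId;
    final Map<String, Long> entries; // read-only view over a private copy

    ToyCompletedSnapshot(long checkpointId, Map<String, Long> entries) {
        this.checkpointId = checkpointId;
        this.entries = Collections.unmodifiableMap(entries);
    }
}

/** Toy backend with a live, mutable map. Hypothetical type. */
class ToySnapshotBackend {
    private final Map<String, Long> liveState = new HashMap<>();

    void put(String key, long value) {
        liveState.put(key, value);
    }

    /**
     * Synchronous part: copy the live map so later updates cannot leak in.
     * Asynchronous part: the returned future builds the result only when run.
     */
    RunnableFuture<ToyCompletedSnapshot> snapshot(long checkpointId) {
        final Map<String, Long> frozen = new HashMap<>(liveState);
        Callable<ToyCompletedSnapshot> write =
                () -> new ToyCompletedSnapshot(checkpointId, frozen);
        return new FutureTask<>(write);
    }

    /** Runs the future, then fetches its result, wrapping checked exceptions. */
    static <T> T runAndGet(RunnableFuture<T> future) {
        future.run(); // nothing is produced until the future is executed
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch, an update made after snapshot returns but before the future runs does not appear in the result, which is the immutability property listed above.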

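2. An Analogy

State = the Word document you are editing (in memory)
↓ every so often
Checkpoint = a copy of the document saved to disk
↓ if the program crashes
Recovery = restore from the most recent save (reload it into memory)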
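The analogy can be traced step by step in a small plain-Java sketch. ToyRecoverableTask and its methods are hypothetical names, and the "disk" is just a second map, so this only illustrates the edit, save, crash, and reload sequence rather than how Flink performs it.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy word-count task with one saved copy of its state. Hypothetical type. */
class ToyRecoverableTask {
    // The "open document": mutable and lost on a crash.
    private Map<String, Integer> state = new HashMap<>();
    // The "saved copy": survives a crash in this sketch.
    private Map<String, Integer> lastCheckpoint = new HashMap<>();

    /** Normal processing: update the in-memory state for each record. */
    void process(String word) {
        state.merge(word, 1, Integer::sum);
    }

    /** Save: copy the current state so later edits do not change the copy. */
    void checkpoint() {
        lastCheckpoint = new HashMap<>(state);
    }

    /** Crash: everything held only in memory is gone. */
    void crash() {
        state = new HashMap<>();
    }

    /** Recovery: reload the most recent saved copy into memory. */
    void restore() {
        state = new HashMap<>(lastCheckpoint);
    }

    int count(String word) {
        return state.getOrDefault(word, 0);
    }
}
```

If a task processes a word twice, checkpoints, processes it a third time, and then crashes, restore brings the count back to 2. The third update happened after the last save, so it is not part of the recovered state.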

3. How They Relate in the Source Code

3.1 CheckpointableKeyedStateBackend: The Bridge Between the Two

/**
* Interface that combines both, the {@link KeyedStateBackend} interface, which encapsulates methods
* responsible for keyed state management and the {@link Snapshotable} which tells the system how to
* snapshot the underlying state.
*
* <p><b>NOTE:</b> State backends that need to be notified of completed checkpoints can additionally
* implement the {@link CheckpointListener} interface.
*
* @param <K> Type of the key by which state is keyed.
*/
public interface CheckpointableKeyedStateBackend<K>
    extends KeyedStateBackend<K>, Snapshotable<SnapshotResult<KeyedStateHandle>>, Closeable {

  /** Returns the key groups which this state backend is responsible for. */
  KeyGroupRange getKeyGroupRange();

  /**
   * Returns a {@link SavepointResources} that can be used by {@link SavepointSnapshotStrategy} to
   * write out a savepoint in the common/unified format.
   */
  @Nonnull
  SavepointResources<K> savepoint() throws Exception;
}

Design rationale:

  • KeyedStateBackend: manages reads and writes of runtime state
  • Snapshotable: provides the ability to snapshot state
  • CheckpointableKeyedStateBackend: has both capabilities
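The same composition pattern can be shown with toy interfaces. The sketch below is plain Java with hypothetical names (ToyStateAccess, ToySnapshotSource, ToyCheckpointableBackend, ToyHeapBackend). It mirrors only the idea of one type extending a runtime-access interface and a snapshot interface, not the real Flink signatures.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

/** Runtime side: read and write state. Toy stand-in for KeyedStateBackend. */
interface ToyStateAccess<K, V> {
    V get(K key);

    void put(K key, V value);
}

/** Persistence side: produce a snapshot. Toy stand-in for Snapshotable. */
interface ToySnapshotSource<S> {
    S snapshot(long checkpointId);
}

/** The bridge: one type that promises both capabilities. */
interface ToyCheckpointableBackend<K, V>
        extends ToyStateAccess<K, V>, ToySnapshotSource<Map<K, V>>, Closeable {}

/** Heap-based toy implementation backed by a single HashMap. */
class ToyHeapBackend<K, V> implements ToyCheckpointableBackend<K, V> {
    private final Map<K, V> table = new HashMap<>();

    @Override
    public V get(K key) {
        return table.get(key);
    }

    @Override
    public void put(K key, V value) {
        table.put(key, value);
    }

    /** Returns a copy; the checkpoint id is unused in this sketch. */
    @Override
    public Map<K, V> snapshot(long checkpointId) {
        return new HashMap<>(table);
    }

    @Override
    public void close() {
        table.clear();
    }
}
```

Code that only processes records can depend on the ToyStateAccess view, while code that drives checkpoints can depend on ToySnapshotSource. Both views point at the same object, which is the role the bridge interface plays.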

3.2 StateHandle: Checkpoint Metadata

/**
* Base for the handles of the checkpointed states in keyed streams. When recovering from failures,
* the handle will be passed to all tasks whose key group ranges overlap with it.
*/
public interface KeyedStateHandle extends CompositeStateHandle {

/** Returns the range of the key groups contained in the state. */
KeyGroupRange getKeyGroupRange();
/**
* Returns a state over a range that is the intersection between this handle's key-group range
* and the provided key-group range.
*
* @param keyGroupRange The key group range to intersect with, will return null if the
*     intersection of this handle's key-group and the provided key-group is empty.
*/
@Nullable
KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);
/**
* Returns a unique state handle id to distinguish with other keyed state handles.