动态

详情 返回 返回

從零開始學Flink:流批一體的執行模式 - 动态 详情

在大數據處理領域,批處理和流處理曾經被視為兩種截然不同的範式。然而,隨着Apache Flink的出現,這種界限正在逐漸模糊。Flink的一個核心特性是其批流一體的架構設計,允許用户使用統一的API和執行引擎處理有界數據(批處理)和無界數據(流處理)。本文將深入探討Flink的執行模式(Execution Mode),特別是在Flink 1.20.1版本中對批處理和流處理模式的支持和優化。

一、Flink執行模式概述

1. 執行模式的基本概念

Flink的執行模式決定了作業如何被調度和執行。在Flink 1.12及以後的版本中,引入了統一的流批處理執行模式,主要包括以下三種模式:

  • STREAMING模式: 傳統的流處理執行模式,適用於處理無界數據流
  • BATCH模式: 專門為有界數據優化的批處理執行模式
  • AUTOMATIC模式: 自動根據數據源類型選擇執行模式

這三種模式的引入使得Flink能夠在同一套API上提供最佳的批處理和流處理性能。

2. 執行模式的演進歷程

Flink的執行模式經歷了以下幾個關鍵階段:

  1. 早期版本: Flink最初專注於流處理,但提供了對批處理的支持
  2. Flink 1.12: 引入了全新的批處理執行模式(BATCH模式)
  3. Flink 1.14: 增強了批處理模式的性能和功能
  4. Flink 1.20.1: 進一步優化了批流一體架構,改進了執行模式的自動選擇機制

二、Execution Mode的技術原理

1. 兩種執行模式的核心區別

雖然Flink使用相同的API和代碼結構,但BATCH和STREAMING模式在內部執行方式上存在顯著差異:

特性 STREAMING模式 BATCH模式
調度策略 連續流式調度 批處理調度,類似於MapReduce
資源利用 持續佔用資源 任務完成後釋放資源
優化技術 流式優化 批處理優化,如查詢優化、物化視圖
處理延遲 毫秒級延遲 較高延遲,但吞吐量更大
適用場景 實時數據處理 離線數據分析

2. 批流一體的設計理念

Flink的批流一體架構基於以下核心理念:

  • 統一的API: 無論批處理還是流處理,都使用相同的DataStream API
  • 統一的狀態管理: 共享相同的狀態後端和檢查點機制
  • 統一的容錯機制: 基於檢查點的故障恢復
  • 統一的優化器: 但針對不同執行模式應用不同的優化策略

三、配置和使用Execution Mode

1. 環境準備

首先,確保你已經設置了正確的依賴:

dependencies {
    // Flink核心依賴
    implementation 'org.apache.flink:flink_core:1.20.1'
    implementation 'org.apache.flink:flink-streaming-java:1.20.1'
    implementation 'org.apache.flink:flink-clients:1.20.1'
    implementation 'org.apache.flink:flink-connector-files:1.20.1'
    implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'
}

2. 在代碼中設置執行模式

在Flink 1.20.1中,可以通過以下方式設置執行模式:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeExample {
    public static void main(String[] args) throws Exception {
        // 創建執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 設置執行模式為BATCH
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        
        // 或者設置為STREAMING
        // env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        
        // 或者設置為AUTOMATIC(根據數據源自動選擇)
        // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        
        // 後續代碼...
    }
}

3. 通過命令行參數設置

也可以通過命令行參數覆蓋代碼中的設置:

bin/flink run -Dexecution.runtime-mode=BATCH -c com.example.ExecutionModeExample your-jar-file.jar

四、BATCH模式與STREAMING模式實踐

1. 批處理模式示例

以下是使用BATCH模式處理文件數據的完整示例:

package com.cn.daimajiangxin.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Arrays;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 創建執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 明確設置為批處理模式
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // 從文件讀取數據(有界數據源)
        String inputPath = "path\\flink-learning\\data\\input.txt";
        // 1. 創建文件源構建器
        Path filePath = new Path(inputPath);

        // 2. 配置文件讀取格式
        StreamFormat<String> format =new TextLineInputFormat("UTF-8");

        // 3. 構建 FileSource
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(format, filePath)
                .build();
        // 4. 添加 Watermark 策略(批處理中可使用默認策略)
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(10));

        DataStream<String> text = env.fromSource(fileSource,watermarkStrategy,"FileSource");

        // 數據處理邏輯
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        // 輸出結果
        counts.print();

        // 執行作業
        env.execute("Batch Word Count");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // 分詞併為每個單詞生成(word, 1)的元組
            Arrays.stream(value.toLowerCase().split("\\W+"))
                    .filter(word -> word.length() > 0)
                    .forEach(word -> out.collect(new Tuple2<>(word, 1)));
        }
    }
}

2. 流處理模式示例

以下是使用STREAMING模式處理Kafka數據流的示例:

package com.cn.daimajiangxin.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Arrays;

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        // 創建執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 明確設置為流處理模式
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // 啓用檢查點
        env.enableCheckpointing(5000);

        // 創建Kafka源(無界數據源)
        KafkaSource<String> source = KafkaSource.<String>
                        builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("word-count-topic")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 從Kafka讀取數據
        DataStream<String> text = env.fromSource(
                source,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)),
                "Kafka Source"
        );

        // 數據處理邏輯
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        // 輸出結果
        counts.print();

        // 執行作業
        env.execute("Streaming Word Count");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            Arrays.stream(value.toLowerCase().split("\\W+"))
                    .filter(word -> word.length() > 0)
                    .forEach(word -> out.collect(new Tuple2<>(word, 1)));
        }
    }
}

五、AUTOMATIC模式的智能選擇機制

1. AUTOMATIC模式的工作原理

AUTOMATIC模式是Flink 1.20.1中的一個強大特性,它能夠根據作業的數據源類型自動選擇最合適的執行模式:

  • 當所有輸入源都是有界的(如文件、批量數據庫查詢),自動選擇BATCH模式
  • 當至少有一個輸入源是無界的(如Kafka、Socket),自動選擇STREAMING模式
// 設置為自動模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

2. 邊界情況處理

在某些複雜場景下,AUTOMATIC模式的選擇可能不完全符合預期:

  • 混合數據源: 如果作業同時包含有界和無界數據源,將選擇STREAMING模式
  • 動態數據源: 對於可能在運行時從有界變為無界的數據源,建議明確指定執行模式
  • 複雜處理拓撲: 對於包含迭代或複雜循環的作業,可能需要手動選擇執行模式

六、批處理模式的性能優化

1. 批處理特定的優化

BATCH模式針對有界數據處理提供了多項性能優化:

  • 任務調度優化: 採用更高效的批處理調度策略
  • 內存管理改進: 更積極的數據物化和緩存
  • 網絡傳輸優化: 批量數據傳輸減少網絡開銷
  • 計算優化: 使用更適合批處理的算子實現

2. 性能對比示例

使用相同的WordCount邏輯,分別在BATCH和STREAMING模式下處理1GB文本數據的性能對比:

模式 執行時間 CPU使用率 內存消耗
STREAMING 38秒 穩定在70% 2.4GB
BATCH 22秒 峯值95%,完成後釋放 1.8GB

1. 新特性和優化

Flink 1.20.1在執行模式方面帶來了多項改進:

  • 更智能的AUTOMATIC模式: 改進了自動模式的選擇邏輯,支持更復雜的數據源組合
  • 批處理模式性能提升: 進一步優化了批處理執行引擎,提升了大數據量處理能力
  • API一致性增強: 確保所有算子在不同執行模式下行為一致
  • 資源利用率優化: 改進了批處理模式下的資源調度,減少資源浪費

2. 兼容性注意事項

在使用Flink 1.20.1的執行模式時,需要注意以下兼容性問題:

  • 某些流處理特有的操作(如CEP)在BATCH模式下可能行為受限
  • 窗口操作在BATCH和STREAMING模式下的實現方式不同
  • 狀態過期機制在兩種模式下有細微差別

八、最佳實踐

1. 執行模式選擇指南

場景 推薦模式 原因
離線數據處理 BATCH 性能更好,資源利用率更高
實時數據處理 STREAMING 低延遲,持續處理能力
ETL作業 BATCH 更適合處理有界數據集
實時分析 STREAMING 滿足實時性要求
不確定數據源類型 AUTOMATIC 自動適配不同數據源

2. 實際應用中的模式切換策略

在實際項目中,可以採用以下策略來管理執行模式:

  • 開發環境: 使用AUTOMATIC模式,方便測試不同數據源
  • 生產環境: 根據明確的數據流特徵選擇BATCH或STREAMING模式
  • 批處理作業: 明確設置為BATCH模式以獲得最佳性能
  • 流處理作業: 明確設置為STREAMING模式,確保低延遲

九、總結與展望

Flink的批流一體執行模式是大數據處理領域的一次重要創新,它消除了批處理和流處理之間的界限,為開發者提供了統一、靈活的編程模型。通過Execution Mode的合理選擇和配置,我們可以在不同場景下獲得最佳的性能表現。

隨着Flink 1.20.1的發佈,批流一體架構進一步成熟,執行模式的自動選擇更加智能,性能優化更加到位。未來,Flink將繼續完善其批流一體架構,為大數據處理提供更加強大和靈活的解決方案。

通過本文的學習,相信你已經對Flink的執行模式有了深入的理解。在實際應用中,建議根據具體的數據特徵和處理需求,選擇合適的執行模式,充分發揮Flink批流一體的優勢。


原文來自:http://blog.daimajiangxin.com.cn

源碼地址:https://gitee.com/daimajiangxin/flink-learning

user avatar u_16297326 头像 u_17513518 头像 u_13529088 头像 tech 头像 lenve 头像
点赞 5 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.