動態

詳情 返回 返回

從零開始學Flink:數據源 - 動態 詳情

在實時數據處理場景中,數據源(Source)是整個數據處理流程的起點。Flink作為流批一體的計算框架,提供了豐富的Source接口支持,其中通過Kafka獲取實時數據是最常見的場景之一。本文將以Flink DataStream API為核心,帶你從0到1實現“從Kafka消費數據並輸出到日誌”的完整流程,掌握Flink Source的核心用法。

一、為什麼選擇Kafka作為Flink的數據源?

Kafka作為分佈式流處理平台,具備高吞吐量、低延遲、持久化存儲等特性,是實時數據管道的首選。Flink與Kafka的集成方案經過多年優化,支持:

  • 高吞吐量:單集羣可處理數十萬條/秒的消息,滿足大規模實時數據處理需求;
  • 持久化存儲:數據按時間順序寫入磁盤並保留一定週期,支持離線重放和故障恢復;
  • 精確一次(Exactly-Once)消費語義:通過Kafka偏移量(Offset)管理和Flink檢查點(Checkpoint)機制保證數據一致性;
  • 動態分區發現:自動感知Kafka主題的分區變化(如新增分區),無需重啓任務;
  • 靈活的消費模式:支持從指定偏移量、時間戳或最新位置開始消費。

二、環境準備與依賴配置

1. 版本説明

本文基於以下版本實現(需保持版本兼容):

  • Flink:1.20.1(最新穩定版)
  • Kafka:3.4.0(Flink Kafka Connector兼容Kafka 2.8+)
  • JDK:17+
  • gradle 8.3+

2. gradle依賴

在gradle添加Flink核心依賴及Kafka Connector依賴,build.gradle配置可以是如下:

plugins {
    id 'java' // Java項目插件
    id 'application' // 支持main方法運行
    }

    // 設置主類(可選,用於application插件)
    application {
    mainClass.set('com.cn.daimajiangxin.flink.source.KafkaSourceDemo') // 替換為你的主類全限定名
    }

    // 依賴倉庫(Maven中央倉庫)
    repositories {
    mavenCentral()
    }

    // 依賴配置
    dependencies {
    // Flink核心依賴(生產環境通常標記為provided,由Flink運行時提供)
    implementation 'org.apache.flink:flink-java:1.20.1'
    implementation 'org.apache.flink:flink-streaming-java_2.12:1.20.1'

    // Flink Kafka Connector(新版API,兼容Kafka 2.8+)
    implementation 'org.apache.flink:flink-connector-kafka_2.12:1.20.1'

    // SLF4J日誌門面 + Log4j實現(避免日誌警告)
    implementation 'org.apache.logging.log4j:log4j-api:2.17.1'
    implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
    implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
    }

    // 編譯配置(可選,根據需要調整)
    tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8' // 指定編碼
    sourceCompatibility = JavaVersion.VERSION_17 // 兼容Java 8
    targetCompatibility = JavaVersion.VERSION_17
    }

在深入代碼前,需理解Flink Kafka Source的核心組件:

  • KafkaSource:Flink提供的Kafka數據源連接器,負責與Kafka Broker建立連接、拉取消息;
  • 反序列化器(Deserializer):將Kafka消息的字節數組(byte[])轉換為Flink可處理的數據類型(如String、POJO、Row等);
  • 偏移量管理:記錄已消費的Kafka消息位置(Offset),確保故障恢復時能從斷點繼續消費;
  • 檢查點(Checkpoint):Flink的容錯機制,定期將狀態(包括偏移量)持久化到存儲系統(如HDFS),保證Exactly-Once語義。

四、核心代碼實現:從Kafka讀取數據並輸出到日誌

1. 流程概述

整個流程分為5步:

  1. 配置Kafka連接參數(如Broker地址、主題、消費者組);
  2. 創建Flink流執行環境(StreamExecutionEnvironment);
  3. 定義Kafka Source(使用新版KafkaSource);
  4. 將Source添加到執行環境,並處理數據(如打印到日誌);
  5. 觸發任務執行。

2.代碼詳解

以下是完整的示例代碼,包含詳細註釋:

package com.cn.daimajiangxin.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class KafkaSourceDemo {
        private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceDemo.class);

public static void main(String[] args) throws Exception {
            // 1. 創建Flink流執行環境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 可選:啓用檢查點(生產環境必選,保證Exactly-Once語義)
            env.enableCheckpointing(5000); // 每5秒做一次檢查點
            // 啓用檢查點
            env.enableCheckpointing(5000); // 每5秒做一次檢查點

// 設置檢查點超時時間
            env.getCheckpointConfig().setCheckpointTimeout(60000);

// 2. 配置Kafka參數
            String kafkaBootstrapServers = "172.30.244.152:9092"; // Kafka Broker地址
            String topic = "test_topic"; // 目標主題
            String consumerGroup = "flink-consumer-group"; // 消費者組ID

LOG.info("Connecting to Kafka at " + kafkaBootstrapServers);
            LOG.info("Consuming topic: " + topic);
            LOG.info("Consumer group: " + consumerGroup);

// 3. 定義Kafka Source(新版API)
            KafkaSource`<String>` kafkaSourceDemo = KafkaSource.`<String>`builder()
                    .setBootstrapServers(kafkaBootstrapServers) // Kafka Broker地址
                    .setTopics(topic) // 訂閲的主題
                    .setGroupId(consumerGroup) // 消費者組
                    .setProperty("enable.auto.commit", "true")
                    .setProperty("auto.commit.interval.ms", "1000")
                    .setProperty("session.timeout.ms", "30000")
                    .setProperty("retry.backoff.ms", "1000")
                    .setProperty("reconnect.backoff.max.ms", "10000")
                    .setDeserializer(new KafkaRecordDeserializationSchema `<String>`() {
                        @Override
                        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector `<String>` out) throws IOException {
                            // 從ConsumerRecord中提取值(字節數組),並轉為字符串
                            String value = new String(record.value(), StandardCharsets.UTF_8);
                            LOG.info("Received message: " + value);
                            out.collect(value); // 將反序列化後的數據收集到Flink流中
                        }

@Override
                        public TypeInformation`<String>` getProducedType() {
                            return TypeInformation.of(String.class);
                        }
                    })
                    // 從最早偏移量開始消費(這樣即使沒有新消息也會讀取歷史數據)
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .build();

// 4. 將Kafka Source添加到Flink流環境,並處理數據
            DataStream`<String>` kafkaStream = env.fromSource(
                    kafkaSourceDemo,
                    WatermarkStrategy.noWatermarks(), // 無水印(適用於無序數據場景)
                    "Kafka Source" // Source名稱(用於監控)
            );

LOG.info("Kafka source created successfully");

// 5. 處理數據:將每條數據打印到日誌(實際生產中可替換為寫入數據庫、消息隊列等)
            kafkaStream.print("KafkaData");
            LOG.info("Flink Kafka Source Demo started.");
            // 6. 觸發任務執行
            env.execute("Flink Kafka Source Demo");

}
    }

3. 關鍵配置説明

  • KafkaSource.Builder:新版Kafka Source的核心構建器,支持靈活配置;
  • setDeserializer:指定反序列化方式,deserialize 接收Kafka的ConsumerRecord(包含鍵、值、偏移量等信息),提取值(record.value())並反序列化為字符,getProducedType聲明輸出數據的類型(此處為String);
  • setStartingOffsets:控制消費起始位置(latest()從最新數據開始,earliest()從最早數據開始,生產環境常用OffsetsInitializer.committedOffsets()從上次提交的偏移量繼續);
  • WatermarkStrategy:用於事件時間(Event Time)處理,示例中無時間窗口需求,故使用noWatermarks();
  • PrintSinkFunction:Flink內置的日誌打印Sink(true表示打印完整上下文,包含Subtask信息)。

五、運行與測試

在WSL2的Ubuntu 環境中安裝Kafka。

1. 安裝Kafka服務

  • 下載Kafka二進制包
    訪問Apache Kafka官網,選擇最新穩定版(如3.9.0),使用wget下載:

    wget https://mirrors.aliyun.com/apache/kafka/3.9.0/kafka_2.12-3.9.0.tgz
    
  • 解壓並配置環境變量

    # 解壓到/opt/kafka(全局可訪問)
    sudo mkdir -p /opt/kafka
    tar -zxvf kafka_2.12-3.9.0.tgz -C /opt/kafka --strip-components=1
    
    # 永久生效(編輯~/.bashrc)
    echo 'export KAFKA_HOME=/opt/kafka' >> /etc/profile
    echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile
    source /etc/profile
    

2. 配置Kafka

Kafka的核心配置文件位於$KAFKA_HOME/config目錄,需修改以下兩個文件:

配置Kafka Broker(server.properties)

修改以下關鍵參數以適配WSL2環境:

# ==================== 核心角色與ID配置 ====================
    # 啓用KRaft模式(默認已啓用)
    # 單節點同時擔任Broker和控制器
    process.roles=broker,controller
    # 節點唯一ID(單節點必須設為0)
    node.id=0
    # 控制器ID(與node.id一致,單節點唯一)
    controller.id=0

    # ==================== 監聽端口配置 ====================
    # 全局監聽端口(客户端讀寫請求)和控制器監聽端口
    # 多個監聽器使用逗號分隔,每個監聽器都需要指定安全協議
    listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

    # 對外暴露的地址(Windows主機通過localhost訪問)
    # 多個公佈的監聽器使用逗號分隔
    advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093

    # 指定CONTROLLER監聽器的安全協議
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

    # 定義控制器監聽器的名稱(KRaft模式必需)
    controller.listener.names=CONTROLLER

    # ==================== ZooKeeper兼容配置(可選) ====================
    # 若需兼容舊客户端,可保留ZooKeeper配置(但KRaft模式無需ZooKeeper)
    # zookeeper.connect=localhost:2181

    # ==================== 日誌與分區配置 ====================
    # 數據存儲目錄配置(Kafka的核心配置參數)
    # Kafka將主題數據、索引文件等存儲在該目錄下
    log.dirs=/opt/kafka/data
    num.partitions=1
    # 副本數(單節點設為1)
    default.replication.factor=1
    # 最小同步副本數(單節點設為1)
    min.insync.replicas=1
    # ==================== 日誌存儲高級配置 ====================
    # 日誌保留時間(默認7天,生產環境根據存儲容量和需求調整)
    # log.retention.hours=168
    # 或按大小限制保留(單位:字節)
    # log.retention.bytes=107374182400  # 100GB

    # 單個分區日誌段大小(默認1GB,可根據實際需求調整)
    # log.segment.bytes=1073741824

    # 日誌段檢查和清理的時間間隔(默認300000ms=5分鐘)
    # log.retention.check.interval.ms=300000

    # 控制是否自動創建主題(生產環境建議禁用,改為手動創建)
    # auto.create.topics.enable=false

    # ==================== 控制器引導配置 ====================
    # 控制器引導服務器(單節點指向自己,格式:host:port)
    # 與控制器監聽端口一致
    controller.quorum.bootstrap.servers=localhost:9093

    # 控制器投票者配置(單節點設為0@localhost:9093)
    controller.quorum.voters=0@localhost:9093

3.啓動Kafka服務

3.1初始化KRaft存儲目錄(首次啓動必需)

在KRaft模式下,需要先初始化元數據存儲:


# 生成集羣ID並保存到變量
CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)
echo "生成的集羣ID: $CLUSTER_ID"

# 使用生成的集羣ID格式化存儲目錄$KAFKA_HOME/bin/kafka-storage.sh format -t $CLUSTER_ID -c $KAFKA_HOME/config/server.properties

注意: 如果手動運行命令,請確保先執行生成集羣ID的命令,然後使用實際生成的ID替換"$CLUSTER_ID"。

3.2啓動Kafka Broker

# 啓動Broker(日誌輸出到$KAFKA_HOME/logs/server.log)     $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

3.3驗證服務狀態

檢查Kafka Broker進程:
    ps -ef | grep kafka  # 應看到Kafka進程

3.4創建測試主題

確保Kafka服務已啓動,並創建測試主題 test_topic

$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic

3.5發送測試數據

使用Kafka內置的生產者工具發送測試消息到 test_topic

# 啓動Kafka生產者控制枱
    $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

    # 輸入幾條測試消息(每行一條)
    > hello flink
    > flink kafka integration
    > real-time data processing

3.6運行Flink程序

在IDE中直接運行 KafkaSourceDemo類的 main方法,或通過Gradle構建並運行:

# 構建項目
    ./gradlew clean build

    # 運行Flink作業
    ./gradlew run

3.7驗證結果

成功運行後,你應該能在控制枱看到類似如下輸出:

20250918103316

六、進階配置與優化

1. 消費語義保證

在生產環境中,為了確保數據一致性,需要配置Flink的檢查點機制和Kafka偏移量提交策略:

// 1. 啓用檢查點
    env.enableCheckpointing(5000); // 每5秒做一次檢查點

    // 2. 獲取檢查點配置對象(Flink 1.20.1及以上版本推薦方式)
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();

    // 3. 配置檢查點模式為EXACTLY_ONCE(精確一次語義)
    checkpointConfig.setMode(CheckpointingMode.EXACTLY_ONCE);

    // 4. 設置檢查點超時時間
    checkpointConfig.setCheckpointTimeout(Duration.ofSeconds(60));

    // 4. 配置從上次提交的偏移量繼續消費(生產環境推薦)
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

2. 並行度與資源配置

合理設置並行度可充分利用集羣資源並提高吞吐量:

// 設置Flink作業的全局並行度
    env.setParallelism(3); // 與Kafka主題分區數匹配

    // 或單獨設置Source的並行度
    KafkaSource`<String>` kafkaSource = KafkaSource.`<String>`builder()
        // ... 其他配置 ...
        .build();

    DataStream`<String>` stream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        "Kafka Source")
        .setParallelism(3); // Source並行度

3. 高級反序列化

除了基礎的字符串反序列化,還可以使用更靈活的反序列化方式:

3.1 使用預定義反序列化器

// 使用Flink提供的String反序列化器     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) 

3.2 自定義POJO反序列化

如果Kafka消息是JSON格式,可以使用Jackson等庫將其反序列化為POJO對象:

public class User {
        private String id;
        private String name;
        private int age;
        // getters, setters, constructors...
    }

    // 自定義POJO反序列化器
    .setDeserializer(new KafkaRecordDeserializationSchema`<User>`() {
        private final ObjectMapper mapper = new ObjectMapper();

    @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector`<User>` out) throws IOException {
            User user = mapper.readValue(record.value(), User.class);
            out.collect(user);
        }

    @Override
        public TypeInformation`<User>` getProducedType() {
            return TypeInformation.of(User.class);
        }
    })

七、常見問題與解決方案

1. 連接超時問題

問題現象:程序啓動後報 org.apache.kafka.common.errors.TimeoutException

解決方案

  • 檢查Kafka服務是否正常運行:ps -ef | grep kafka
  • 確認 bootstrap.servers配置正確,特別是在WSL2環境中確保端口映射正確
  • 檢查防火牆設置,確保9092端口開放

2. 數據消費不完整

問題現象:部分Kafka消息未被Flink消費

解決方案

  • 檢查Kafka主題的分區數與Flink Source並行度是否匹配
  • 確認 setStartingOffsets配置正確,生產環境建議使用 OffsetsInitializer.committedOffsets()
  • 檢查檢查點機制是否正常啓用,確保偏移量正確提交

3. 性能優化

對於高吞吐量場景,可以通過以下方式優化性能:

  • 增加Kafka主題分區數(與Flink並行度匹配)
  • 調大 fetch.max.bytesmax.partition.fetch.bytes參數,增加單次拉取的數據量
  • 啓用增量檢查點,減少檢查點開銷
  • 使用 setUnboundedUsePreviousEventTimeWatermark()優化水印生成

八、總結與擴展

本文詳細介紹瞭如何使用Flink從Kafka讀取數據,包括環境準備、代碼實現、運行測試以及進階配置。通過本文的學習,你應該能夠掌握Flink數據源的核心用法,為構建企業級實時數據處理應用打下堅實基礎。

在實際應用中,Flink還支持多種其他數據源,如:

  • 文件系統(HDFS、本地文件)
  • 數據庫(MySQL、PostgreSQL、MongoDB等)
  • 消息隊列(RabbitMQ、Pulsar等)
  • 自定義數據源(通過實現 SourceFunction接口)

後續文章將繼續深入探討Flink的數據轉換、窗口計算、狀態管理等核心概念,敬請關注!


源文來自:http://blog.daimajiangxin.com.cn

源碼地址:https://gitee.com/daimajiangxin/flink-learning

user avatar journey_64224c9377fd5 頭像 u_16502039 頭像 mulavar 頭像 u_15844731 頭像 u_11365552 頭像 chuanghongdengdeqingwa_eoxet2 頭像 ahahan 頭像 yizhidanshendetielian 頭像 HunterCode 頭像 laoqing 頭像 huangxunhui 頭像
點贊 11 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.