Kafka03--Kafka消費者使用方式_#kafka

主要步驟

Kafka消費者的使用流程主要包括配置消費者、訂閲主題、拉取消息、處理消息和提交偏移量等關鍵步驟。

  1. 消費者配置與初始化
    必須配置bootstrap.servers(Kafka集羣地址)、group.id(消費者組ID)和序列化器推薦使用手動提交偏移量以保證消息處理的可靠性
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  1. 主題訂閲
    支持訂閲特定主題列表或使用正則表達式匹配多個主題。新主題匹配正則表達式時會觸發再均衡。
consumer.subscribe(Collections.singletonList("first-topic"));
  1. 消息拉取與處理
    通過poll()方法批量拉取消息。在獲取消息前可以使用自定義攔截器對消息進行處理。在消息處理完成後手動提交偏移量
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
  1. 消費者組管理
    同一消費者組內的消費者共享主題分區。消費者數量變化或分區數變化時會觸發再均衡。
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "qichsiii");

自定義攔截器

public class MyConsumerInceptor implements ConsumerInterceptor<Integer, String> {
    @Override
    public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> consumerRecords) {
        Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecords.iterator();
        ConsumerRecords<Integer,String> cr = new ConsumerRecords<>(Collections.EMPTY_MAP);
        while(iterator.hasNext()){
            ConsumerRecord<Integer, String> record = iterator.next();
            if(record.key()%2==0){
                System.out.println("攔截器偶數key:"+record.key()+",value"+record.value());
            }
        }
        return consumerRecords;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
public class KafkaConsumerTest {
    public static void main(String[] args) throws InterruptedException {
        // 創建配置對象
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInceptor.class.getName());
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        
        // 創建消費者對象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);

        // 訂閲主題
        consumer.subscribe(Collections.singletonList("first-topic"));
        
        while(true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                System.out.println("接收數據"+stringStringConsumerRecord);
            }
            Thread.sleep(1000L);
        }
    }
}

提交偏移量

kafka默認自動提交偏移量

  • 消息重複消費風險
    當enable.auto.commit=true時,默認每5秒(由auto.commit.interval.ms控制)提交一次偏移量。若在兩次提交間隔內發生再均衡,新消費者會從上次提交的偏移量開始消費,導致已處理但未提交的消息被重複處理。例如:提交間隔5秒,第3秒發生再均衡,則3秒內的消息可能被重複消費
  • 提交時機不可控
    自動提交基於時間間隔而非消息處理完成狀態,可能提交未處理完的偏移量。若消費者在提交後崩潰,未處理的消息會被跳過(丟失)。

解決方法

  • 手動提交:關閉自動提交(enable.auto.commit=false),在消息處理完成後調用commitSync()(同步)或commitAsync()(異步)。
  • 冪等處理:業務邏輯中實現去重機制(如數據庫唯一鍵校驗)。

例子:

public class KafkaConsumerAutoOffsetTest {
    public static void main(String[] args) {
        // 創建配置對象
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 創建消費者對象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);

        // 訂閲主題
        consumer.subscribe(Collections.singletonList("first-topic"));

        while(true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                System.out.println("接收數據"+stringStringConsumerRecord);
            }
            // TODO 手動提交偏移量
            // TODO 拉取之後,還沒處理數據就提交了。數據處理發生問題。出現露消費問題。
//            consumer.commitAsync();//異步
            consumer.commitSync();// 同步
        }
    }
}

分區分配策略

通過 partition.assignment.strategy 配置分區分配策略:
RangeAssignor(默認):按分區範圍分配。
RoundRobinAssignor:輪詢分配分區。
StickyAssignor:儘量減少分區重新分配時的變動。

consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());