主要步驟
Kafka消費者的使用流程主要包括配置消費者、訂閲主題、拉取消息、處理消息和提交偏移量等關鍵步驟。
- 消費者配置與初始化
必須配置bootstrap.servers(Kafka集羣地址)、group.id(消費者組ID)和序列化器推薦使用手動提交偏移量以保證消息處理的可靠性
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- 主題訂閲
支持訂閲特定主題列表或使用正則表達式匹配多個主題。新主題匹配正則表達式時會觸發再均衡。
consumer.subscribe(Collections.singletonList("first-topic"));
- 消息拉取與處理
通過poll()方法批量拉取消息。在獲取消息前可以使用自定義攔截器對消息進行處理。在消息處理完成後手動提交偏移量。
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
- 消費者組管理
同一消費者組內的消費者共享主題分區。消費者數量變化或分區數變化時會觸發再均衡。
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "qichsiii");
自定義攔截器
public class MyConsumerInceptor implements ConsumerInterceptor<Integer, String> {
@Override
public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> consumerRecords) {
Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecords.iterator();
ConsumerRecords<Integer,String> cr = new ConsumerRecords<>(Collections.EMPTY_MAP);
while(iterator.hasNext()){
ConsumerRecord<Integer, String> record = iterator.next();
if(record.key()%2==0){
System.out.println("攔截器偶數key:"+record.key()+",value"+record.value());
}
}
return consumerRecords;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
public class KafkaConsumerTest {
public static void main(String[] args) throws InterruptedException {
// 創建配置對象
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInceptor.class.getName());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 創建消費者對象
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);
// 訂閲主題
consumer.subscribe(Collections.singletonList("first-topic"));
while(true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
System.out.println("接收數據"+stringStringConsumerRecord);
}
Thread.sleep(1000L);
}
}
}
提交偏移量
kafka默認自動提交偏移量
- 消息重複消費風險
當enable.auto.commit=true時,默認每5秒(由auto.commit.interval.ms控制)提交一次偏移量。若在兩次提交間隔內發生再均衡,新消費者會從上次提交的偏移量開始消費,導致已處理但未提交的消息被重複處理。例如:提交間隔5秒,第3秒發生再均衡,則3秒內的消息可能被重複消費。 - 提交時機不可控
自動提交基於時間間隔而非消息處理完成狀態,可能提交未處理完的偏移量。若消費者在提交後崩潰,未處理的消息會被跳過(丟失)。
解決方法
- 手動提交:關閉自動提交(enable.auto.commit=false),在消息處理完成後調用commitSync()(同步)或commitAsync()(異步)。
- 冪等處理:業務邏輯中實現去重機制(如數據庫唯一鍵校驗)。
例子:
public class KafkaConsumerAutoOffsetTest {
public static void main(String[] args) {
// 創建配置對象
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 創建消費者對象
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);
// 訂閲主題
consumer.subscribe(Collections.singletonList("first-topic"));
while(true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
System.out.println("接收數據"+stringStringConsumerRecord);
}
// TODO 手動提交偏移量
// TODO 拉取之後,還沒處理數據就提交了。數據處理發生問題。出現露消費問題。
// consumer.commitAsync();//異步
consumer.commitSync();// 同步
}
}
}
分區分配策略
通過 partition.assignment.strategy 配置分區分配策略:
RangeAssignor(默認):按分區範圍分配。
RoundRobinAssignor:輪詢分配分區。
StickyAssignor:儘量減少分區重新分配時的變動。
consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
本文章為轉載內容,我們尊重原作者對文章享有的著作權。如有內容錯誤或侵權問題,歡迎原作者聯繫我們進行內容更正或刪除文章。