知識庫 / Spring RSS 訂閱

理解 Kafka 主題和分區

Spring
HongKong
5
11:23 AM · Dec 06 ,2025

1. 簡介

在本教程中,我們將探討 Kafka 主題和分區,以及它們之間的關係。

2. Kafka 主題是什麼

主題是事件序列的存儲機制。 基本上,主題是持久化的日誌文件,它們按照事件發生的先後順序存儲事件。 因此,每次新的事件都會始終添加到日誌的末尾。 此外,事件是不可變的。 也就是説,一旦事件被添加到主題中,就無法修改它。

Kafka 主題的一個典型使用案例是記錄房間的温度測量序列。 一旦一個温度值被記錄下來,例如下午 5:02 的 25 攝氏度,就無法修改它,因為它已經發生。 同樣,下午 5:06 的温度值也無法出現在下午 5:02 的值之前。 因此,通過將每個温度測量值視為一個事件,Kafka 主題將是一個合適的選項來存儲這些數據。

3. Kafka 分區是什麼

Kafka 使用主題分區來提高可擴展性。在主題分區中,Kafka 將其分解為多個部分,並將每個部分存儲在分佈式系統的不同節點上。該部分的數量由我們或集羣的默認配置決定。

Kafka 保證同一主題分區內事件的順序。 但是,默認情況下,它不保證跨所有分區內的事件順序。

例如,為了提高性能,我們可以將主題劃分為兩個不同的分區,並在消費者端從中讀取。在這種情況下,消費者按照事件在同一分區中到達的順序讀取事件。相反,如果 Kafka 將兩個事件傳遞到不同的分區,則無法保證消費者以它們被生產的相同順序讀取這些事件。

為了提高事件的排序,我們可以將事件鍵設置為事件對象。通過這樣做,具有相同鍵的事件將被分配到同一分區,該分區是排序的。因此,具有相同鍵的事件以它們被生產的相同順序到達消費者端。

4. 消費者組

消費者組是由多個消費者組成的集合,這些消費者從一個主題中讀取數據。 Kafka 將所有分區分配給組中的消費者,其中任何給定的分區都始終會被組的一個成員消費一次。 然而,這種分配可能會是不平衡的,這意味着多個分區可以被分配給同一個消費者。

例如,假設一個主題有三個分區,一個有兩名消費者組成的消費者組應該從中讀取數據。 因此,一種可能的分配方式是第一個消費者獲取分區一和分區二,而第二個消費者只獲取分區三。

KIP-500 更新中,Kafka 引入了一種名為 KRaft 的新共識算法。 當向組添加消費者或從組中移除消費者時,KRaft 會根據剩餘消費者的比例重新平衡分區。 這樣可以保證沒有任何分區沒有分配給消費者。

5. 配置應用程序

在本節中,我們將創建用於配置主題、消費者和生產者服務的類。

5.1. 主題配置

首先,讓我們為我們的主題創建配置類:

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    public NewTopic celciusTopic() {
        return TopicBuilder.name("celcius-scale-topic")
                .partitions(2)
                .build();
    }
}

KafkaTopicConfig 類注入了兩個 Spring Bean。KafkaAdmin Bean 初始化 Kafka 集羣,指定其運行的網絡地址,而NewTopic Bean 則創建一個名為celcius-scale-topic 的主題,幷包含一個分區。

5.2. 消費者和生產者配置

我們需要必要的類來注入主題的生產者和消費者配置。

首先,讓我們創建一個生產者配置類:

public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Double> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Double> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaProducerConfig 注入兩個 Spring Bean。ProducerFactory 告知 Kafka 如何序列化事件以及生產者應該監聽的服務器。KafkaTemplate 將被用於消費者服務類中創建事件。

5.3. Kafka Producer 服務

最後,在完成初始配置後,我們可以創建驅動程序應用程序。首先,讓我們創建一個生產者應用程序:

public class ThermostatService {

    private final KafkaTemplate<String, Double> kafkaTemplate;

    public ThermostatService(KafkaTemplate<String, Double> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void measureCelsiusAndPublish(int numMeasurements) {
        new Random().doubles(25, 35)
                .limit(numMeasurements)
                .forEach(tmp -> {
                    kafkaTemplate.send("celcius-scale-topic", tmp);
                });
    }
}

ThermostatService 包含一個名為 measureCelsiusAndPublish 的單方法。該方法產生 [25, 35] 範圍內的隨機温度測量值,併發布到 celsius-scale-topic Kafka 主題。為此,我們使用 Random 類的 doubles() 方法來創建隨機數字流。然後,我們使用 kafkaTemplatesend() 方法發佈事件。

6. 生產和消費事件

在本節中,我們將學習如何配置 Kafka 消費者以從主題中讀取事件,使用嵌入式 Kafka 代理。

6.1. 創建消費者服務

為了消費事件,我們需要一個或多個消費者類。下面創建一個消費 celcius-scale-topic 頂標的消費者:

@Service
public class TemperatureConsumer {
    Map<String, Set<String>> consumedRecords = new ConcurrentHashMap<>();

    @KafkaListener(topics = "celcius-scale-topic", groupId = "group-1")
    public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-1", consumerRecord.partition());
    }

    private void trackConsumedPartitions(String consumerName, int partitionNumber) {
        consumedRecords.computeIfAbsent(consumerName, k -> new HashSet<>());
        consumedRecords.computeIfPresent(consumerName, (k, v) -> {
            v.add(String.valueOf(partitionNumber));
            return v;
        });
    }
}

我們的 consumer1() 方法使用 @KafkaListener 註解來啓動消費者。 topics 參數是一個要消費的主題列表,而 groupId 參數標識消費者所屬的消費者組。

為了稍後可視化結果,我們使用了 ConcurrentHashMap 來存儲已消費的事件。 key 對應於消費者的名稱,而 value 包含其消費的分區。

6.2. 創建測試類

現在,讓我們創建一個集成測試類:

@SpringBootTest(classes = ThermostatApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaTopicsAndPartitionsIntegrationTest {
    @ClassRule
    public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");

    @Autowired
    private ThermostatService service;

    @Autowired
    private TemperatureConsumer consumer;

    @Test
    public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception {
        service.measureCelsiusAndPublish(10000);
        Thread.sleep(1000);
        System.out.println(consumer.consumedRecords);
    }
}

我們正在使用嵌入式 Kafka 代理來運行與 Kafka 相關的測試。<em @EmbeddedKafka</em> 註解使用 brokerProperties 參數來配置代理將運行的 URL 和端口。然後,我們使用 JUnit 規則在 <em EmbeddedKafkaBroker</em> 字段中啓動嵌入式代理。

最後,在測試方法中,我們調用 thermostat 服務以生產 10,000 個事件。

我們將使用 Thread.sleep() 等待事件生產後 1 秒。這確保了消費者已正確配置在代理中以開始處理消息。

讓我們來看一個在運行測試時我們可能獲得的結果示例:

{consumer-1=[0, 1]}

這意味着相同的消費者處理了分區 0 和 1 上的所有事件,因為我們只有一個消費者和一個消費者組。如果存在不同消費者組中的多個消費者,結果可能會有所不同。

7. 結論

在本文中,我們探討了 Kafka 主題(topic)和分區(partition)的定義,以及它們之間的關係。

我們還演示了一個消費者從主題的兩個分區中讀取事件的場景,該場景使用了嵌入式 Kafka 代理(broker)。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.