1. 簡介
在本教程中,我們將探討 Kafka 主題和分區,以及它們之間的關係。
2. Kafka 主題是什麼
主題是事件序列的存儲機制。 基本上,主題是持久化的日誌文件,它們按照事件發生的先後順序存儲事件。 因此,每次新的事件都會始終添加到日誌的末尾。 此外,事件是不可變的。 也就是説,一旦事件被添加到主題中,就無法修改它。
Kafka 主題的一個典型使用案例是記錄房間的温度測量序列。 一旦一個温度值被記錄下來,例如下午 5:02 的 25 攝氏度,就無法修改它,因為它已經發生。 同樣,下午 5:06 的温度值也無法出現在下午 5:02 的值之前。 因此,通過將每個温度測量值視為一個事件,Kafka 主題將是一個合適的選項來存儲這些數據。
3. Kafka 分區是什麼
Kafka 使用主題分區來提高可擴展性。在主題分區中,Kafka 將其分解為多個部分,並將每個部分存儲在分佈式系統的不同節點上。該部分的數量由我們或集羣的默認配置決定。
Kafka 保證同一主題分區內事件的順序。 但是,默認情況下,它不保證跨所有分區內的事件順序。
例如,為了提高性能,我們可以將主題劃分為兩個不同的分區,並在消費者端從中讀取。在這種情況下,消費者按照事件在同一分區中到達的順序讀取事件。相反,如果 Kafka 將兩個事件傳遞到不同的分區,則無法保證消費者以它們被生產的相同順序讀取這些事件。
為了提高事件的排序,我們可以將事件鍵設置為事件對象。通過這樣做,具有相同鍵的事件將被分配到同一分區,該分區是排序的。因此,具有相同鍵的事件以它們被生產的相同順序到達消費者端。
4. 消費者組
消費者組是由多個消費者組成的集合,這些消費者從一個主題中讀取數據。 Kafka 將所有分區分配給組中的消費者,其中任何給定的分區都始終會被組的一個成員消費一次。 然而,這種分配可能會是不平衡的,這意味着多個分區可以被分配給同一個消費者。
例如,假設一個主題有三個分區,一個有兩名消費者組成的消費者組應該從中讀取數據。 因此,一種可能的分配方式是第一個消費者獲取分區一和分區二,而第二個消費者只獲取分區三。
在 KIP-500 更新中,Kafka 引入了一種名為 KRaft 的新共識算法。 當向組添加消費者或從組中移除消費者時,KRaft 會根據剩餘消費者的比例重新平衡分區。 這樣可以保證沒有任何分區沒有分配給消費者。
5. 配置應用程序
在本節中,我們將創建用於配置主題、消費者和生產者服務的類。
5.1. 主題配置
首先,讓我們為我們的主題創建配置類:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
public NewTopic celciusTopic() {
return TopicBuilder.name("celcius-scale-topic")
.partitions(2)
.build();
}
}KafkaTopicConfig 類注入了兩個 Spring Bean。KafkaAdmin Bean 初始化 Kafka 集羣,指定其運行的網絡地址,而NewTopic Bean 則創建一個名為celcius-scale-topic 的主題,幷包含一個分區。
5.2. 消費者和生產者配置
我們需要必要的類來注入主題的生產者和消費者配置。
首先,讓我們創建一個生產者配置類:
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Double> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Double> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}KafkaProducerConfig 注入兩個 Spring Bean。ProducerFactory 告知 Kafka 如何序列化事件以及生產者應該監聽的服務器。KafkaTemplate 將被用於消費者服務類中創建事件。
5.3. Kafka Producer 服務
最後,在完成初始配置後,我們可以創建驅動程序應用程序。首先,讓我們創建一個生產者應用程序:
public class ThermostatService {
private final KafkaTemplate<String, Double> kafkaTemplate;
public ThermostatService(KafkaTemplate<String, Double> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void measureCelsiusAndPublish(int numMeasurements) {
new Random().doubles(25, 35)
.limit(numMeasurements)
.forEach(tmp -> {
kafkaTemplate.send("celcius-scale-topic", tmp);
});
}
}ThermostatService 包含一個名為 measureCelsiusAndPublish 的單方法。該方法產生 [25, 35] 範圍內的隨機温度測量值,併發布到 celsius-scale-topic Kafka 主題。為此,我們使用 Random 類的 doubles() 方法來創建隨機數字流。然後,我們使用 kafkaTemplate 的 send() 方法發佈事件。
6. 生產和消費事件
在本節中,我們將學習如何配置 Kafka 消費者以從主題中讀取事件,使用嵌入式 Kafka 代理。
6.1. 創建消費者服務
為了消費事件,我們需要一個或多個消費者類。下面創建一個消費 celcius-scale-topic 頂標的消費者:
@Service
public class TemperatureConsumer {
Map<String, Set<String>> consumedRecords = new ConcurrentHashMap<>();
@KafkaListener(topics = "celcius-scale-topic", groupId = "group-1")
public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-1", consumerRecord.partition());
}
private void trackConsumedPartitions(String consumerName, int partitionNumber) {
consumedRecords.computeIfAbsent(consumerName, k -> new HashSet<>());
consumedRecords.computeIfPresent(consumerName, (k, v) -> {
v.add(String.valueOf(partitionNumber));
return v;
});
}
}我們的 consumer1() 方法使用 @KafkaListener 註解來啓動消費者。 topics 參數是一個要消費的主題列表,而 groupId 參數標識消費者所屬的消費者組。
為了稍後可視化結果,我們使用了 ConcurrentHashMap 來存儲已消費的事件。 key 對應於消費者的名稱,而 value 包含其消費的分區。
6.2. 創建測試類
現在,讓我們創建一個集成測試類:
@SpringBootTest(classes = ThermostatApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaTopicsAndPartitionsIntegrationTest {
@ClassRule
public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");
@Autowired
private ThermostatService service;
@Autowired
private TemperatureConsumer consumer;
@Test
public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception {
service.measureCelsiusAndPublish(10000);
Thread.sleep(1000);
System.out.println(consumer.consumedRecords);
}
}我們正在使用嵌入式 Kafka 代理來運行與 Kafka 相關的測試。<em @EmbeddedKafka</em> 註解使用 brokerProperties 參數來配置代理將運行的 URL 和端口。然後,我們使用 JUnit 規則在 <em EmbeddedKafkaBroker</em> 字段中啓動嵌入式代理。
最後,在測試方法中,我們調用 thermostat 服務以生產 10,000 個事件。
我們將使用 Thread.sleep() 等待事件生產後 1 秒。這確保了消費者已正確配置在代理中以開始處理消息。
讓我們來看一個在運行測試時我們可能獲得的結果示例:
{consumer-1=[0, 1]}這意味着相同的消費者處理了分區 0 和 1 上的所有事件,因為我們只有一個消費者和一個消費者組。如果存在不同消費者組中的多個消費者,結果可能會有所不同。
7. 結論
在本文中,我們探討了 Kafka 主題(topic)和分區(partition)的定義,以及它們之間的關係。
我們還演示了一個消費者從主題的兩個分區中讀取事件的場景,該場景使用了嵌入式 Kafka 代理(broker)。