博客 / 詳情

返回

百萬架構師第三十三課:kafka:分佈式消息通信Kafka(一)|JavaGuide

原文鏈接

課程目標

  • Kafka 產生的背景
  • Kafka 的架構
  • Kafka 的安裝部署和集羣部署
  • Kafka 的基本操作
  • Kafka 的應用

Kafka 的簡介

  • 高性能
  • 高吞吐量

什麼是 Kafka

​ Kafka 是一款分佈式消息發佈和訂閲系統,具有高性能、高吞吐量的特點而被廣泛應用於大數據傳輸場景。它是由 LinkedIn公 司開發,使用 Scala 語言編寫,之後成為 Apache 基金會的一個頂級項目。 kafka 提供了類似 JMS 的特性,但是在設計和實現上是完全不同的,而且他也不是 JMS 規範的實現。

Kafka 產生的背景

​ kafka作為一個消息系統,早期設計的目的是用作 LinkedIn 的活動流(Activity Stream)和運營數據處理管道(Pipeline)。活動流數據是所有的網站對用户的使用情況做分析的時候要用到的最常規的部分,活動數據包括頁面的訪問量(PV)、被查看內容方面的信息以及搜索內容。這種數據通常的處理方式是先把各種活動以日誌的形式寫入某種文件,然後週期性地對這些文件進行統計分析。運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日誌等)。

Kafka 的應用場景

  • 內置分區
  • 實現集羣

spring cloud stream 也有 Kafka 的實現。

​ 由於 kafka 具有更好的吞吐量、內置分區、冗餘及容錯性的優點 ( kafka 每秒可以處理幾十萬消息 ) ,讓 kafka 成為了一個很好的大規模消息處理應用的解決方案。所以在企業級應用中,主要會應用於如下幾個方面:

  • 行為跟蹤:kafka可以用於跟蹤用户瀏覽頁面、搜索及其他行為。通過發佈-訂閲模式實時記錄到對應的topic中,通過後端大數據平台接入處理分析,並做更進一步的實時處理和監控。
  • 日誌收集:在日誌收集方面,有很多比較優秀的產品,比如 Apache Flume,很多公司使用kafka 代理日誌聚合。日誌聚合表示從服務器上收集日誌文件,然後放到一個集中的平台(文件服務器)進行處理。在實際應用開發中,我們應用程序的 log 都會輸出到本地的磁盤上, 排查問題的話通過 linux 命令來搞定,如果應用程序組成了負載均衡集羣,並且集羣的機器有幾十台以上,那麼想通過日誌快速定位到問題,就是很麻煩的事情了。所以一般都會做一個日誌統一收集平台管理 log 日誌用來快速查詢重要應用的問題。所以很多公司的套路都是把應用日誌集中到 kafka 上,然後分別導入到 es 和 hdfs 上,用來做實時檢索分析和離線統計數據備份等。而另一方面,kafka 本身又提供了很好的 api 來集成日誌並且做日誌收集。

JavaGuide_Kafka_通信1_ELK圖解.png

Kafka 本身的架構

​ 一個典型的 kafka 集羣包含若干 Producer(可以是應用節點產生的消息,也可以是通過Flume 收集日誌產生的事件),若干個 Broker(kafka 支持水平擴展)、若干個 Consumer 、Group,以及一個 zookeeper 集羣。kafka 通過 zookeeper 管理集羣配置及服務協同。

  • Producer 使用 push 模式將消息發佈到 broker,consumer 通過監聽使用 pull 模式從broker 訂閲並消費消息。
  • 多個 broker 協同工作, producer 和 consumer 部署在各個業務邏輯中。三者通過zookeeper 管理協調請求和轉發。這樣就組成了一個高性能的分佈式消息發佈和訂閲系統。圖上有一個細節是和其他 mq 中間件不同的點。producer 發送消息到 broker 的過程是 push,而 consumer 從 broker 消費消息的過程是 pull,主動去拉數據。而不是 broker 把數據主動發送給 consumer 。
  • Topic 主題
  • partion 數據分區
  • group

JavaGuide_Kafka_通信1_Kafka通信分區.png

kafka 的安裝部署

下載安裝包

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka...

kafka_2.11-1.1.0.tgx: http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0....

安裝過程

  1. tar -zxvf 解壓安裝包
[root@Darian1 software]# tar -zxvf kafka_2.11-2.0.0.tgz

kafka 目錄介紹

  1. /bin 操作 kafka 的可執行腳本
  2. /config 配置文件
  3. /libs 依賴庫目錄
  4. /logs 日誌數據目錄

啓動 /停止 kafka

  1. 需要先啓動 zookeeper ,如果沒有搭建 zookeeper 環境,可以直接運行kafka內嵌的zookeeper
    啓動命令: bin/zookeeper-server-start.sh config/zookeeper.properties &
  2. 進入kafka目錄,運行 bin/kafka-server-start.sh {-daemon 後台啓動} config/server.properties &
  3. 進入kafka目錄,運行bin/kafka-server-stop.sh config/server.properties

運行的外部的 Zookeeper 集羣的 Kafka

[root@Darian1 bin]# vim ../config/server.properties 

zookeeper.connect=168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 
    
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties 

如果超時時間連接較長可以延長時間。

[root@Darian1 bin]# vim ../config/server.properties 

zookeeper.connect=192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=30000

Kafka 的基本操作

前提:
首先需要啓動 zookeeper
啓動 kafka
[root@Darian1 bin]# sh kafka-server-start.sh ../config/server.properties 
後台啓動kafka
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
創建一個 Topic
[root@Darian1 bin]# sh kafka-topics.sh --create --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --replication-factor 1 --partitions 1 --topic darianTest

Created topic "darianTest".

Replication-factor 表示該topic需要在不同的broker中保存幾份,這裏設置成1,表示在兩個broker中保存兩份

Partitions 分區數

查看所有的 Topic
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181
[root@Darian1 bin]# sh kafka-topics.sh --list --zookeeper localhost:2181
darianTest
查看 topic 屬性
[root@Darian1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.40.128:2181,192.168.40.129:2181,192.168.40.131:2181 --topic darianTest

Topic:darianTest    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: darianTest    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
創建一個控制枱發送端

Broker-list 不是 zookeeper

[root@Darian1 bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic darianTest
>hello
>helloworld
>dsfsdf
>sdfsd
>
創建一個控制枱接收端
[root@Darian1 bin]# sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic darianTest --from-beginning

hello
helloworld
dsfsdf
sdfsd

安裝集羣環境

zookeeper 能夠完成 kafka 的集羣

修改 server.properties 配置

  1. 修改server.properties. broker.id=0 / 1
  2. 修改server.properties 修改成本機IP

advertised.listeners=PLAINTEXT://192.168.11.153:9092

當 Kafka broker 啓動時,它會在ZK上註冊自己的IP和端口號,客户端就通過這個IP和端口號來連接。Kafka 的 listeners 如果需要配置集羣的話,需要把自己機器的 IP 配置上去。

192.168.40.128
[root@Darian1 bin]# vim ../config/server.properties 


# 在集羣裏邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id=1 / 2 / 3

zookeeper.connect=192.168.40.128:2181

listeners=PLAINTEXT://192.168.40.128:9092
192.168.40.129
# 在集羣裏邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id= 2
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.129:9092
192.168.40.131
# 在集羣裏邊必須是唯一的,默認情況下 broker.id 都是 0 ,需要標識它的唯一性
broker.id= 3
zookeeper.connect=192.168.40.128:2181
listeners=PLAINTEXT://192.168.40.131:9092
[root@Darian1 bin]# sh kafka-server-start.sh -daemon ../config/server.properties
Kafka 啓動不好的話,多啓動兩次。

啓動成功以後,看 zookeeper 節點的變化:

[root@Darian1 bin]# sh zkCli.sh 

[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 12] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 15] get /controller
{"version":1,"brokerid":2,"timestamp":"1548340180496"}
cZxid = 0x100000147
ctime = Thu Jan 24 22:29:40 CST 2019
mZxid = 0x100000147
mtime = Thu Jan 24 22:29:40 CST 2019
pZxid = 0x100000147
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1687fbe81fe000a
dataLength = 54
numChildren = 0

​ 每一個節點都有它相應的含義。我們的寫請求會落到 Master 節點上,讀請求可以走其他節點去讀。他們都是熱備可以工作的。他的選舉規則是最小的節點也就是最早註冊的節點。

​ 中間件都是成熟的商業化的東西。

JAVA API 的使用

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

KafKaConsumer

public class KafkaConsumerDemo extends Thread {
    private final KafkaConsumer kafkaConsumer;

    public KafkaConsumerDemo(String topic) {
        // 設置屬性
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                       "192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
        // 消費組
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 間隔時間
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.StringDeserializer");
        // 從最早的開始
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Collections.singletonList(topic));
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000); // 超時時間
            consumerRecords.forEach(consumerRecord -> {
                System.err.println("[message receive]:" + consumerRecord.value());
                kafkaConsumer.commitSync();
            });
        }
    }

    public static void main(String[] args) {
        new KafkaConsumerDemo("test").start();
    }
}

KafkaProducer

public class KafkaProducerDemo extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final boolean isAsync;

    public KafkaProducerDemo(String topic, boolean isAsync) {
        // 設置屬性
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                       "192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int num = 0;
        while (num < 50) {
            String message = "message_" + num;
            System.err.println("[producer message]:" + message);
            if (isAsync) { // 異步發送
                producer.send(new ProducerRecord<>(topic, message), (recordMetadata, e) -> {
                    if (recordMetadata != null) {
                        System.err.println("[async-offset]:" + recordMetadata.offset() +
                                           "->[partition]:" + recordMetadata.partition());
                    }
                });
            } else {  // 同步發送 future / callable
                try {
                    RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, message)).get();
                    System.err.println("[sync-offset]:" + recordMetadata.offset() +
                                       "->[partition]:" + recordMetadata.partition());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            num++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerDemo("test", true).start();
    }
}
  • Kafka 有兩種 API

    • 封裝得比較好的
    • 非常靈活的
  • Kafka 的配置信息非常地詳細

JavaGuide_Kafka_通信1_Kafka備註截圖.png

配置信息分析

發送端的可選配置信息分析

ackacks

acks 配置表示 producer 發送消息到 broker 上以後的確認值。有三個可選項

  • 0:表示 producer 不需要等待 broker 的消息確認。這個選項時延最小但同時風險最大(因為當 server 宕機時,數據將會丟失)。
  • 1:表示 producer 只需要獲得 kafka 集羣中的 leader 節點確認即可,這個選擇時延較小同時確保了 leader 節點確認接收成功。
  • all(-1):需要 ISR 中所有的 Replica 給予接收確認(需要集羣中的所有節點確認),速度最慢,安全性最高,但是由於 ISR 可能會縮小到僅包含一個 Replica ,所以設置參數為 all 並不能一定避免數據丟失。
batch.size

​ 生產者發送多個消息到 broker 上的同一個 分區 時,為了減少網絡請求帶來的性能開銷,通過批量的方式來提交消息,可以通過這個參數來控制批量提交的字節數大小,默認大小是 16384byte , 也就是 16kb ,意味着當一批消息大小達到指定的 batch.size 的時候會統一發送。

linger.ms

Producer 默認會把兩次發送時間間隔內收集到的所有 Requests 進行一次聚合然後再發送,以此提高吞吐量,而 linger.ms 就是為每次發送到 broker 的請求增加一些 delay,以此來聚合更多的 Message 請求。 這個有點像TCP裏面的 Nagle 算法,在 TCP 協議的傳輸中,為了減少大量小數據包的發送,採用了 Nagle 算法,也就是基於小包的等-停協議。

  • batch.sizelinger.ms 這兩個參數是kafka性能優化的關鍵參數,很多同學會發現 batch.size 和 linger.ms 這兩者的作用是一樣的,如果兩個都配置了,那麼怎麼工作的呢?實際上,當二者都配置的時候,只要滿足其中一個要求,就會發送請求到 broker 上。
max.request.size

​ 設置請求的數據的最大字節數,為了防止發生較大的數據包影響到吞吐量,默認值為1MB。

還有重試次數等

消息的同步發送和異步發送:

​ Kafka 1.0 以後,默認的發送方式都是異步發送消息。

​ 我們的消息通過 Kafka producersend 以後,這個消息實際上是放到一個後台的發送隊列裏邊,然後通過一個後台的線程,通過不斷地從後代的發送隊列中不斷地取出消息進行發送。發送以後,進行調用回調方法。就是 Callback 方法進行執行。

​ 同步發送就是用的 future 的時候去阻塞。獲得發送信息是進行阻塞。

消費端的可選配置分析

group.id

consumer group 是 kafka 提供的可擴展且具有容錯性的消費者機制。既然是一個組,那麼組內必然可以有多個消費者或消費者實例 ( consumer instance ) ,它們共享一個公共的 ID ,即 group ID 。組內的所有消費者協調在一起來消費訂閲主題 ( subscribed topics ) 的所有分區 ( partition ) 。當然,每個分區只能由同一個消費組內的一個 consumer 來消費。如下圖所示,分別有三個消費者,屬於兩個不同的 group ,那麼對於 firstTopic 這個 topic 來説,這兩個組的消費者都能同時消費這個 topic 中的消息,對於此時的架構來説,這個 firstTopic 就類似於 ActiveMQ 中的 topic 概念。如右圖所示,如果3個消費者都屬於同一個 group ,那麼此時 firstTopic 就是一個 Queue 的概念。

  • 不同的組可以同時消費同一條消息。

JavaGuide_Kafka_通信1_不同的組可以消費同一條消息.png

  • 同一個組只能有一個 Consumer 能夠拿到消息

JavaGuide_Kafka_通信1_同一個組只能有一個消費者消費消息.png

enable.auto.commit

ENABLE_AUTO_COMMIT_CONFIG 可以設置成 false

​ 消費者消費消息以後自動提交,只有當消息提交以後,該消息才不會被再次接收到,還可以配合 auto.commit.interval.ms 控制自動提交的頻率。 每一段時間內的消息批量地確認提交。

​ 當然,我們也可以通過 consumer.commitSync() 的方式實現手動提交

{
    // 異步 Commit
    kafkaConsumer.commitAsync();
    // 同步 commit
    kafkaConsumer.commitSync();
    // 
}

// 可以選擇不同的 Topic 
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
    // ...
}

// 可以選擇不同的回調接口 callback 接口
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // ...
}
auto.offset.reset

​ 這個參數是針對新的 groupid 中的消費者而言的,當有新 groupid 的消費者來消費指定的 topic 時,對於該參數的配置,會有不同的語義

  • auto.offset.reset=latest 情況下,新的消費者將會從其他消費者最後消費的 offset 處開始消費 Topic 下的消息。
  • auto.offset.reset= earliest 情況下,新的消費者會從該 topic 最早的消息開始消費,(對於新的 groupId 來説,重置 offset )。
  • auto.offset.reset=none 情況下,新的消費者加入以後,由於之前不存在offset,則會直接拋出異常。

沒有消費組,就會拋出異常。

max.poll.records

​ 此設置限制每次調用 poll 返回的消息數,這樣可以更容易地預測每次 poll 間隔要處理的最大值。通過調整此值,可以減少 poll 間隔。

Kafka 工具:

JavaGuide_Kafka_通信1_Kafka_Client連接工具.png

​ 消息都會寫到磁盤上,只要磁盤上這條消息可以存在,那麼我換不同的 GroupId ,那麼就可以一直去消費這條消息。

Spring -kafka 集成

​ Spring 整合 Kafka 實現註冊成功以後去設置抽獎名額(贈送一個抽獎機會)。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
# 定義應用的名稱
spring.application.name=spring-cloud-stream-kafka
# 配置 Web 服務端口
server.port=8080
# 失效管理安全
management.security.enabled=false
# 配置需要的 kafka 主題
kafka.topic.test=test
kafka.topic.darian=darian

# 配置 kafka 的 zookeeper 的節點
#spring.cloud.stream.kafka.binder.zk-nodes=192.168.136.128:2181
spring.cloud.stream.kafka.streams.binder.configuration.zk-nodes=192.168.40.128:2181
# 配置 Spring Kafka 配置信息
spring.kafka.bootstrap-servers=192.168.40.128:9092,192.168.40.129:9092,192.168.40.131:9092
# Kafka 生產者配置
spring.kafka.producer.bootstrap-servers=192.168.40.128:9092
spring.kafka.producer.key-deserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka 消費者配置
spring.kafka.consumer.group-id=darian-1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
KafkaConsumerListener
@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = "${kafka.topic.test}")
    public void onMessageTest(String message) {
        System.out.println("Kafka test 消費者監聽器,接收到消息:" + message);
    }

    @KafkaListener(topics = "${kafka.topic.darian}")
    public void onMessageDarian(String message) {
        System.out.println("Kafka darian消費者監聽器,接收到消息:" + message);
    }

}
KafkaProducerController
@RequiredArgsConstructor
@RestController
public class KafkaProducerController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/message/send/{topic}")
    public TopicMessage sendMessage(
            @PathVariable String topic,
            @RequestParam String message) {
        if ((!"darian".equals(topic)) && (!"test".equals(topic))) {
            return new TopicMessage(topic, message, false, "topic【" + topic + "】 不存在");
        }
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);

        return new TopicMessage(topic, message, true, "success");
    }


    @AllArgsConstructor
    @Data
    public static class TopicMessage {
        private String topic;
        private String message;
        private boolean send;
        private String errorMessage;
    }

}

JavaGuide_Kafka_通信1_Kafka消息發送截圖.png

JavaGuide_Kafka_通信1_Kafka發送消息Topic不存在.png

來源於: https://javaguide.net

公眾號:不止極客

user avatar xiaoweiyu 頭像 FatTiger4399 頭像 cunyu1943 頭像 prepared 頭像 huzilachadedanche 頭像 goudantiezhuerzi 頭像 fedl 頭像 dadegongjian 頭像
8 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.