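1. Kafka Consumer Monitoring Overview

1.1 Why Monitor Kafka Consumers

Consumer monitoring is key to keeping a Kafka-based messaging system stable. It lets us:

  • Track message consumption progress in real time
  • Detect consumer lag and message backlog early
  • Diagnose consumer group rebalancing problems
  • Tune consumer performance and resource allocation
  • Ensure business data is processed on time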

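1.2 Metric Categories

| Category | Metrics | What it shows |
|---|---|---|
| Consumption progress | Current (committed) offset, log-end offset, consumer lag | How promptly messages are consumed |
| Consumer state | Consumer liveness, partition assignment | Whether the consumers are healthy |
| Performance | Consumption rate, processing time | How well the consumers perform |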
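
The consumption-progress metrics reduce to one subtraction per partition: lag is the log-end offset minus the committed offset. A minimal sketch in plain Java, using hand-written offset maps instead of a live cluster; `LagMath` and `lagByPartition` are hypothetical names chosen for illustration, not part of any Kafka API:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: derives per-partition lag from two offset maps.
final class LagMath {
    private LagMath() {}

    // lag = log-end offset minus committed offset, clamped at zero
    static long lag(long endOffset, long committedOffset) {
        return Math.max(0L, endOffset - committedOffset);
    }

    // Lag for every partition with a committed offset.
    // Partitions whose end offset is unknown are skipped rather than guessed.
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> committed,
                                             Map<Integer, Long> end) {
        Map<Integer, Long> result = new TreeMap<>();
        committed.forEach((partition, offset) -> {
            Long endOffset = end.get(partition);
            if (endOffset != null) {
                result.put(partition, lag(endOffset, offset));
            }
        });
        return result;
    }
}
```

With committed offsets `{0: 40, 1: 100}` and end offsets `{0: 100, 1: 90}`, partition 0 has a lag of 60 and partition 1 is clamped to 0. The clamp matters because the two offsets are read at different moments and can briefly appear out of order.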

2. Kafka Consumer Monitoring in Scala

2.1 Environment Setup

scala

// build.sbt
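libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "3.4.0",
  "org.apache.kafka" %% "kafka" % "3.4.0",
  "com.typesafe" % "config" % "1.4.2",
  // Section 2.3 imports akka.actor and akka.stream; 2.6.20 is an assumed version, adjust to your project
  "com.typesafe.akka" %% "akka-actor" % "2.6.20",
  "com.typesafe.akka" %% "akka-stream" % "2.6.20",
  "org.slf4j" % "slf4j-api" % "2.0.6",
  "ch.qos.logback" % "logback-classic" % "1.4.5"
)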

// application.conf
kafka {
  bootstrap.servers = "localhost:9092"
  group.id = "consumer-monitor"
  enable.auto.commit = false
  auto.offset.reset = "latest"
}

2.2 Fetching Consumer Group Information in Scala

scala

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, ListConsumerGroupsOptions, ListConsumerGroupOffsetsOptions, ConsumerGroupListing}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.{TopicPartition, Node}
import scala.collection.JavaConverters._
import scala.util.{Try, Success, Failure}
import java.util.Properties
import java.time.Duration

class ScalaKafkaConsumerMonitor(bootstrapServers: String) {
  
  private val adminClient: AdminClient = createAdminClient()
  
  private def createAdminClient(): AdminClient = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")
    AdminClient.create(props)
  }
  
  // List all consumer groups
  def listConsumerGroups(): Try[Seq[String]] = {
    Try {
      val options = new ListConsumerGroupsOptions()
        .timeoutMs(10000)
      val groups = adminClient.listConsumerGroups(options).all().get()
      groups.asScala.map(_.groupId()).toSeq
    }
  }
  
  // Fetch details for a consumer group
  def getConsumerGroupDetails(groupId: String): Try[ConsumerGroupInfo] = {
    Try {
      // Fetch the group's committed offsets
      val offsetOptions = new ListConsumerGroupOffsetsOptions()
        .timeoutMs(10000)
      val offsetsFuture = adminClient.listConsumerGroupOffsets(groupId, offsetOptions)
      val offsets = offsetsFuture.partitionsToOffsetAndMetadata().get()
      
      // Fetch the group's member information
      val describeResult = adminClient.describeConsumerGroups(Seq(groupId).asJava)
      val groupDescription = describeResult.describedGroups().get(groupId).get()
      
      ConsumerGroupInfo(
        groupId = groupId,
        state = groupDescription.state().toString,
        members = groupDescription.members().asScala.map { member =>
          ConsumerMember(
            memberId = member.memberId(),
            clientId = member.clientId(),
            host = member.host(),
            assignments = member.assignment().topicPartitions().asScala.map(_.toString).toSeq
          )
        }.toSeq,
        offsets = offsets.asScala.map { case (tp, offsetMetadata) =>
          TopicPartitionOffset(
            topic = tp.topic(),
            partition = tp.partition(),
            offset = Option(offsetMetadata).map(_.offset()).getOrElse(-1L),
            metadata = Option(offsetMetadata).flatMap(om => Option(om.metadata())).getOrElse("")
          )
        }.toSeq
      )
    }
  }
  
  // Compute consumer lag for a group
  def calculateConsumerLag(groupId: String): Try[Seq[PartitionLag]] = {
    for {
      groupInfo <- getConsumerGroupDetails(groupId)
      lagInfo <- calculateLagForGroup(groupInfo)
    } yield lagInfo
  }
  
  private def calculateLagForGroup(groupInfo: ConsumerGroupInfo): Try[Seq[PartitionLag]] = {
    Try {
      // Fetch end offsets once per topic, then emit one PartitionLag per committed partition.
      // (Looping per offset entry would query each topic repeatedly and duplicate results.)
      groupInfo.offsets.groupBy(_.topic).toSeq.flatMap { case (topic, committed) =>
        val endOffsets = getTopicEndOffsets(topic).getOrElse(Map.empty[Int, Long])
        // offset == -1 marks a partition without a committed offset; skip it
        committed.filter(_.offset >= 0).flatMap { consumerOffset =>
          endOffsets.get(consumerOffset.partition).map { endOffset =>
            PartitionLag(
              topic = topic,
              partition = consumerOffset.partition,
              consumerOffset = consumerOffset.offset,
              endOffset = endOffset,
              lag = math.max(0L, endOffset - consumerOffset.offset)
            )
          }
        }
      }
    }
  }
  
  private def getTopicEndOffsets(topic: String): Option[Map[Int, Long]] = {
    Try {
      val consumer = createConsumer()
      try {
        val partitions = consumer.partitionsFor(topic).asScala.map(_.partition()).toSeq
        val topicPartitions = partitions.map(new TopicPartition(topic, _)).asJava
        val endOffsets = consumer.endOffsets(topicPartitions)
        endOffsets.asScala.map { case (tp, offset) => tp.partition() -> offset }.toMap
      } finally {
        consumer.close()
      }
    }.toOption
  }
  
  private def createConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", "temp-monitor")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, String](props)
  }
  
  def close(): Unit = {
    adminClient.close()
  }
}

// Data model
case class ConsumerGroupInfo(
  groupId: String,
  state: String,
  members: Seq[ConsumerMember],
  offsets: Seq[TopicPartitionOffset]
)

case class ConsumerMember(
  memberId: String,
  clientId: String,
  host: String,
  assignments: Seq[String]
)

case class TopicPartitionOffset(
  topic: String,
  partition: Int,
  offset: Long,
  metadata: String
)

case class PartitionLag(
  topic: String,
  partition: Int,
  consumerOffset: Long,
  endOffset: Long,
  lag: Long
)

2.3 Monitoring Service in Scala

scala

import akka.actor.ActorSystem
import akka.stream.Materializer
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

class ScalaConsumerMonitoringService(monitor: ScalaKafkaConsumerMonitor)(
  implicit system: ActorSystem, 
  ec: ExecutionContext, 
  mat: Materializer
) {
  
  // Periodic monitoring task
  def startPeriodicMonitoring(interval: FiniteDuration = 1.minute): Unit = {
    system.scheduler.scheduleAtFixedRate(0.seconds, interval) { () =>
      monitorConsumerGroups().onComplete {
        case Success(report) => 
          println(s"Monitoring report generated at: ${java.time.Instant.now()}")
          printMonitoringReport(report)
        case Failure(exception) =>
          println(s"Monitoring failed: ${exception.getMessage}")
      }
    }
  }
  
  private def monitorConsumerGroups(): Future[MonitoringReport] = {
    Future {
      val groups = monitor.listConsumerGroups().get
      val groupDetails = groups.flatMap { groupId =>
        monitor.getConsumerGroupDetails(groupId).toOption
      }
      val lagInfo = groupDetails.flatMap { group =>
        monitor.calculateConsumerLag(group.groupId).getOrElse(Seq.empty)
      }
      
      MonitoringReport(
        timestamp = java.time.Instant.now(),
        totalGroups = groups.size,
        activeGroups = groupDetails.count(_.state == "Stable"),
        totalLag = lagInfo.map(_.lag).sum,
        groupDetails = groupDetails,
        partitionLags = lagInfo
      )
    }
  }
  
  private def printMonitoringReport(report: MonitoringReport): Unit = {
    println("=" * 80)
    println(s"Kafka consumer monitoring report - ${report.timestamp}")
    println("=" * 80)
    println(s"Total consumer groups: ${report.totalGroups}")
    println(s"Active consumer groups: ${report.activeGroups}")
    println(s"Total consumer lag: ${report.totalLag} messages")
    
    report.groupDetails.foreach { group =>
      println(s"\nConsumer group: ${group.groupId} [${group.state}]")
      group.members.foreach { member =>
        println(s"  Member: ${member.memberId} (${member.clientId})")
        println(s"  Host: ${member.host}")
        println(s"  Assigned partitions: ${member.assignments.mkString(", ")}")
      }
    }
    
    if (report.partitionLags.nonEmpty) {
      println("\nPer-partition lag details:")
      report.partitionLags.groupBy(_.topic).foreach { case (topic, lags) =>
        println(s"  Topic: $topic")
        lags.foreach { lag =>
          println(s"    Partition ${lag.partition}: consumer offset=${lag.consumerOffset}, " +
                 s"end offset=${lag.endOffset}, lag=${lag.lag}")
        }
      }
    }
  }
}

case class MonitoringReport(
  timestamp: java.time.Instant,
  totalGroups: Int,
  activeGroups: Int,
  totalLag: Long,
  groupDetails: Seq[ConsumerGroupInfo],
  partitionLags: Seq[PartitionLag]
)

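3. A Complete Guide to Migrating from Scala to Java

3.1 Language Feature Comparison and Migration Strategy

| Scala feature | Java alternative | Migration advice |
|---|---|---|
| Case class | Record (Java 16+; preview in Java 14 and 15) / Lombok | Use immutable data classes |
| Option | Optional | Handle missing values explicitly |
| Future | CompletableFuture | Convert the asynchronous code |
| Implicit parameters | Explicit dependency injection | Improves readability |
| Collection operations | Stream API | Functional data processing |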
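
Three of the mappings above, side by side with the Scala they replace. This is a sketch under stated assumptions: `Offset`, `MigrationIdioms`, `metadataOrDefault` and `sumAsync` are hypothetical names invented for illustration, and the Scala lines in the comments are paraphrases rather than code from section 2.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Scala: case class Offset(topic: String, partition: Int, offset: Long)
record Offset(String topic, int partition, long offset) {}

final class MigrationIdioms {
    private MigrationIdioms() {}

    // Scala: Option(raw).map(_.trim).filter(_.nonEmpty).getOrElse("n/a")
    static String metadataOrDefault(String raw) {
        return Optional.ofNullable(raw)
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .orElse("n/a");
    }

    // Scala: Future(a).zipWith(Future(b))(_ + _)
    static CompletableFuture<Long> sumAsync(long a, long b) {
        CompletableFuture<Long> left = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Long> right = CompletableFuture.supplyAsync(() -> b);
        return left.thenCombine(right, Long::sum);
    }
}
```

Records give value-based `equals`, `hashCode` and `toString` like case classes, but there is no `copy` method and no pattern-matching extractor before Java 21, so call sites that rely on those need manual rewriting.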

3.2 Java Environment Setup

xml

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.6</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.5</version>
    </dependency>
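    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.2</version>
    </dependency>
    <!-- Needed to serialize the java.time.Instant field of MonitoringReport -->
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>2.14.2</version>
    </dependency>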
</dependencies>

3.3 Consumer Monitoring in Java

java

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.ConsumerGroupState;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class JavaKafkaConsumerMonitor {
    
    private final AdminClient adminClient;
    private final String bootstrapServers;
    
    public JavaKafkaConsumerMonitor(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        this.adminClient = createAdminClient();
    }
    
    private AdminClient createAdminClient() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return AdminClient.create(props);
    }
    
    // List all consumer groups
    public CompletableFuture<List<String>> listConsumerGroups() {
        ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
            .timeoutMs(10000);
            
        // KafkaFuture has no toCompletableFuture(); bridge through toCompletionStage() (Kafka 3.0+)
        return adminClient.listConsumerGroups(options)
            .all()
            .toCompletionStage()
            .toCompletableFuture()
            .thenApply(groups -> 
                groups.stream()
                    .map(ConsumerGroupListing::groupId)
                    .collect(Collectors.toList())
            );
    }
    
    // Fetch details for a consumer group
    public CompletableFuture<ConsumerGroupInfo> getConsumerGroupDetails(String groupId) {
        // Fetch offsets and group membership in parallel
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsFuture = 
            getConsumerGroupOffsets(groupId);
            
        CompletableFuture<ConsumerGroupDescription> descriptionFuture = 
            getConsumerGroupDescription(groupId);
            
        return offsetsFuture.thenCombine(descriptionFuture, (offsets, description) -> {
            List<ConsumerMember> members = description.members().stream()
                .map(member -> new ConsumerMember(
                    member.memberId(),
                    member.clientId(),
                    member.host(),
                    member.assignment().topicPartitions().stream()
                        .map(TopicPartition::toString)
                        .collect(Collectors.toList())
                ))
                .collect(Collectors.toList());
                
            List<TopicPartitionOffset> topicOffsets = offsets.entrySet().stream()
                // A partition without a committed offset maps to null; skip it to avoid an NPE
                .filter(entry -> entry.getValue() != null)
                .map(entry -> {
                    TopicPartition tp = entry.getKey();
                    OffsetAndMetadata offsetMeta = entry.getValue();
                    return new TopicPartitionOffset(
                        tp.topic(),
                        tp.partition(),
                        offsetMeta.offset(),
                        offsetMeta.metadata() != null ? offsetMeta.metadata() : ""
                    );
                })
                .collect(Collectors.toList());
                
            return new ConsumerGroupInfo(
                groupId,
                description.state().toString(),
                members,
                topicOffsets
            );
        });
    }
    
    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
        getConsumerGroupOffsets(String groupId) {
            
        ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
            .timeoutMs(10000);
            
        // Convert the KafkaFuture first, then continue with CompletableFuture operators
        return adminClient.listConsumerGroupOffsets(groupId, options)
            .partitionsToOffsetAndMetadata()
            .toCompletionStage()
            .toCompletableFuture()
            .thenApply(Collections::unmodifiableMap);
    }
    
    private CompletableFuture<ConsumerGroupDescription> 
        getConsumerGroupDescription(String groupId) {
            
        // describedGroups() maps each group id to a KafkaFuture of its description,
        // so the future only needs converting; no nested get() is required
        return adminClient.describeConsumerGroups(Collections.singletonList(groupId))
            .describedGroups()
            .get(groupId)
            .toCompletionStage()
            .toCompletableFuture();
    }
    
    // Compute consumer lag for a group
    public CompletableFuture<List<PartitionLag>> calculateConsumerLag(String groupId) {
        return getConsumerGroupDetails(groupId)
            .thenCompose(this::calculateLagForGroup);
    }
    
    private CompletableFuture<List<PartitionLag>> calculateLagForGroup(ConsumerGroupInfo groupInfo) {
        // Group committed offsets by topic, then fetch each topic's end offsets
        Map<String, List<TopicPartitionOffset>> offsetsByTopic = groupInfo.offsets().stream()
            .collect(Collectors.groupingBy(TopicPartitionOffset::topic));
            
        List<CompletableFuture<List<PartitionLag>>> lagFutures = 
            offsetsByTopic.entrySet().stream()
                .map(entry -> {
                    String topic = entry.getKey();
                    List<TopicPartitionOffset> topicOffsets = entry.getValue();
                    return getTopicEndOffsets(topic)
                        .thenApply(endOffsets -> calculateLagForTopic(topic, topicOffsets, endOffsets));
                })
                .collect(Collectors.toList());
                
        return CompletableFuture.allOf(
                lagFutures.toArray(new CompletableFuture[0]))
            .thenApply(v -> lagFutures.stream()
                .flatMap(future -> future.join().stream())
                .collect(Collectors.toList()));
    }
    
    private List<PartitionLag> calculateLagForTopic(String topic, 
            List<TopicPartitionOffset> consumerOffsets, 
            Map<Integer, Long> endOffsets) {
                
        return consumerOffsets.stream()
            .map(consumerOffset -> {
                int partition = consumerOffset.partition();
                long endOffset = endOffsets.getOrDefault(partition, -1L);
                long consumerOffsetValue = consumerOffset.offset();
                long lag = Math.max(0, endOffset - consumerOffsetValue);
                
                return new PartitionLag(
                    topic,
                    partition,
                    consumerOffsetValue,
                    endOffset,
                    lag
                );
            })
            .collect(Collectors.toList());
    }
    
    // protected rather than private so that subclasses can override it (see section 4.1)
    protected CompletableFuture<Map<Integer, Long>> getTopicEndOffsets(String topic) {
        return CompletableFuture.supplyAsync(() -> {
            try (KafkaConsumer<String, String> consumer = createTempConsumer()) {
                List<org.apache.kafka.common.PartitionInfo> partitions = 
                    consumer.partitionsFor(topic);
                    
                List<TopicPartition> topicPartitions = partitions.stream()
                    .map(partitionInfo -> 
                        new TopicPartition(topic, partitionInfo.partition()))
                    .collect(Collectors.toList());
                    
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
                
                return endOffsets.entrySet().stream()
                    .collect(Collectors.toMap(
                        entry -> entry.getKey().partition(),
                        Map.Entry::getValue
                    ));
            }
        });
    }
    
    private KafkaConsumer<String, String> createTempConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "java-monitor-temp");
        props.put("key.deserializer", 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "latest");
        
        return new KafkaConsumer<>(props);
    }
    
    public void close() {
        if (adminClient != null) {
            adminClient.close();
        }
    }
}

// Java 16+ records replace the Scala case classes (each public record goes in its own source file)
public record ConsumerGroupInfo(
    String groupId,
    String state,
    List<ConsumerMember> members,
    List<TopicPartitionOffset> offsets
) {}

public record ConsumerMember(
    String memberId,
    String clientId,
    String host,
    List<String> assignments
) {}

public record TopicPartitionOffset(
    String topic,
    int partition,
    long offset,
    String metadata
) {}

public record PartitionLag(
    String topic,
    int partition,
    long consumerOffset,
    long endOffset,
    long lag
) {}

3.4 Monitoring Service in Java

java

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.annotation.JsonInclude;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class JavaConsumerMonitoringService {
    
    private final JavaKafkaConsumerMonitor monitor;
    private final ScheduledExecutorService scheduler;
    private final ObjectMapper objectMapper;
    private final AtomicReference<MonitoringReport> lastReport;
    
    public JavaConsumerMonitoringService(String bootstrapServers) {
        this.monitor = new JavaKafkaConsumerMonitor(bootstrapServers);
        this.scheduler = Executors.newScheduledThreadPool(1);
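        this.objectMapper = new ObjectMapper();
        // Registers jackson-datatype-jsr310 when it is on the classpath; without it,
        // serializing the Instant field of MonitoringReport fails
        this.objectMapper.findAndRegisterModules();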
        this.objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
        this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        this.lastReport = new AtomicReference<>();
    }
    
    // Start periodic monitoring
    public void startPeriodicMonitoring(long initialDelay, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(
            this::monitorConsumerGroups,
            initialDelay,
            period,
            unit
        );
    }
    
    private void monitorConsumerGroups() {
        CompletableFuture<MonitoringReport> reportFuture = 
            monitor.listConsumerGroups()
                .thenCompose(this::createMonitoringReport);
                
        reportFuture.whenComplete((report, throwable) -> {
            if (throwable != null) {
                System.err.println("Monitoring failed: " + throwable.getMessage());
                throwable.printStackTrace();
            } else {
                lastReport.set(report);
                printMonitoringReport(report);
                exportReportToJson(report);
            }
        });
    }
    
    private CompletableFuture<MonitoringReport> createMonitoringReport(List<String> groupIds) {
        // Fetch details for all consumer groups in parallel
        List<CompletableFuture<ConsumerGroupInfo>> groupFutures = groupIds.stream()
            .map(monitor::getConsumerGroupDetails)
            .collect(Collectors.toList());
            
        // Compute lag for all consumer groups in parallel
        List<CompletableFuture<List<PartitionLag>>> lagFutures = groupIds.stream()
            .map(monitor::calculateConsumerLag)
            .collect(Collectors.toList());
            
        // allOf takes one varargs array, so merge both lists before converting
        List<CompletableFuture<?>> allFutures = new ArrayList<>(groupFutures);
        allFutures.addAll(lagFutures);
            
        return CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<ConsumerGroupInfo> groupDetails = groupFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
                    
                List<PartitionLag> allLags = lagFutures.stream()
                    .flatMap(future -> future.join().stream())
                    .collect(Collectors.toList());
                    
                long totalLag = allLags.stream()
                    .mapToLong(PartitionLag::lag)
                    .sum();
                    
                long activeGroups = groupDetails.stream()
                    .filter(group -> "Stable".equals(group.state()))
                    .count();
                    
                return new MonitoringReport(
                    Instant.now(),
                    groupIds.size(),
                    (int) activeGroups,
                    totalLag,
                    groupDetails,
                    allLags
                );
            });
    }
    
    private void printMonitoringReport(MonitoringReport report) {
        System.out.println("=".repeat(80));
        System.out.printf("Kafka consumer monitoring report - %s%n", report.timestamp());
        System.out.println("=".repeat(80));
        System.out.printf("Total consumer groups: %d%n", report.totalGroups());
        System.out.printf("Active consumer groups: %d%n", report.activeGroups());
        System.out.printf("Total consumer lag: %d messages%n", report.totalLag());
        
        report.groupDetails().forEach(group -> {
            System.out.printf("%nConsumer group: %s [%s]%n", group.groupId(), group.state());
            group.members().forEach(member -> {
                System.out.printf("  Member: %s (%s)%n", member.memberId(), member.clientId());
                System.out.printf("  Host: %s%n", member.host());
                System.out.printf("  Assigned partitions: %s%n", 
                    String.join(", ", member.assignments()));
            });
        });
        
        if (!report.partitionLags().isEmpty()) {
            System.out.println("\nPer-partition lag details:");
            report.partitionLags().stream()
                .collect(Collectors.groupingBy(PartitionLag::topic))
                .forEach((topic, lags) -> {
                    System.out.printf("  Topic: %s%n", topic);
                    lags.forEach(lag -> {
                        System.out.printf("    Partition %d: consumer offset=%d, end offset=%d, lag=%d%n",
                            lag.partition(), lag.consumerOffset(), lag.endOffset(), lag.lag());
                    });
                });
        }
        
        // Alert checks
        checkAndAlert(report);
    }
    
    private void checkAndAlert(MonitoringReport report) {
        // Total lag alert
        if (report.totalLag() > 10000) {
            System.out.println("\n⚠️  Warning: total consumer lag exceeds the threshold!");
        }
        
        // Check for consumer groups that are not in the Stable state
        List<String> unstableGroups = report.groupDetails().stream()
            .filter(group -> !"Stable".equals(group.state()))
            .map(ConsumerGroupInfo::groupId)
            .collect(Collectors.toList());
            
        if (!unstableGroups.isEmpty()) {
            System.out.printf("⚠️  Warning: these consumer groups are not Stable: %s%n", 
                String.join(", ", unstableGroups));
        }
        
        // Check for partitions with high lag
        List<PartitionLag> highLagPartitions = report.partitionLags().stream()
            .filter(lag -> lag.lag() > 1000)
            .collect(Collectors.toList());
            
        if (!highLagPartitions.isEmpty()) {
            System.out.println("⚠️  Warning: these partitions have high lag:");
            highLagPartitions.forEach(lag -> 
                System.out.printf("    %s-%d: lag=%d%n", 
                    lag.topic(), lag.partition(), lag.lag()));
        }
    }
    
    private void exportReportToJson(MonitoringReport report) {
        try {
            String json = objectMapper.writeValueAsString(report);
            // Save the JSON to a file or send it to a monitoring system here
            System.out.println("\nJSON report generated");
        } catch (Exception e) {
            System.err.println("Failed to generate JSON report: " + e.getMessage());
        }
    }
    
    public Optional<MonitoringReport> getLastReport() {
        return Optional.ofNullable(lastReport.get());
    }
    
    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            // Close the admin client only after the scheduled task has stopped using it
            monitor.close();
        }
    }
}

public record MonitoringReport(
    Instant timestamp,
    int totalGroups,
    int activeGroups,
    long totalLag,
    List<ConsumerGroupInfo> groupDetails,
    List<PartitionLag> partitionLags
) {}
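
`createMonitoringReport` waits for a whole list of futures and then joins each one. That pattern is the Java counterpart of Scala's `Future.sequence`, and it can be factored into a small helper. A minimal sketch using only the JDK; `Futures.sequence` is a hypothetical name, not an existing library method:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class Futures {
    private Futures() {}

    // Turns a list of futures into one future of a list, preserving order.
    // The result fails if any input future fails.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> futures.stream()
                // join() does not block here: allOf has already completed every future
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}
```

One design consequence to keep in mind: a single failing group fails the whole report. If partial reports are preferable, each input future could first be given an `exceptionally` fallback before it is passed to `sequence`.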

4. Advanced Features and Best Practices

4.1 Performance Optimization

java

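// Requires the com.github.ben-manes.caffeine:caffeine dependency
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class OptimizedKafkaMonitor extends JavaKafkaConsumerMonitor {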
    
    private final Cache<String, Map<Integer, Long>> endOffsetCache;
    private final Cache<String, ConsumerGroupInfo> groupInfoCache;
    
    public OptimizedKafkaMonitor(String bootstrapServers) {
        super(bootstrapServers);
        
        // Use Caffeine caches to avoid repeated broker round trips
        this.endOffsetCache = Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .maximumSize(1000)
            .build();
            
        this.groupInfoCache = Caffeine.newBuilder()
            .expireAfterWrite(30, TimeUnit.SECONDS)
            .maximumSize(100)
            .build();
    }
    
    @Override
    public CompletableFuture<Map<Integer, Long>> getTopicEndOffsets(String topic) {
        // Return the cached value on a hit
        Map<Integer, Long> cached = endOffsetCache.getIfPresent(topic);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        
        return super.getTopicEndOffsets(topic)
            .thenApply(offsets -> {
                endOffsetCache.put(topic, offsets);
                return offsets;
            });
    }
    
    @Override
    public CompletableFuture<ConsumerGroupInfo> getConsumerGroupDetails(String groupId) {
        ConsumerGroupInfo cached = groupInfoCache.getIfPresent(groupId);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        
        return super.getConsumerGroupDetails(groupId)
            .thenApply(groupInfo -> {
                groupInfoCache.put(groupId, groupInfo);
                return groupInfo;
            });
    }
    
    // Batch lookup
    public CompletableFuture<Map<String, ConsumerGroupInfo>> 
        getMultipleConsumerGroupDetails(List<String> groupIds) {
            
        List<CompletableFuture<ConsumerGroupInfo>> futures = groupIds.stream()
            .map(this::getConsumerGroupDetails)
            .collect(Collectors.toList());
            
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                Map<String, ConsumerGroupInfo> result = new HashMap<>();
                for (int i = 0; i < groupIds.size(); i++) {
                    result.put(groupIds.get(i), futures.get(i).join());
                }
                return result;
            });
    }
}
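
The caches above rely on expire-after-write semantics: an entry is served until a fixed time after it was stored, then treated as missing. To make that behaviour concrete, here is a sketch of the idea in plain Java. `TtlCache` is a hypothetical stand-in written for illustration, not Caffeine's API, and it omits size limits and eviction of untouched entries:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical expire-after-write cache; illustrates the semantics only.
final class TtlCache<K, V> {
    private record Entry<T>(T value, long expiresAtMillis) {}

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected so expiry can be tested without sleeping

    TtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong() + ttlMillis));
    }

    Optional<V> get(K key) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return Optional.empty();
        }
        if (clock.getAsLong() >= entry.expiresAtMillis()) {
            entries.remove(key, entry); // drop only the stale entry we observed
            return Optional.empty();
        }
        return Optional.ofNullable(entry.value());
    }
}
```

In production the clock would be `System::currentTimeMillis`. The trade-off is the same as with the Caffeine configuration: a one-minute TTL on end offsets means reported lag can be up to a minute stale, so the TTL should stay well below the alerting interval.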

4.2 Exporting Monitoring Metrics

java

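import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class MetricsExporter {
    
    private final JavaConsumerMonitoringService monitoringService;
    private final MeterRegistry meterRegistry;
    // A gauge samples a live object, so keep one mutable holder per name/tag combination.
    // The map also keeps the holders strongly reachable (gauges reference them weakly).
    private final Map<String, AtomicLong> values = new ConcurrentHashMap<>();
    
    public MetricsExporter(JavaConsumerMonitoringService monitoringService, 
                          MeterRegistry meterRegistry) {
        this.monitoringService = monitoringService;
        this.meterRegistry = meterRegistry;
    }
    
    // Call after each monitoring cycle to publish the latest report to Micrometer
    public void exportMetrics() {
        monitoringService.getLastReport().ifPresent(report -> {
            update("kafka.consumer.total_groups", Tags.empty(), report.totalGroups());
            update("kafka.consumer.active_groups", Tags.empty(), report.activeGroups());
            update("kafka.consumer.total_lag", Tags.empty(), report.totalLag());
            
            // Group state: 1 when Stable, 0 otherwise
            report.groupDetails().forEach(group -> {
                String groupId = group.groupId().replaceAll("[^a-zA-Z0-9_]", "_");
                update("kafka.consumer.group_state", Tags.of("group", groupId),
                    "Stable".equals(group.state()) ? 1 : 0);
            });
            
            // Lag per partition
            report.partitionLags().forEach(lag ->
                update("kafka.consumer.partition_lag",
                    Tags.of("topic", lag.topic(), "partition", String.valueOf(lag.partition())),
                    lag.lag()));
        });
    }
    
    // Registers the gauge on first use, then only updates the value it reads
    private void update(String name, Tags tags, long value) {
        values.computeIfAbsent(name + tags, key -> {
            AtomicLong holder = new AtomicLong();
            Gauge.builder(name, holder, AtomicLong::doubleValue)
                .tags(tags)
                .register(meterRegistry);
            return holder;
        }).set(value);
    }
}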

5. Deployment and Operations

5.1 Docker Deployment

dockerfile

FROM openjdk:17-jdk-slim

WORKDIR /app

# Install required tools
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the application
COPY target/kafka-monitor.jar /app/kafka-monitor.jar
COPY config/application.properties /app/config/application.properties

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8080/health || exit 1

EXPOSE 8080

ENTRYPOINT ["java", "-jar", "kafka-monitor.jar"]
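
The HEALTHCHECK above calls `/health` on port 8080, but none of the Java classes in this article serve HTTP, so the container would be reported unhealthy as written. One way to close the gap is the HTTP server bundled with the JDK. The sketch below is an assumption about how such an endpoint could look, not part of the original implementation; `HealthEndpoint` and the `healthy` supplier are hypothetical names:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.BooleanSupplier;

final class HealthEndpoint {
    private HealthEndpoint() {}

    // Starts an HTTP server that answers requests to /health
    // with 200 "UP" or 503 "DOWN", depending on the supplier.
    static HttpServer start(int port, BooleanSupplier healthy) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/health", exchange -> {
            boolean up = healthy.getAsBoolean();
            byte[] body = (up ? "UP" : "DOWN").getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(up ? 200 : 503, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

A reasonable health condition for this service would be that `getLastReport()` returns a report whose timestamp is recent, for example `HealthEndpoint.start(8080, () -> service.getLastReport().isPresent())` as a first approximation.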

5.2 Kubernetes Deployment Configuration

yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer-monitor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: kafka-monitor
  template:
    metadata:
      labels:
        app: kafka-monitor
    spec:
      containers:
      - name: monitor
        image: kafka-monitor:latest
        ports:
        - containerPort: 8080
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka-cluster:9092"
        - name: MONITORING_INTERVAL
          value: "60000"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-monitor-service
spec:
  selector:
    app: kafka-monitor
  ports:
  - port: 8080
    targetPort: 8080
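
The Deployment passes `KAFKA_BOOTSTRAP_SERVERS` and `MONITORING_INTERVAL` to the container, but the Java code shown earlier takes its settings from constructor arguments. A sketch of the missing glue follows. It assumes the interval is in milliseconds (the value `60000` suggests this, but the article does not say), and `MonitorSettings` is a hypothetical name:

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical settings holder for the two environment variables in the manifest.
record MonitorSettings(String bootstrapServers, Duration interval) {

    // Takes the environment as a map so it can be tested; pass System.getenv() in production.
    static MonitorSettings fromEnv(Map<String, String> env) {
        String servers = env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        String rawInterval = env.getOrDefault("MONITORING_INTERVAL", "60000");
        long millis;
        try {
            millis = Long.parseLong(rawInterval.trim()); // assumed unit: milliseconds
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "MONITORING_INTERVAL must be a number of milliseconds, got: " + rawInterval, e);
        }
        if (millis <= 0) {
            throw new IllegalArgumentException("MONITORING_INTERVAL must be positive, got: " + millis);
        }
        return new MonitorSettings(servers, Duration.ofMillis(millis));
    }
}
```

A main method could then call `MonitorSettings.fromEnv(System.getenv())` and hand the values to `JavaConsumerMonitoringService` and `startPeriodicMonitoring`. Failing fast on a malformed interval makes a bad manifest show up as a crash loop at startup instead of a monitor that silently never runs.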

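6. Summary

This article walked through migrating a Kafka consumer monitoring implementation from Scala to Java, covering:

  1. Basic monitoring: listing consumer groups, fetching offsets, computing lag
  2. Language feature migration: converting Scala Future to Java CompletableFuture
  3. Performance optimization: caching, batch operations, asynchronous processing
  4. Production features: metrics export, alerting, containerized deployment

The migration gives us:

  • Tighter integration with the Java ecosystem
  • A code structure that is easier to maintain
  • Explicit, compiler-checked data types through records
  • Fewer broker round trips through caching and parallel asynchronous calls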