1. Kafka 消費者監控概述
1.1 為什麼需要監控 Kafka 消費者
Kafka 消費者監控是確保消息系統穩定運行的關鍵環節。通過監控消費者,我們可以:
- 實時瞭解消息消費進度
- 及時發現消費延遲或積壓問題
- 診斷消費者組重新平衡問題
- 優化消費者性能和資源配置
- 保障業務數據的及時處理
1.2 監控指標分類
| 指標類別 | 具體指標 | 說明 |
| --- | --- | --- |
| 消費進度 | 當前偏移量、最新偏移量、消費延遲 | 反映消息消費的及時性 |
| 消費者狀態 | 消費者活躍性、分區分配情況 | 反映消費者健康狀態 |
| 性能指標 | 消費速率、處理時長 | 反映消費者性能表現 |
2. Scala 實現 Kafka 消費者監控
2.1 基礎環境配置
scala
// build.sbt
// Dependencies for the Scala monitor: Kafka clients, configuration, logging,
// and the Akka modules used by the monitoring service.
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "3.4.0",
  "org.apache.kafka" %% "kafka" % "3.4.0",
  "com.typesafe" % "config" % "1.4.2",
  "org.slf4j" % "slf4j-api" % "2.0.6",
  "ch.qos.logback" % "logback-classic" % "1.4.5",
  // ScalaConsumerMonitoringService imports akka.actor.ActorSystem and
  // akka.stream.Materializer, so both Akka modules must be on the classpath.
  "com.typesafe.akka" %% "akka-actor" % "2.6.20",
  "com.typesafe.akka" %% "akka-stream" % "2.6.20"
)
// application.conf
# Kafka client settings for the monitor, grouped under the `kafka` key.
# NOTE(review): the monitor classes shown in this article receive the bootstrap
# servers as a constructor argument; no visible code loads this file — confirm
# where it is read.
kafka {
# Broker address list.
bootstrap.servers = "localhost:9092"
# Consumer group id configured for the monitor.
group.id = "consumer-monitor"
# Automatic offset commits are switched off.
enable.auto.commit = false
# Offset reset policy.
auto.offset.reset = "latest"
}
2.2 Scala 消費者組信息獲取
scala
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, ListConsumerGroupsOptions, ListConsumerGroupOffsetsOptions, ConsumerGroupListing}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.{TopicPartition, Node}
import scala.collection.JavaConverters._
import scala.util.{Try, Success, Failure}
import java.util.Properties
import java.time.Duration
/**
 * Read-only monitor for Kafka consumer groups.
 *
 * Group listings, committed offsets and member descriptions are read through a
 * single `AdminClient` that is created at construction time and released by
 * [[close]]. Log-end offsets are read through a short-lived `KafkaConsumer`
 * that is opened and closed once per topic.
 *
 * All public operations block the calling thread and report failures as a
 * failed `Try`.
 *
 * @param bootstrapServers value used for the `bootstrap.servers` setting of
 *                         every client created by this class
 */
class ScalaKafkaConsumerMonitor(bootstrapServers: String) {

  private val adminClient: AdminClient = createAdminClient()

  /** Builds the admin client with a 30 second request timeout. */
  private def createAdminClient(): AdminClient = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")
    AdminClient.create(props)
  }

  /**
   * Lists the ids of all consumer groups known to the cluster.
   *
   * @return the group ids, or a `Failure` if the admin request fails
   */
  def listConsumerGroups(): Try[Seq[String]] = Try {
    val options = new ListConsumerGroupsOptions().timeoutMs(10000)
    val groups = adminClient.listConsumerGroups(options).all().get()
    groups.asScala.map(_.groupId()).toSeq
  }

  /**
   * Fetches state, members and committed offsets of one consumer group.
   *
   * A partition whose committed-offset entry is `null` is reported with offset
   * `-1` and empty metadata.
   *
   * @param groupId id of the consumer group to describe
   * @return the group snapshot, or a `Failure` if either admin request fails
   */
  def getConsumerGroupDetails(groupId: String): Try[ConsumerGroupInfo] = Try {
    // Committed offsets of the group.
    val offsetOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(10000)
    val offsets = adminClient
      .listConsumerGroupOffsets(groupId, offsetOptions)
      .partitionsToOffsetAndMetadata()
      .get()

    // Group state and member assignments.
    val groupDescription = adminClient
      .describeConsumerGroups(Seq(groupId).asJava)
      .describedGroups()
      .get(groupId)
      .get()

    val members = groupDescription.members().asScala.map { member =>
      ConsumerMember(
        memberId = member.memberId(),
        clientId = member.clientId(),
        host = member.host(),
        assignments = member.assignment().topicPartitions().asScala.map(_.toString).toSeq
      )
    }.toSeq

    val committed = offsets.asScala.map { case (tp, offsetMetadata) =>
      val meta = Option(offsetMetadata)
      TopicPartitionOffset(
        topic = tp.topic(),
        partition = tp.partition(),
        offset = meta.map(_.offset()).getOrElse(-1L),
        metadata = meta.flatMap(om => Option(om.metadata())).getOrElse("")
      )
    }.toSeq

    ConsumerGroupInfo(
      groupId = groupId,
      state = groupDescription.state().toString,
      members = members,
      offsets = committed
    )
  }

  /**
   * Computes the per-partition lag of a consumer group.
   *
   * @param groupId id of the consumer group
   * @return one [[PartitionLag]] per partition of every topic the group has
   *         committed offsets for, or a `Failure` if the group cannot be read
   */
  def calculateConsumerLag(groupId: String): Try[Seq[PartitionLag]] =
    for {
      groupInfo <- getConsumerGroupDetails(groupId)
      lagInfo <- calculateLagForGroup(groupInfo)
    } yield lagInfo

  /**
   * Joins committed offsets with log-end offsets, one lookup per topic.
   *
   * End offsets are fetched once per topic (not once per committed partition),
   * so every partition appears exactly once in the result. Partitions without a
   * committed offset are reported with consumer offset 0. A negative committed
   * offset (the "no commit" sentinel) is treated as 0 when computing lag so it
   * cannot inflate the result. Topics whose end offsets cannot be fetched are
   * skipped.
   */
  private def calculateLagForGroup(groupInfo: ConsumerGroupInfo): Try[Seq[PartitionLag]] = Try {
    groupInfo.offsets
      .groupBy(_.topic)
      .toSeq
      .sortBy(_._1)
      .flatMap { case (topic, topicOffsets) =>
        val committedByPartition = topicOffsets.map(o => o.partition -> o.offset).toMap
        getTopicEndOffsets(topic).fold(Seq.empty[PartitionLag]) { endOffsets =>
          endOffsets.toSeq.sortBy(_._1).map { case (partition, endOffset) =>
            val committed = committedByPartition.getOrElse(partition, 0L)
            PartitionLag(
              topic = topic,
              partition = partition,
              consumerOffset = committed,
              endOffset = endOffset,
              lag = math.max(0L, endOffset - math.max(0L, committed))
            )
          }
        }
      }
  }

  /**
   * Reads the log-end offset of every partition of `topic`.
   *
   * @return partition number -> end offset, or `None` if the lookup fails
   */
  private def getTopicEndOffsets(topic: String): Option[Map[Int, Long]] =
    Try {
      val consumer = createConsumer()
      try {
        // partitionsFor may yield null for an unknown topic; treat that as "no partitions".
        val partitions = Option(consumer.partitionsFor(topic))
          .map(_.asScala.map(_.partition()).toSeq)
          .getOrElse(Seq.empty)
        val topicPartitions = partitions.map(new TopicPartition(topic, _)).asJava
        consumer
          .endOffsets(topicPartitions)
          .asScala
          // Unbox explicitly: the Java map carries java.lang.Long values.
          .map { case (tp, offset) => tp.partition() -> offset.longValue() }
          .toMap
      } finally {
        consumer.close()
      }
    }.toOption

  /** Creates the temporary consumer used only for metadata and end-offset lookups. */
  private def createConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", "temp-monitor")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, String](props)
  }

  /** Closes the underlying admin client. */
  def close(): Unit = {
    adminClient.close()
  }
}
// 數據模型
/**
 * Snapshot of one consumer group.
 *
 * @param groupId id of the consumer group
 * @param state   group state rendered as a string
 * @param members members of the group
 * @param offsets committed offsets of the group, one entry per topic partition
 */
case class ConsumerGroupInfo(
    groupId: String,
    state: String,
    members: Seq[ConsumerMember],
    offsets: Seq[TopicPartitionOffset]
)
/**
 * One member of a consumer group.
 *
 * @param memberId    member id
 * @param clientId    client id of the member
 * @param host        host of the member
 * @param assignments assigned topic partitions, each rendered as a string
 */
case class ConsumerMember(
    memberId: String,
    clientId: String,
    host: String,
    assignments: Seq[String]
)
/**
 * Committed offset of a consumer group for a single topic partition.
 *
 * @param topic     topic name
 * @param partition partition number
 * @param offset    committed offset
 * @param metadata  metadata string stored with the commit
 */
case class TopicPartitionOffset(
    topic: String,
    partition: Int,
    offset: Long,
    metadata: String
)
/**
 * Lag of a consumer group on a single topic partition.
 *
 * @param topic          topic name
 * @param partition      partition number
 * @param consumerOffset offset recorded for the consumer group
 * @param endOffset      end offset of the partition
 * @param lag            lag value recorded for the partition
 */
case class PartitionLag(
    topic: String,
    partition: Int,
    consumerOffset: Long,
    endOffset: Long,
    lag: Long
)
2.3 Scala 監控服務實現
scala
import akka.actor.ActorSystem
import akka.stream.Materializer
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
/**
 * Periodically collects a [[MonitoringReport]] from a [[ScalaKafkaConsumerMonitor]]
 * and prints it to standard output.
 *
 * @param monitor source of consumer-group data
 * @param system  actor system whose scheduler drives the periodic task
 * @param ec      execution context for the scheduled task and the report future
 * @param mat     materializer; accepted for API compatibility, not used by the
 *                code in this class
 */
class ScalaConsumerMonitoringService(monitor: ScalaKafkaConsumerMonitor)(
    implicit system: ActorSystem,
    ec: ExecutionContext,
    mat: Materializer
) {

  /**
   * Starts the periodic monitoring task; the first run is scheduled immediately.
   *
   * @param interval time between two runs
   */
  def startPeriodicMonitoring(interval: FiniteDuration = 1.minute): Unit = {
    system.scheduler.scheduleAtFixedRate(0.seconds, interval) { () =>
      // Success/Failure are fully qualified so this class does not depend on a
      // scala.util import being present in the enclosing file.
      monitorConsumerGroups().onComplete {
        case scala.util.Success(report) =>
          println(s"監控報告生成時間: ${java.time.Instant.now()}")
          printMonitoringReport(report)
        case scala.util.Failure(exception) =>
          println(s"監控失敗: ${exception.getMessage}")
      }
    }
  }

  /**
   * Builds one report: lists the groups, then reads details and lag per group.
   *
   * Groups whose details cannot be read are left out of the report; a lag
   * failure for a group contributes no lag entries. Failure to list the groups
   * fails the returned future.
   */
  private def monitorConsumerGroups(): Future[MonitoringReport] = Future {
    // The monitor performs synchronous Kafka requests; mark them as blocking so
    // the execution context can compensate where it supports that.
    scala.concurrent.blocking {
      val groups = monitor.listConsumerGroups().get
      val groupDetails = groups.flatMap { groupId =>
        monitor.getConsumerGroupDetails(groupId).toOption
      }
      val lagInfo = groupDetails.flatMap { group =>
        monitor.calculateConsumerLag(group.groupId).getOrElse(Seq.empty)
      }
      MonitoringReport(
        timestamp = java.time.Instant.now(),
        totalGroups = groups.size,
        activeGroups = groupDetails.count(_.state == "Stable"),
        totalLag = lagInfo.map(_.lag).sum,
        groupDetails = groupDetails,
        partitionLags = lagInfo
      )
    }
  }

  /** Prints the summary, the members of every group and the per-partition lag. */
  private def printMonitoringReport(report: MonitoringReport): Unit = {
    println("=" * 80)
    println(s"Kafka消費者監控報告 - ${report.timestamp}")
    println("=" * 80)
    println(s"消費者組總數: ${report.totalGroups}")
    println(s"活躍消費者組: ${report.activeGroups}")
    println(s"總消費延遲: ${report.totalLag} 條消息")

    report.groupDetails.foreach { group =>
      println(s"\n消費者組: ${group.groupId} [${group.state}]")
      group.members.foreach { member =>
        println(s" 成員: ${member.memberId} (${member.clientId})")
        println(s" 主機: ${member.host}")
        println(s" 分配分區: ${member.assignments.mkString(", ")}")
      }
    }

    if (report.partitionLags.nonEmpty) {
      println("\n分區消費延遲詳情:")
      report.partitionLags.groupBy(_.topic).foreach { case (topic, lags) =>
        println(s" 主題: $topic")
        lags.foreach { lag =>
          println(s" 分區 ${lag.partition}: 消費者偏移量=${lag.consumerOffset}, " +
            s"最新偏移量=${lag.endOffset}, 延遲=${lag.lag}")
        }
      }
    }
  }
}
/**
 * Result of one monitoring run.
 *
 * @param timestamp     instant recorded for the report
 * @param totalGroups   number of consumer groups counted for the report
 * @param activeGroups  number of groups counted as active
 * @param totalLag      total lag value of the report
 * @param groupDetails  per-group snapshots included in the report
 * @param partitionLags per-partition lag entries included in the report
 */
case class MonitoringReport(
    timestamp: java.time.Instant,
    totalGroups: Int,
    activeGroups: Int,
    totalLag: Long,
    groupDetails: Seq[ConsumerGroupInfo],
    partitionLags: Seq[PartitionLag]
)
3. 從 Scala 遷移到 Java 的全面指南
3.1 語言特性對比與遷移策略
| Scala 特性 | Java 替代方案 | 遷移建議 |
| --- | --- | --- |
| Case Class | Record (Java 16+;Java 14 起為預覽功能) / Lombok | 使用不可變數據類 |
| Option | Optional | 顯式處理空值 |
| Future | CompletableFuture | 異步編程轉換 |
| 隱式參數 | 顯式依賴注入 | 提高代碼可讀性 |
| 集合操作 | Stream API | 函數式數據處理 |
3.2 Java 環境配置
xml
<!-- pom.xml -->
<dependencies>
    <!-- Kafka admin and consumer clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    <!-- Logging API and backend -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.6</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.5</version>
    </dependency>
    <!-- JSON export of monitoring reports -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.2</version>
    </dependency>
    <!-- java.time support for Jackson; needed because the report contains an Instant -->
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>2.14.2</version>
    </dependency>
    <!-- Cache used by OptimizedKafkaMonitor (section 4.1) -->
    <dependency>
        <groupId>com.github.ben-manes.caffeine</groupId>
        <artifactId>caffeine</artifactId>
        <version>3.1.5</version>
    </dependency>
    <!-- Metrics facade used by MetricsExporter (section 4.2) -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-core</artifactId>
        <version>1.10.5</version>
    </dependency>
</dependencies>
3.3 Java 版本消費者監控實現
java
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.ConsumerGroupState;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Asynchronous, read-only monitor for Kafka consumer groups.
 *
 * <p>Group listings, committed offsets and group descriptions are read through a
 * single {@link AdminClient} that lives until {@link #close()} is called. Log-end
 * offsets are read through a temporary {@link KafkaConsumer} on a
 * {@code CompletableFuture.supplyAsync} task.
 */
public class JavaKafkaConsumerMonitor {
    private final AdminClient adminClient;
    private final String bootstrapServers;

    /**
     * @param bootstrapServers value used for the {@code bootstrap.servers} setting
     *                         of every client created by this class
     */
    public JavaKafkaConsumerMonitor(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        this.adminClient = createAdminClient();
    }

    /** Builds the admin client with a 30 second request timeout. */
    private AdminClient createAdminClient() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return AdminClient.create(props);
    }

    /**
     * Lists the ids of all consumer groups.
     *
     * @return future completed with the group ids, or exceptionally if the admin
     *         request fails
     */
    public CompletableFuture<List<String>> listConsumerGroups() {
        ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
                .timeoutMs(10000);
        // KafkaFuture is not a CompletionStage; bridge it via toCompletionStage().
        return adminClient.listConsumerGroups(options)
                .all()
                .toCompletionStage()
                .thenApply(groups -> groups.stream()
                        .map(ConsumerGroupListing::groupId)
                        .collect(Collectors.toList()))
                .toCompletableFuture();
    }

    /**
     * Fetches committed offsets and the group description concurrently and merges
     * them into one snapshot.
     *
     * @param groupId id of the consumer group
     * @return future completed with the snapshot, or exceptionally if either
     *         request fails
     */
    public CompletableFuture<ConsumerGroupInfo> getConsumerGroupDetails(String groupId) {
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsFuture =
                getConsumerGroupOffsets(groupId);
        CompletableFuture<ConsumerGroupDescription> descriptionFuture =
                getConsumerGroupDescription(groupId);

        return offsetsFuture.thenCombine(descriptionFuture, (offsets, description) -> {
            List<ConsumerMember> members = description.members().stream()
                    .map(member -> new ConsumerMember(
                            member.memberId(),
                            member.clientId(),
                            member.host(),
                            member.assignment().topicPartitions().stream()
                                    .map(TopicPartition::toString)
                                    .collect(Collectors.toList())))
                    .collect(Collectors.toList());

            List<TopicPartitionOffset> topicOffsets = offsets.entrySet().stream()
                    .map(entry -> toTopicPartitionOffset(entry.getKey(), entry.getValue()))
                    .collect(Collectors.toList());

            return new ConsumerGroupInfo(
                    groupId,
                    description.state().toString(),
                    members,
                    topicOffsets);
        });
    }

    /**
     * Converts one committed-offset entry. A {@code null} entry (no committed
     * offset for the partition) is reported as offset {@code -1} with empty
     * metadata instead of causing a NullPointerException.
     */
    private static TopicPartitionOffset toTopicPartitionOffset(TopicPartition tp,
                                                               OffsetAndMetadata offsetMeta) {
        if (offsetMeta == null) {
            return new TopicPartitionOffset(tp.topic(), tp.partition(), -1L, "");
        }
        String metadata = offsetMeta.metadata() != null ? offsetMeta.metadata() : "";
        return new TopicPartitionOffset(tp.topic(), tp.partition(), offsetMeta.offset(), metadata);
    }

    /** Reads the committed offsets of a group (10 second timeout). */
    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
            getConsumerGroupOffsets(String groupId) {
        ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions()
                .timeoutMs(10000);
        return adminClient.listConsumerGroupOffsets(groupId, options)
                .partitionsToOffsetAndMetadata()
                .toCompletionStage()
                .<Map<TopicPartition, OffsetAndMetadata>>thenApply(Collections::unmodifiableMap)
                .toCompletableFuture();
    }

    /** Reads the description of a group (10 second timeout). */
    private CompletableFuture<ConsumerGroupDescription>
            getConsumerGroupDescription(String groupId) {
        DescribeConsumerGroupsOptions options = new DescribeConsumerGroupsOptions()
                .timeoutMs(10000);
        return adminClient.describeConsumerGroups(Collections.singletonList(groupId), options)
                .describedGroups()
                .get(groupId)
                .toCompletionStage()
                .toCompletableFuture();
    }

    /**
     * Computes the lag of every partition the group has committed offsets for.
     *
     * @param groupId id of the consumer group
     * @return future completed with one entry per committed partition
     */
    public CompletableFuture<List<PartitionLag>> calculateConsumerLag(String groupId) {
        return getConsumerGroupDetails(groupId)
                .thenCompose(this::calculateLagForGroup);
    }

    /** Fetches end offsets once per topic and joins them with the committed offsets. */
    private CompletableFuture<List<PartitionLag>> calculateLagForGroup(ConsumerGroupInfo groupInfo) {
        Map<String, List<TopicPartitionOffset>> offsetsByTopic = groupInfo.offsets().stream()
                .collect(Collectors.groupingBy(TopicPartitionOffset::topic));

        List<CompletableFuture<List<PartitionLag>>> lagFutures =
                offsetsByTopic.entrySet().stream()
                        .map(entry -> {
                            String topic = entry.getKey();
                            List<TopicPartitionOffset> topicOffsets = entry.getValue();
                            return getTopicEndOffsets(topic)
                                    .thenApply(endOffsets ->
                                            calculateLagForTopic(topic, topicOffsets, endOffsets));
                        })
                        .collect(Collectors.toList());

        return CompletableFuture.allOf(lagFutures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> lagFutures.stream()
                        // Safe: allOf guarantees every future has completed.
                        .flatMap(future -> future.join().stream())
                        .collect(Collectors.toList()));
    }

    /**
     * Computes the lag of each committed partition of one topic.
     *
     * <p>An unknown end offset is reported as {@code -1} with lag 0. A negative
     * committed offset (the "no commit" sentinel) is treated as 0 for the lag
     * computation so it cannot inflate the result.
     */
    private List<PartitionLag> calculateLagForTopic(String topic,
                                                    List<TopicPartitionOffset> consumerOffsets,
                                                    Map<Integer, Long> endOffsets) {
        return consumerOffsets.stream()
                .map(consumerOffset -> {
                    int partition = consumerOffset.partition();
                    long endOffset = endOffsets.getOrDefault(partition, -1L);
                    long consumerOffsetValue = consumerOffset.offset();
                    long lag = endOffset < 0
                            ? 0L
                            : Math.max(0L, endOffset - Math.max(0L, consumerOffsetValue));
                    return new PartitionLag(
                            topic,
                            partition,
                            consumerOffsetValue,
                            endOffset,
                            lag);
                })
                .collect(Collectors.toList());
    }

    /**
     * Reads the log-end offset of every partition of a topic.
     *
     * <p>Declared {@code protected} so that subclasses can override it, for
     * example to add caching.
     *
     * @param topic topic name
     * @return future completed with partition number -> end offset; empty if the
     *         topic has no known partitions
     */
    protected CompletableFuture<Map<Integer, Long>> getTopicEndOffsets(String topic) {
        return CompletableFuture.supplyAsync(() -> {
            try (KafkaConsumer<String, String> consumer = createTempConsumer()) {
                List<org.apache.kafka.common.PartitionInfo> partitions =
                        consumer.partitionsFor(topic);
                if (partitions == null || partitions.isEmpty()) {
                    return Collections.emptyMap();
                }
                List<TopicPartition> topicPartitions = partitions.stream()
                        .map(partitionInfo ->
                                new TopicPartition(topic, partitionInfo.partition()))
                        .collect(Collectors.toList());
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
                return endOffsets.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().partition(),
                                Map.Entry::getValue));
            }
        });
    }

    /** Creates the temporary consumer used only for metadata and end-offset lookups. */
    private KafkaConsumer<String, String> createTempConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "java-monitor-temp");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "latest");
        return new KafkaConsumer<>(props);
    }

    /** Closes the underlying admin client. */
    public void close() {
        if (adminClient != null) {
            adminClient.close();
        }
    }
}
// Java 16+ Record 類替代 Scala Case Class
/**
 * Snapshot of one consumer group.
 *
 * @param groupId id of the consumer group
 * @param state   group state rendered as a string
 * @param members members of the group
 * @param offsets committed offsets of the group, one entry per topic partition
 */
public record ConsumerGroupInfo(
        String groupId,
        String state,
        List<ConsumerMember> members,
        List<TopicPartitionOffset> offsets
) {
}
public record ConsumerMember(
String memberId,
String clientId,
String host,
List<String> assignments
) {}
/**
 * Committed offset of a consumer group for a single topic partition.
 *
 * @param topic     topic name
 * @param partition partition number
 * @param offset    committed offset
 * @param metadata  metadata string stored with the commit
 */
public record TopicPartitionOffset(
        String topic,
        int partition,
        long offset,
        String metadata
) {
}
/**
 * Lag of a consumer group on a single topic partition.
 *
 * @param topic          topic name
 * @param partition      partition number
 * @param consumerOffset offset recorded for the consumer group
 * @param endOffset      end offset of the partition
 * @param lag            lag value recorded for the partition
 */
public record PartitionLag(
        String topic,
        int partition,
        long consumerOffset,
        long endOffset,
        long lag
) {
}
3.4 Java 監控服務實現
java
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
 * Periodically builds a {@link MonitoringReport} from a
 * {@link JavaKafkaConsumerMonitor}, prints it, runs threshold checks and
 * serializes it to JSON. The most recent report is kept for
 * {@link #getLastReport()}.
 */
public class JavaConsumerMonitoringService {
    private final JavaKafkaConsumerMonitor monitor;
    private final ScheduledExecutorService scheduler;
    private final ObjectMapper objectMapper;
    private final AtomicReference<MonitoringReport> lastReport;

    /**
     * @param bootstrapServers Kafka bootstrap servers passed to the monitor
     */
    public JavaConsumerMonitoringService(String bootstrapServers) {
        this.monitor = new JavaKafkaConsumerMonitor(bootstrapServers);
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.objectMapper = new ObjectMapper();
        // The report contains a java.time.Instant, which Jackson only serializes
        // when its java.time module is registered. findAndRegisterModules() picks
        // the module up when it is on the classpath and is a no-op otherwise.
        this.objectMapper.findAndRegisterModules();
        this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        this.objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
        this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        this.lastReport = new AtomicReference<>();
    }

    /**
     * Starts the periodic monitoring task.
     *
     * @param initialDelay delay before the first run
     * @param period       time between the starts of two runs
     * @param unit         unit of {@code initialDelay} and {@code period}
     */
    public void startPeriodicMonitoring(long initialDelay, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(
                this::monitorConsumerGroups,
                initialDelay,
                period,
                unit);
    }

    /** One monitoring run: list groups, build the report, then publish it. */
    private void monitorConsumerGroups() {
        try {
            monitor.listConsumerGroups()
                    .thenCompose(this::createMonitoringReport)
                    .whenComplete((report, throwable) -> {
                        if (throwable != null) {
                            System.err.println("監控失敗: " + throwable.getMessage());
                            throwable.printStackTrace();
                        } else {
                            lastReport.set(report);
                            printMonitoringReport(report);
                            exportReportToJson(report);
                        }
                    });
        } catch (RuntimeException e) {
            // scheduleAtFixedRate suppresses all later runs once a run throws, so a
            // synchronous failure must not escape this method.
            System.err.println("監控失敗: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Fetches details and lag for every group concurrently and aggregates them.
     *
     * @param groupIds ids of the groups to include
     * @return future completed with the report, or exceptionally if any group
     *         request fails
     */
    private CompletableFuture<MonitoringReport> createMonitoringReport(List<String> groupIds) {
        List<CompletableFuture<ConsumerGroupInfo>> groupFutures = groupIds.stream()
                .map(monitor::getConsumerGroupDetails)
                .collect(Collectors.toList());
        List<CompletableFuture<List<PartitionLag>>> lagFutures = groupIds.stream()
                .map(monitor::calculateConsumerLag)
                .collect(Collectors.toList());

        // allOf takes a single varargs array, so both lists are merged first.
        List<CompletableFuture<?>> pending = new ArrayList<>(groupFutures);
        pending.addAll(lagFutures);

        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> {
                    // Safe: allOf guarantees every future has completed.
                    List<ConsumerGroupInfo> groupDetails = groupFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());
                    List<PartitionLag> allLags = lagFutures.stream()
                            .flatMap(future -> future.join().stream())
                            .collect(Collectors.toList());
                    long totalLag = allLags.stream()
                            .mapToLong(PartitionLag::lag)
                            .sum();
                    long activeGroups = groupDetails.stream()
                            .filter(group -> "Stable".equals(group.state()))
                            .count();
                    return new MonitoringReport(
                            Instant.now(),
                            groupIds.size(),
                            (int) activeGroups,
                            totalLag,
                            groupDetails,
                            allLags);
                });
    }

    /** Prints the summary, group members and per-partition lag, then runs the alert checks. */
    private void printMonitoringReport(MonitoringReport report) {
        System.out.println("=".repeat(80));
        System.out.printf("Kafka消費者監控報告 - %s%n", report.timestamp());
        System.out.println("=".repeat(80));
        System.out.printf("消費者組總數: %d%n", report.totalGroups());
        System.out.printf("活躍消費者組: %d%n", report.activeGroups());
        System.out.printf("總消費延遲: %d 條消息%n", report.totalLag());

        report.groupDetails().forEach(group -> {
            System.out.printf("%n消費者組: %s [%s]%n", group.groupId(), group.state());
            group.members().forEach(member -> {
                System.out.printf(" 成員: %s (%s)%n", member.memberId(), member.clientId());
                System.out.printf(" 主機: %s%n", member.host());
                System.out.printf(" 分配分區: %s%n",
                        String.join(", ", member.assignments()));
            });
        });

        if (!report.partitionLags().isEmpty()) {
            System.out.println("\n分區消費延遲詳情:");
            report.partitionLags().stream()
                    .collect(Collectors.groupingBy(PartitionLag::topic))
                    .forEach((topic, lags) -> {
                        System.out.printf(" 主題: %s%n", topic);
                        lags.forEach(lag ->
                                System.out.printf(" 分區 %d: 消費者偏移量=%d, 最新偏移量=%d, 延遲=%d%n",
                                        lag.partition(), lag.consumerOffset(), lag.endOffset(), lag.lag()));
                    });
        }

        checkAndAlert(report);
    }

    /**
     * Prints warnings when total lag exceeds 10000, when a group is not in the
     * "Stable" state, or when a partition's lag exceeds 1000.
     */
    private void checkAndAlert(MonitoringReport report) {
        if (report.totalLag() > 10000) {
            System.out.println("\n⚠️ 警告: 總消費延遲超過閾值!");
        }

        List<String> unstableGroups = report.groupDetails().stream()
                .filter(group -> !"Stable".equals(group.state()))
                .map(ConsumerGroupInfo::groupId)
                .collect(Collectors.toList());
        if (!unstableGroups.isEmpty()) {
            System.out.printf("⚠️ 警告: 以下消費者組狀態異常: %s%n",
                    String.join(", ", unstableGroups));
        }

        List<PartitionLag> highLagPartitions = report.partitionLags().stream()
                .filter(lag -> lag.lag() > 1000)
                .collect(Collectors.toList());
        if (!highLagPartitions.isEmpty()) {
            System.out.println("⚠️ 警告: 以下分區延遲較高:");
            highLagPartitions.forEach(lag ->
                    System.out.printf(" %s-%d: 延遲=%d%n",
                            lag.topic(), lag.partition(), lag.lag()));
        }
    }

    /** Serializes the report to JSON; serialization failures are reported on stderr. */
    private void exportReportToJson(MonitoringReport report) {
        try {
            String json = objectMapper.writeValueAsString(report);
            // Extension point: write `json` to a file or push it to a monitoring system.
            System.out.println("\nJSON報告已生成");
        } catch (Exception e) {
            System.err.println("生成JSON報告失敗: " + e.getMessage());
        }
    }

    /**
     * @return the most recent successfully built report, or empty if none exists yet
     */
    public Optional<MonitoringReport> getLastReport() {
        return Optional.ofNullable(lastReport.get());
    }

    /**
     * Stops the scheduler, waits up to five seconds for a running task, and only
     * then closes the monitor so that a task still in progress does not operate
     * on a closed admin client.
     */
    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            monitor.close();
        }
    }
}
/**
 * Result of one monitoring run.
 *
 * @param timestamp     instant recorded for the report
 * @param totalGroups   number of consumer groups counted for the report
 * @param activeGroups  number of groups counted as active
 * @param totalLag      total lag value of the report
 * @param groupDetails  per-group snapshots included in the report
 * @param partitionLags per-partition lag entries included in the report
 */
public record MonitoringReport(
        Instant timestamp,
        int totalGroups,
        int activeGroups,
        long totalLag,
        List<ConsumerGroupInfo> groupDetails,
        List<PartitionLag> partitionLags
) {
}
4. 高級特性與最佳實踐
4.1 性能優化實現
java
/**
 * {@link JavaKafkaConsumerMonitor} variant that caches end offsets per topic and
 * group snapshots per group id in Caffeine caches.
 *
 * <p>NOTE(review): {@code getTopicEndOffsets} is overridden here, which requires
 * the base-class method to be visible to subclasses (not {@code private}) —
 * confirm against the base class.
 */
public class OptimizedKafkaMonitor extends JavaKafkaConsumerMonitor {
    private final Cache<String, Map<Integer, Long>> endOffsetCache;
    private final Cache<String, ConsumerGroupInfo> groupInfoCache;

    /**
     * @param bootstrapServers Kafka bootstrap servers passed to the base monitor
     */
    public OptimizedKafkaMonitor(String bootstrapServers) {
        super(bootstrapServers);
        // End offsets: entries expire one minute after being written, at most 1000 topics.
        this.endOffsetCache = Caffeine.newBuilder()
                .expireAfterWrite(1, TimeUnit.MINUTES)
                .maximumSize(1000)
                .build();
        // Group snapshots: entries expire 30 seconds after being written, at most 100 groups.
        this.groupInfoCache = Caffeine.newBuilder()
                .expireAfterWrite(30, TimeUnit.SECONDS)
                .maximumSize(100)
                .build();
    }

    /**
     * Returns the cached end offsets of a topic if present; otherwise delegates
     * to the base class and stores the result once it arrives.
     */
    @Override
    public CompletableFuture<Map<Integer, Long>> getTopicEndOffsets(String topic) {
        Map<Integer, Long> hit = endOffsetCache.getIfPresent(topic);
        if (hit == null) {
            return super.getTopicEndOffsets(topic)
                    .thenApply(fresh -> {
                        endOffsetCache.put(topic, fresh);
                        return fresh;
                    });
        }
        return CompletableFuture.completedFuture(hit);
    }

    /**
     * Returns the cached snapshot of a group if present; otherwise delegates to
     * the base class and stores the result once it arrives.
     */
    @Override
    public CompletableFuture<ConsumerGroupInfo> getConsumerGroupDetails(String groupId) {
        ConsumerGroupInfo hit = groupInfoCache.getIfPresent(groupId);
        if (hit == null) {
            return super.getConsumerGroupDetails(groupId)
                    .thenApply(fresh -> {
                        groupInfoCache.put(groupId, fresh);
                        return fresh;
                    });
        }
        return CompletableFuture.completedFuture(hit);
    }

    /**
     * Fetches the snapshots of several groups concurrently.
     *
     * @param groupIds ids of the groups to fetch
     * @return future completed with group id -> snapshot, or exceptionally if any
     *         lookup fails
     */
    public CompletableFuture<Map<String, ConsumerGroupInfo>>
            getMultipleConsumerGroupDetails(List<String> groupIds) {
        List<CompletableFuture<ConsumerGroupInfo>> pending = groupIds.stream()
                .map(this::getConsumerGroupDetails)
                .collect(Collectors.toList());
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
        return allDone.thenApply(ignored -> {
            Map<String, ConsumerGroupInfo> byGroupId = new HashMap<>();
            int index = 0;
            // Walk ids and futures in lockstep; both lists have the same order.
            for (String groupId : groupIds) {
                byGroupId.put(groupId, pending.get(index).join());
                index++;
            }
            return byGroupId;
        });
    }
}
4.2 監控指標導出
java
/**
 * Publishes the latest {@link MonitoringReport} of a
 * {@link JavaConsumerMonitoringService} as Micrometer gauges.
 *
 * <p>A registry keeps the first gauge registered for a given name and tag set,
 * so gauges bound to one report snapshot would never change afterwards. Each
 * gauge here is therefore registered once and reads a value that
 * {@link #exportMetrics()} refreshes on every call.
 */
public class MetricsExporter {
    private final JavaConsumerMonitoringService monitoringService;
    private final MeterRegistry meterRegistry;

    /** Guards the one-time registration of the three summary gauges. */
    private final java.util.concurrent.atomic.AtomicBoolean summaryRegistered =
            new java.util.concurrent.atomic.AtomicBoolean(false);
    /** Current state value (1 = "Stable", 0 = otherwise) per sanitized group id. */
    private final java.util.Map<String, java.util.concurrent.atomic.AtomicLong> groupStates =
            new java.util.concurrent.ConcurrentHashMap<>();
    /** Current lag per "topic/partition" key. */
    private final java.util.Map<String, java.util.concurrent.atomic.AtomicLong> partitionLags =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * @param monitoringService source of the latest report
     * @param meterRegistry     registry the gauges are registered with
     */
    public MetricsExporter(JavaConsumerMonitoringService monitoringService,
                           MeterRegistry meterRegistry) {
        this.monitoringService = monitoringService;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Refreshes all gauges from the latest report; does nothing if no report
     * exists yet. Gauges of groups or partitions that are missing from the
     * latest report keep their last value.
     */
    public void exportMetrics() {
        monitoringService.getLastReport().ifPresent(report -> {
            registerSummaryGauges();

            // Group state per consumer group.
            report.groupDetails().forEach(group -> {
                String groupId = group.groupId().replaceAll("[^a-zA-Z0-9_]", "_");
                groupStates
                        .computeIfAbsent(groupId, key ->
                                registerValueGauge("kafka.consumer.group_state", "group", key))
                        .set("Stable".equals(group.state()) ? 1 : 0);
            });

            // Lag per topic partition.
            report.partitionLags().forEach(lag -> {
                String partition = String.valueOf(lag.partition());
                // '/' cannot occur in a Kafka topic name, so the key is unambiguous.
                String key = lag.topic() + "/" + partition;
                partitionLags
                        .computeIfAbsent(key, ignored ->
                                registerValueGauge("kafka.consumer.partition_lag",
                                        "topic", lag.topic(), "partition", partition))
                        .set(lag.lag());
            });
        });
    }

    /** Registers the summary gauges once; each reads the newest report when sampled. */
    private void registerSummaryGauges() {
        if (!summaryRegistered.compareAndSet(false, true)) {
            return;
        }
        Gauge.builder("kafka.consumer.total_groups",
                        () -> monitoringService.getLastReport()
                                .map(MonitoringReport::totalGroups).orElse(0))
                .register(meterRegistry);
        Gauge.builder("kafka.consumer.active_groups",
                        () -> monitoringService.getLastReport()
                                .map(MonitoringReport::activeGroups).orElse(0))
                .register(meterRegistry);
        Gauge.builder("kafka.consumer.total_lag",
                        () -> monitoringService.getLastReport()
                                .map(MonitoringReport::totalLag).orElse(0L))
                .register(meterRegistry);
    }

    /** Registers a gauge backed by a new mutable holder and returns that holder. */
    private java.util.concurrent.atomic.AtomicLong registerValueGauge(String name, String... tags) {
        java.util.concurrent.atomic.AtomicLong holder = new java.util.concurrent.atomic.AtomicLong();
        Gauge.builder(name, holder, java.util.concurrent.atomic.AtomicLong::doubleValue)
                .tags(tags)
                .register(meterRegistry);
        return holder;
    }
}
5. 部署與運維
5.1 Docker 容器化部署
dockerfile
# Runtime image for the Kafka consumer monitor.
# The official `openjdk` images are deprecated; Eclipse Temurin is a maintained
# replacement, and a JRE is enough to run a packaged jar.
FROM eclipse-temurin:17-jre
WORKDIR /app
# curl is required only by the HEALTHCHECK below.
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Application jar and its configuration.
COPY target/kafka-monitor.jar /app/kafka-monitor.jar
COPY config/application.properties /app/config/application.properties
# Run the application as an unprivileged user instead of root.
RUN useradd --system --no-create-home --uid 10001 monitor
USER monitor
# Health check.
# NOTE(review): assumes the application serves GET /health on port 8080 — confirm
# that the packaged jar exposes this endpoint.
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8080/health || exit 1
EXPOSE 8080
# Absolute path so the entrypoint does not depend on the working directory.
ENTRYPOINT ["java", "-jar", "/app/kafka-monitor.jar"]
5.2 Kubernetes 部署配置
yaml
# Deployment running two replicas of the Kafka consumer monitor.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer-monitor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: kafka-monitor
  template:
    metadata:
      labels:
        app: kafka-monitor
    spec:
      containers:
        - name: monitor
          image: kafka-monitor:latest
          ports:
            - containerPort: 8080
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "kafka-cluster:9092"
            - name: MONITORING_INTERVAL
              value: "60000"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          # Both probes call the same HTTP endpoint on the container port.
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
# Service exposing port 8080 of the pods labelled app=kafka-monitor.
apiVersion: v1
kind: Service
metadata:
  name: kafka-monitor-service
spec:
  selector:
    app: kafka-monitor
  ports:
    - port: 8080
      targetPort: 8080
6. 總結
本文詳細介紹了從 Scala 到 Java 的 Kafka 消費者監控實現遷移過程,涵蓋了:
- 基礎監控功能:消費者組列表、偏移量獲取、延遲計算
- 語言特性遷移:Scala Future 到 Java CompletableFuture 的轉換
- 性能優化:緩存機制、批量操作、異步處理
- 生產級特性:指標導出、預警機制、容器化部署
通過這種遷移,我們獲得了:
- 更好的 Java 生態系統集成
- 更易於維護的代碼結構
- 更強的類型安全性
- 更好的性能表現