之前写过一篇如何监控 kafka 消费 Lag 情况,五年前写的,在 google 上访问量很大,最近正好需要再写这个功能,就查看了最新 API,发现从2.5.0
版本后新增了listOffsets
方法,让计算 Lag 更简单方便和安全,代码量有质的下降,因为舍弃一些功能,代码精简的了很多。
这里我用最新版做演示,在 pom 文件中增加依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.1</version></dependency>
首先初始化 AdminClient
Properties config = new Properties();config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);AdminClient adminClient = AdminClient.create(config);
然后根据 topic 和 groupId 计算 Lag,这种方案要比之前方式优雅了很多。
public long getConsumerLag(String topicName,String consumerGroupId) {try {// 获取主题描述TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topicName)).topicNameValues().get(topicName).get();List<TopicPartition> partitions = topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition())).collect(Collectors.toList());// 获取每个分区的最新偏移量Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>(partitions.size());for (TopicPartition partition : partitions) {requestLatestOffsets.put(partition, OffsetSpec.latest());}ListOffsetsResult listOffsetsResult = adminClient.listOffsets(requestLatestOffsets);Map<TopicPartition, Long> latestOffsets = new HashMap<>(partitions.size());for (TopicPartition partition : partitions) {latestOffsets.put(partition, listOffsetsResult.partitionResult(partition).get().offset());}// 获取消费者组的当前偏移量ListConsumerGroupOffsetsSpec listConsumerGroupOffsetsSpe = new ListConsumerGroupOffsetsSpec();listConsumerGroupOffsetsSpe.topicPartitions(partitions);Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = new HashMap<String, ListConsumerGroupOffsetsSpec>(){{put(consumerGroupId, listConsumerGroupOffsetsSpe);}};Map<TopicPartition, OffsetAndMetadata> currentOffsets = adminClient.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get();long lag = 0;for (TopicPartition partition : partitions) {OffsetAndMetadata offsetMetadata = currentOffsets.get(partition);long currentOffset = offsetMetadata != null ? offsetMetadata.offset() : 0;lag += latestOffsets.get(partition) - currentOffset;}return lag;} catch (InterruptedException | ExecutionException e) {return -1;}}
注意
API 对 kafka 的兼容性,我在 kakfa 服务器版本2.6.0
测试通过,更低版本,建议自测!
该方法对消息过期,计算 Lag 存在一定错误,请注意!!!
对于如何监控 kafka 消费 Lag 情况原文中 consumerOffset 存在一处错误,为避免 reblance,不要subscribe
,建议更改为以下
KafkaConsumers<String, String> kafkaConsumer = new KafkaConsumers<>(consumerProps);List<PartitionInfo> patitions = kafkaConsumer.partitionsFor(topicName);List<TopicPartition>topicPatitions = new ArrayList<>();patitions.forEach(patition->{TopicPartition topicPartition = new TopicPartition(topicName,patition.partition());topicPatitions.add(topicPartition);});Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPatitions);