From dd2e29dd40af2138f31e6bef759d0cab6cc53317 Mon Sep 17 00:00:00 2001
From: zengqiao
Date: Wed, 16 Sep 2020 21:04:47 +0800
Subject: [PATCH] bugfix, fix collect consumer metrics task

---
 .../entity/dto/consumer/ConsumerDTO.java      |  61 +++++--
 .../collector/CollectConsumerMetricsTask.java |  42 +++--
 .../service/service/ConsumerService.java      |   3 +-
 .../service/impl/ConsumerServiceImpl.java     | 172 +++++++++---------
 .../service/impl/TopicServiceImpl.java        |   6 +-
 5 files changed, 150 insertions(+), 134 deletions(-)

diff --git a/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java b/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java
index 5f686efa..c96c6496 100644
--- a/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java
+++ b/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java
@@ -1,8 +1,5 @@
 package com.xiaojukeji.kafka.manager.common.entity.dto.consumer;

-import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
-
-import java.util.List;
 import java.util.Map;

 /**
@@ -11,20 +8,33 @@ import java.util.Map;
  * @date 2015/11/12
  */
 public class ConsumerDTO {
-    /**
-     * 消费group名
-     */
+    private Long clusterId;
+
+    private String topicName;
+
     private String consumerGroup;

-    /**
-     * 消费类型,一般为static
-     */
     private String location;

-    /**
-     * 订阅的每个topic的partition状态列表
-     */
-    private Map<String, List<PartitionState>> topicPartitionMap;
+    private Map<Integer, Long> partitionOffsetMap;
+
+    private Map<Integer, Long> consumerOffsetMap;
+
+    public Long getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(Long clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }

     public String getConsumerGroup() {
         return consumerGroup;
@@ -42,20 +52,31 @@ public class ConsumerDTO {
         this.location = location;
     }

-    public Map<String, List<PartitionState>> getTopicPartitionMap() {
-        return topicPartitionMap;
+    public Map<Integer, Long> getPartitionOffsetMap() {
+        return partitionOffsetMap;
     }

-    public void setTopicPartitionMap(Map<String, List<PartitionState>> topicPartitionMap) {
-        this.topicPartitionMap = topicPartitionMap;
+    public void setPartitionOffsetMap(Map<Integer, Long> partitionOffsetMap) {
+        this.partitionOffsetMap = partitionOffsetMap;
+    }
+
+    public Map<Integer, Long> getConsumerOffsetMap() {
+        return consumerOffsetMap;
+    }
+
+    public void setConsumerOffsetMap(Map<Integer, Long> consumerOffsetMap) {
+        this.consumerOffsetMap = consumerOffsetMap;
     }

     @Override
     public String toString() {
-        return "Consumer{" +
-                "consumerGroup='" + consumerGroup + '\'' +
+        return "ConsumerDTO{" +
+                "clusterId=" + clusterId +
+                ", topicName='" + topicName + '\'' +
+                ", consumerGroup='" + consumerGroup + '\'' +
                 ", location='" + location + '\'' +
-                ", topicPartitionMap=" + topicPartitionMap +
+                ", partitionOffsetMap=" + partitionOffsetMap +
+                ", consumerOffsetMap=" + consumerOffsetMap +
                 '}';
     }
 }
diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java
index d589b4d0..78770927 100644
--- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java
+++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java
@@ -2,12 +2,12 @@ package com.xiaojukeji.kafka.manager.service.collector;

 import com.xiaojukeji.kafka.manager.common.constant.Constant;
 import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetrics;
-import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
 import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerDTO;
 import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
 import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
 import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
 import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -34,8 +34,8 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
         if (clusterDO == null) {
             return;
         }
-        Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>();
-        List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, topicNamePartitionStateListMap);
+        Map<TopicPartition, Long> allPartitionOffsetMap = new HashMap<>();
+        List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, allPartitionOffsetMap);

         List<ConsumerMetrics> consumerMetricsList = convert2ConsumerMetrics(consumerDTOList);
         KafkaMetricsCache.putConsumerMetricsToCache(clusterId, consumerMetricsList);
@@ -47,23 +47,27 @@
     private List<ConsumerMetrics> convert2ConsumerMetrics(List<ConsumerDTO> consumerDTOList) {
         List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
         for (ConsumerDTO consumerDTO : consumerDTOList) {
-            Map<String, List<PartitionState>> topicNamePartitionStateListMap = consumerDTO.getTopicPartitionMap();
-            for(Map.Entry<String, List<PartitionState>> entry : topicNamePartitionStateListMap.entrySet()){
-                String topicName = entry.getKey();
-                List<PartitionState> partitionStateList = entry.getValue();
-                ConsumerMetrics consumerMetrics = new ConsumerMetrics();
-                consumerMetrics.setClusterId(clusterId);
-                consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
-                consumerMetrics.setLocation(consumerDTO.getLocation());
-                consumerMetrics.setTopicName(topicName);
-                long sumLag = 0;
-                for (PartitionState partitionState : partitionStateList) {
-                    Map.Entry<Long, Long> offsetEntry = new AbstractMap.SimpleEntry<>(partitionState.getOffset(), partitionState.getConsumeOffset());
-                    sumLag += (offsetEntry.getKey() - offsetEntry.getValue() > 0 ? offsetEntry.getKey() - offsetEntry.getValue(): 0);
-                }
-                consumerMetrics.setSumLag(sumLag);
-                consumerMetricsList.add(consumerMetrics);
+            if (consumerDTO.getPartitionOffsetMap() == null || consumerDTO.getConsumerOffsetMap() == null) {
+                continue;
             }
+
+            ConsumerMetrics consumerMetrics = new ConsumerMetrics();
+            consumerMetrics.setClusterId(consumerDTO.getClusterId());
+            consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
+            consumerMetrics.setLocation(consumerDTO.getLocation());
+            consumerMetrics.setTopicName(consumerDTO.getTopicName());
+
+            long sumLag = 0;
+            for(Map.Entry<Integer, Long> entry : consumerDTO.getPartitionOffsetMap().entrySet()){
+                Long partitionOffset = entry.getValue();
+                Long consumerOffset = consumerDTO.getConsumerOffsetMap().get(entry.getKey());
+                if (partitionOffset == null || consumerOffset == null) {
+                    continue;
+                }
+                sumLag += Math.max(partitionOffset - consumerOffset, 0);
+            }
+            consumerMetrics.setSumLag(sumLag);
+            consumerMetricsList.add(consumerMetrics);
         }
         return consumerMetricsList;
     }
diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java
index 94f4e3b3..929aa11a 100644
--- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java
+++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java
@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerGroupDTO;
 import com.xiaojukeji.kafka.manager.common.entity.dto.PartitionOffsetDTO;
 import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
 import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
+import org.apache.kafka.common.TopicPartition;

 import java.util.List;
 import java.util.Map;
@@ -57,7 +58,7 @@ public interface ConsumerService {
      * @return
      */
     List<ConsumerDTO> getMonitoredConsumerList(ClusterDO clusterDO,
-                                               Map<String, List<PartitionState>> topicNamePartitionStateListMap);
+                                               Map<TopicPartition, Long> partitionOffsetMap);

     /**
      * 重置offset
diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java
index fb7d635e..1ae6a26f 100644
--- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java
+++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl;

 import com.xiaojukeji.kafka.manager.common.constant.OffsetStoreLocation;
 import com.xiaojukeji.kafka.manager.common.constant.StatusCode;
-import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
 import com.xiaojukeji.kafka.manager.common.entity.Result;
 import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
 import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
@@ -18,7 +17,6 @@ import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache;
 import com.xiaojukeji.kafka.manager.service.cache.KafkaClientCache;
 import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
 import com.xiaojukeji.kafka.manager.service.service.TopicService;
-import com.xiaojukeji.kafka.manager.service.service.ZookeeperService;
 import com.xiaojukeji.kafka.manager.common.utils.zk.ZkPathUtil;
 import kafka.admin.AdminClient;
 import org.apache.commons.lang.StringUtils;
@@ -49,9 +47,6 @@ public class ConsumerServiceImpl implements ConsumerService {
     @Autowired
     private TopicService topicService;

-    @Autowired
-    private ZookeeperService zkService;
-
     private final ExecutorService consumerListThreadPool = Executors.newFixedThreadPool(50, new DefaultThreadFactory("ConsumerPool"));

     @Override
@@ -135,20 +130,20 @@ public class ConsumerServiceImpl implements ConsumerService {

     @Override
     public List<ConsumerDTO> getMonitoredConsumerList(final ClusterDO clusterDO,
-                                                      final Map<String, List<PartitionState>> partitionStateListMap) {
+                                                      final Map<TopicPartition, Long> allPartitionOffsetMap) {
         List<ConsumerGroupDTO> consumerGroupDTOList = getConsumerGroupList(clusterDO.getId());
         if (consumerGroupDTOList == null || consumerGroupDTOList.isEmpty()) {
             return new ArrayList<>();
         }

-        FutureTask<ConsumerDTO>[] taskList = new FutureTask[consumerGroupDTOList.size()];
+        FutureTask<List<ConsumerDTO>>[] taskList = new FutureTask[consumerGroupDTOList.size()];
         for (int i = 0; i < consumerGroupDTOList.size(); i++) {
             final ConsumerGroupDTO consumerGroupDTO = consumerGroupDTOList.get(i);
-            taskList[i] = new FutureTask<>(new Callable<ConsumerDTO>() {
+            taskList[i] = new FutureTask<>(new Callable<List<ConsumerDTO>>() {
                 @Override
-                public ConsumerDTO call() throws Exception {
+                public List<ConsumerDTO> call() throws Exception {
                     try {
-                        return getMonitoredConsumer(clusterDO, consumerGroupDTO, partitionStateListMap);
+                        return getMonitoredConsumer(clusterDO, consumerGroupDTO, allPartitionOffsetMap);
                     } catch (Exception e) {
                         logger.error("get monitored consumer error, group:{}", consumerGroupDTO.getConsumerGroup(), e);
                     }
@@ -159,31 +154,70 @@
         }

         List<ConsumerDTO> consumerList = new ArrayList<>();
-        for (FutureTask<ConsumerDTO> task : taskList) {
-            ConsumerDTO consumer = null;
+        for (FutureTask<List<ConsumerDTO>> task : taskList) {
+            List<ConsumerDTO> dtoList = null;
             try {
-                consumer = task.get();
+                dtoList = task.get();
             } catch (Exception e) {
                 logger.error("getMonitoredConsumerList@ConsumeServiceImpl, ", e);
             }
-            if (consumer == null) {
+            if (dtoList == null) {
                 continue;
             }
-            consumerList.add(consumer);
+            consumerList.addAll(dtoList);
         }
         return consumerList;
     }

-    private ConsumerDTO getMonitoredConsumer(ClusterDO cluster, ConsumerGroupDTO consumerGroupDTO, Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
-        // 获取当前consumerGroup下的所有的topic的partitionState信息
-        Map<String, List<PartitionState>> topicNamePartitionStateListMap = getConsumerGroupPartitionStateList(cluster, consumerGroupDTO, globalTopicNamePartitionStateListMap);
+    private List<ConsumerDTO> getMonitoredConsumer(ClusterDO clusterDO,
+                                                   ConsumerGroupDTO consumerGroupDTO,
+                                                   Map<TopicPartition, Long> allPartitionOffsetMap) {
+        List<ConsumerDTO> dtoList = new ArrayList<>();

-        //将没有对应consumer的partition信息统一放到一个consumer中
-        ConsumerDTO consumerDTO = new ConsumerDTO();
-        consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
-        consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().name());
-        consumerDTO.setTopicPartitionMap(topicNamePartitionStateListMap);
-        return consumerDTO;
+        List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(
+                clusterDO.getId(),
+                consumerGroupDTO.getOffsetStoreLocation().getLocation(),
+                consumerGroupDTO.getConsumerGroup()
+        );
+        for (String topicName : topicNameList) {
+            TopicMetadata metadata = ClusterMetadataManager.getTopicMetaData(clusterDO.getId(), topicName);
+            if (metadata == null || metadata.getPartitionNum() <= 0) {
+                continue;
+            }
+            if (!allPartitionOffsetMap.containsKey(new TopicPartition(topicName, 0))) {
+                Map<TopicPartition, Long> offsetMap = topicService.getTopicPartitionOffset(clusterDO, topicName);
+                if (offsetMap == null) {
+                    offsetMap = new HashMap<>();
+                }
+                allPartitionOffsetMap.putAll(offsetMap);
+            }
+
+            Map<Integer, Long> consumerOffsetMap = null;
+            if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
+                consumerOffsetMap = getTopicConsumerOffsetInZK(clusterDO, metadata, consumerGroupDTO);
+            } else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
+                consumerOffsetMap = getTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO);
+            }
+
+            Map<Integer, Long> partitionOffsetMap = new HashMap<>();
+            for (int partitionId = 0; partitionId < metadata.getPartitionNum(); ++partitionId) {
+                Long offset = allPartitionOffsetMap.get(new TopicPartition(topicName, partitionId));
+                if (offset == null) {
+                    continue;
+                }
+                partitionOffsetMap.put(partitionId, offset);
+            }
+
+            ConsumerDTO consumerDTO = new ConsumerDTO();
+            consumerDTO.setClusterId(clusterDO.getId());
+            consumerDTO.setTopicName(topicName);
+            consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
+            consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().getLocation());
+            consumerDTO.setPartitionOffsetMap(partitionOffsetMap);
+            consumerDTO.setConsumerOffsetMap(consumerOffsetMap);
+            dtoList.add(consumerDTO);
+        }
+        return dtoList;
     }

     @Override
@@ -264,52 +298,15 @@ public class ConsumerServiceImpl implements ConsumerService {
         kafkaConsumer.commitSync();
     }

-    /**
-     * 获取属于该集群和consumerGroup下的所有topic的信息
-     */
-    private Map<String, List<PartitionState>> getConsumerGroupPartitionStateList(ClusterDO clusterDO,
-                                                                                 ConsumerGroupDTO consumerGroupDTO,
-                                                                                 Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
-        Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>(2);
+    private Map<Integer, Long> getTopicConsumerOffsetInZK(ClusterDO clusterDO,
+                                                          TopicMetadata topicMetadata,
+                                                          ConsumerGroupDTO consumerGroupDTO) {
+        Map<Integer, Long> offsetMap = new HashMap<>();

-        List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(clusterDO.getId(),consumerGroupDTO.getOffsetStoreLocation().getLocation(), consumerGroupDTO.getConsumerGroup());
-        for (String topicName : topicNameList) {
-            if (!ClusterMetadataManager.isTopicExist(clusterDO.getId(), topicName)) {
-                continue;
-            }
-
-            List<PartitionState> partitionStateList = globalTopicNamePartitionStateListMap.get(topicName);
-            if (partitionStateList == null) {
-                try {
-                    partitionStateList = zkService.getTopicPartitionState(clusterDO.getId(), topicName);
-                } catch (Exception e) {
-                    logger.error("get topic partition state failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
-                }
-                if (partitionStateList == null) {
-                    continue;
-                }
-                globalTopicNamePartitionStateListMap.put(topicName, partitionStateList);
-            }
-            List<PartitionState> consumerGroupPartitionStateList = new ArrayList<>();
-            for (PartitionState partitionState: partitionStateList) {
-                consumerGroupPartitionStateList.add((PartitionState) partitionState.clone());
-            }
-
-            if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
-                updateTopicConsumerOffsetInZK(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
-            } else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
-                updateTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
-            }
-            topicNamePartitionStateListMap.put(topicName, consumerGroupPartitionStateList);
-        }
-        return topicNamePartitionStateListMap;
-    }
-
-    private void updateTopicConsumerOffsetInZK(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
-        ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(cluster.getId());
-        for (PartitionState partitionState : partitionStateList) {
+        ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(clusterDO.getId());
+        for (int partitionId = 0; partitionId < topicMetadata.getPartitionNum(); ++partitionId) {
             //offset存储于zk中
-            String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId());
+            String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicMetadata.getTopic(), partitionId);
             String offset = null;
             try {
                 Stat stat = zkConfig.getNodeStat(consumerGroupOffsetLocation);
@@ -317,39 +314,32 @@ public class ConsumerServiceImpl implements ConsumerService {
                 if (stat == null) {
                     continue;
                 }
                 offset = zkConfig.get(consumerGroupOffsetLocation);
+                offsetMap.put(partitionId, Long.valueOf(offset));
             } catch (ConfigException e) {
                 e.printStackTrace();
             }
-
-            String consumerId = null;
-            try {
-                consumerId = zkConfig.get(ZkPathUtil.getConsumerGroupOwnersTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId()));
-            } catch (ConfigException e) {
-//                logger.error("get consumerId error in updateTopicConsumerOffsetInZK cluster:{} topic:{} consumerGroup:{}", cluster.getClusterName(), topicName, consumerGroupDTO.getConsumerGroup());
-            }
-            partitionState.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
-            updatePartitionStateOffset(partitionState, offset, consumerId);
         }
+        return offsetMap;
     }

-    private void updateTopicConsumerOffsetInBroker(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
-        Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(cluster, consumerGroupDTO.getConsumerGroup(), topicName);
+    private Map<Integer, Long> getTopicConsumerOffsetInBroker(ClusterDO clusterDO,
+                                                              String topicName,
+                                                              ConsumerGroupDTO consumerGroupDTO) {
+        Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroupDTO.getConsumerGroup(), topicName);
         if (offsetsFromBroker == null || offsetsFromBroker.isEmpty()) {
-            return;
+            return new HashMap<>(0);
         }
-        for (PartitionState partitionState : partitionStateList) {
-            int partitionId = partitionState.getPartitionId();
-            updatePartitionStateOffset(partitionState, offsetsFromBroker.get(partitionId), null);
+        Map<Integer, Long> offsetMap = new HashMap<>(offsetsFromBroker.size());
+        for (Map.Entry<Integer, String> entry: offsetsFromBroker.entrySet()) {
+            try {
+                offsetMap.put(entry.getKey(), Long.valueOf(entry.getValue()));
+            } catch (Exception e) {
+                logger.error("get topic consumer offset failed, clusterId:{} topicName:{} consumerGroup:{}."
+                        , clusterDO.getId(), topicName, consumerGroupDTO.getConsumerGroup());
+            }
         }
-    }
-
-    private void updatePartitionStateOffset(PartitionState partitionState, String offset, String consumerId) {
-        partitionState.setConsumeOffset(0);
-        if (!StringUtils.isEmpty(offset)) {
-            partitionState.setConsumeOffset(Long.parseLong(offset));
-        }
-        partitionState.setConsumerGroup(consumerId);
+        return offsetMap;
     }

     private Map<Integer, String> getConsumeIdMap(Long clusterId, String topicName, String consumerGroup) {
diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
index 282e8562..0bbc6f55 100644
--- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
+++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
@@ -343,9 +343,9 @@ public class TopicServiceImpl implements TopicService {
             topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec());
             topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec());
         } else {
-            topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true);
-            topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec());
-            topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec());
+//            topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true);
+            topicOverviewDTO.setBytesInPerSec(0.0);
+            topicOverviewDTO.setProduceRequestPerSec(0.0);
         }
         return topicOverviewDTO;
     }