diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java index 3eab40b8..07c92bc6 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java @@ -42,6 +42,13 @@ public interface ConsumerService { */ List getConsumerGroupConsumedTopicList(Long clusterId, String consumerGroup, String location); + /** + * 获取消费者offset + * @param clusterDO 集群 + * @param topicName topic + * @param consumerGroup 消费组 + * @return Map + */ Map getConsumerOffset(ClusterDO clusterDO, String topicName, ConsumerGroup consumerGroup); /** @@ -52,7 +59,20 @@ public interface ConsumerService { ConsumerGroup consumerGroup, List partitionOffsetDTOList); + /** + * 获取每个集群消费组的个数 + * @param clusterDOList 物理集群列表 + * @return Map + */ Map getConsumerGroupNumMap(List clusterDOList); + /** + * 验证消费组是否存在 + * @param offsetLocation offset存放位置 + * @param id 集群id + * @param topicName topic + * @param consumerGroup 消费组 + * @return true:存在,false:不存在 + */ boolean checkConsumerGroupExist(OffsetLocationEnum offsetLocation, Long id, String topicName, String consumerGroup); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java index 913316ef..e59aa2bc 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java @@ -159,7 +159,7 @@ public class ConsumerServiceImpl implements ConsumerService { if (topicMetadata == null) { logger.warn("class=ConsumerServiceImpl||method=getConsumeDetail||clusterId={}||topicName={}||msg=topicMetadata is null!", clusterDO.getId(), topicName); - return null; + return Collections.emptyList(); } List consumerGroupDetailDTOList = null; @@ -170,7 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService { } if (consumerGroupDetailDTOList == null) { logger.info("class=ConsumerServiceImpl||method=getConsumeDetail||msg=consumerGroupDetailDTOList is null!"); - return null; + return Collections.emptyList(); } Map topicPartitionLongMap = topicService.getPartitionOffset(clusterDO, topicName, OffsetPosEnum.END); @@ -317,9 +317,6 @@ public class ConsumerServiceImpl implements ConsumerService { String consumerGroup) { Map stringOffsetMap = getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroup, topicName); - if (ValidateUtils.isNull(stringOffsetMap)) { - return new HashMap<>(0); - } Map offsetMap = new HashMap<>(stringOffsetMap.size()); for (Map.Entry entry: stringOffsetMap.entrySet()) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java index 611dc203..1dc3b011 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java @@ -164,9 +164,11 @@ public class JmxServiceImpl implements JmxService { if (ValidateUtils.isNull(jmxConnectorWrap)|| !jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) { return null; } + + KafkaVersion kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId); + TopicMetrics metrics = new TopicMetrics(clusterId, topicName); for (MbeanV2 mbeanV2: mbeanV2List) { - KafkaVersion kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId); try { getAndSupplyAttributes2BaseMetrics( metrics, diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 70ef139c..94b3f88f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -416,9 +416,6 @@ public class TopicServiceImpl implements TopicService { topicDO, appDO ); - if (ValidateUtils.isNull(overview)) { - continue; - } dtoList.add(overview); }