diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java index 075c53c2..9fdd9ec0 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java @@ -751,8 +751,8 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust private Result getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){ List topics = topicService.listTopicsFromCacheFirst(clusterId); - float metricsSum = 0f; - for(Topic topic : topics){ + float sumMetricValue = 0f; + for(Topic topic : topics) { Result> ret = topicMetricService.collectTopicMetricsFromKafkaWithCacheFirst( clusterId, topic.getTopicName(), @@ -763,14 +763,15 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust continue; } - List topicMetrics = ret.getData(); - for (TopicMetrics metrics : topicMetrics) { - if(metrics.isBBrokerAgg()){ - metricsSum += Double.valueOf(metrics.getMetrics().get(topicMetric)); + for (TopicMetrics metrics : ret.getData()) { + if(metrics.isBBrokerAgg()) { + Float metricValue = metrics.getMetric(topicMetric); + sumMetricValue += (metricValue == null? 0f: metricValue); + break; } } } - return Result.buildSuc(initWithMetrics(clusterId, metric, metricsSum)); + return Result.buildSuc(initWithMetrics(clusterId, metric, sumMetricValue)); } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java index 68d1011e..39ae1ebe 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java @@ -191,6 +191,10 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE); BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper); + if (brokerPO == null) { + return null; + } + return Broker.buildFrom(brokerPO); } }