From e06712397e8ba1f984233f4317babd70f2905f03 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 20 Sep 2022 10:27:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9B=A0DB=E4=B8=ADBroker?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E4=B8=8D=E5=AD=98=E5=9C=A8=E5=AF=BC=E8=87=B4?= =?UTF-8?q?TotalLogSize=E6=8C=87=E6=A0=87=E8=8E=B7=E5=8F=96=E6=97=B6?= =?UTF-8?q?=E6=8A=9B=E7=A9=BA=E6=8C=87=E9=92=88=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cluster/impl/ClusterMetricServiceImpl.java | 15 ++++++++------- .../km/persistence/kafka/KafkaJMXClient.java | 4 ++++ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java index 075c53c2..9fdd9ec0 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java @@ -751,8 +751,8 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust private Result getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){ List topics = topicService.listTopicsFromCacheFirst(clusterId); - float metricsSum = 0f; - for(Topic topic : topics){ + float sumMetricValue = 0f; + for(Topic topic : topics) { Result> ret = topicMetricService.collectTopicMetricsFromKafkaWithCacheFirst( clusterId, topic.getTopicName(), @@ -763,14 +763,15 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust continue; } - List topicMetrics = ret.getData(); - for (TopicMetrics metrics : topicMetrics) { - if(metrics.isBBrokerAgg()){ - metricsSum += Double.valueOf(metrics.getMetrics().get(topicMetric)); + for (TopicMetrics metrics : ret.getData()) { + if(metrics.isBBrokerAgg()) { + Float metricValue = metrics.getMetric(topicMetric); + sumMetricValue += (metricValue == null? 0f: metricValue); + break; } } } - return Result.buildSuc(initWithMetrics(clusterId, metric, metricsSum)); + return Result.buildSuc(initWithMetrics(clusterId, metric, sumMetricValue)); } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java index 68d1011e..39ae1ebe 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java @@ -191,6 +191,10 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE); BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper); + if (brokerPO == null) { + return null; + } + return Broker.buildFrom(brokerPO); } }