From 4510c62ebd1ce9d2563782b109a02d5f10ddd4d7 Mon Sep 17 00:00:00 2001 From: _haoqi <1148648445@qq.com> Date: Thu, 20 Oct 2022 14:39:44 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#677]=20=E9=87=8D=E5=90=AF=E4=BC=9A?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E9=83=A8=E5=88=86=E4=BF=A1=E6=81=AF=E9=87=87?= =?UTF-8?q?=E9=9B=86=E6=8A=9B=E5=87=BA=E7=A9=BA=E6=8C=87=E9=92=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../persistence/es/dao/BrokerMetricESDAO.java | 17 +++--- .../es/dao/ClusterMetricESDAO.java | 10 ++-- .../persistence/es/dao/GroupMetricESDAO.java | 10 ++-- .../persistence/es/dao/TopicMetricESDAO.java | 5 +- .../es/ClusterMetricESDAOTest.java | 53 +++++++++++++++++-- .../es/ReplicationMetricESDAOTest.java | 23 +++++++- .../persistence/es/TopicMetricESDAOTest.java | 33 +++++++++--- 7 files changed, 123 insertions(+), 28 deletions(-) diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java index 83145381..7ee76a3e 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java @@ -207,11 +207,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { } for(String metric : metrics){ - String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString(); + Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE); + if(null == value){continue;} MetricPointVO metricPoint = new MetricPointVO(); metricPoint.setAggType(aggType); - metricPoint.setValue(value); + metricPoint.setValue(value.toString()); metricPoint.setName(metric); metricMap.put(metric, metricPoint); @@ -243,12 +244,13 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { try { if (null != esBucket.getUnusedMap().get(KEY)) { Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString(); + Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); + if(null == value){return;} MetricPointVO metricPoint = new MetricPointVO(); metricPoint.setAggType(aggType); metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value); + metricPoint.setValue(value.toString()); metricPoint.setName(metric); metricPoints.add(metricPoint); @@ -290,13 +292,14 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { try { if (null != esBucket.getUnusedMap().get(KEY)) { Long brokerId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - Double value = Double.valueOf(esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap() - .get(metric).getUnusedMap().get(VALUE).toString()); + Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap() + .get(metric).getUnusedMap().get(VALUE); + if(null == value){return;} List> brokerValue = (null == metricBrokerValueMap.get(metric)) ? new ArrayList<>() : metricBrokerValueMap.get(metric); - brokerValue.add(new Tuple<>(brokerId, value)); + brokerValue.add(new Tuple<>(brokerId, Double.valueOf(value.toString()))); metricBrokerValueMap.put(metric, brokerValue); } }catch (Exception e){ diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java index 82a86253..d53f83bf 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java @@ -169,11 +169,12 @@ public class ClusterMetricESDAO extends BaseMetricESDAO { } for(String metric : metrics){ - String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString(); + Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE); + if(null == value){continue;} MetricPointVO metricPoint = new MetricPointVO(); metricPoint.setAggType(aggType); - metricPoint.setValue(value); + metricPoint.setValue(value.toString()); metricPoint.setName(metric); metricMap.put(metric, metricPoint); @@ -194,12 +195,13 @@ public class ClusterMetricESDAO extends BaseMetricESDAO { try { if (null != esBucket.getUnusedMap().get(KEY)) { Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString(); + Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); + if(null == value){return;} MetricPointVO metricPoint = new MetricPointVO(); metricPoint.setAggType(aggType); metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value); + metricPoint.setValue(value.toString()); metricPoint.setName(metric); metricPoints.add(metricPoint); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java index cf65e6ef..782adc2f 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java @@ -173,8 +173,9 @@ public class GroupMetricESDAO extends BaseMetricESDAO { } for(String metric : metrics){ - String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString(); - groupMetricPO.getMetrics().put(metric, Float.valueOf(value)); + Object value = esAggrMap.get(metric).getUnusedMap().get(VALUE); + if(value == null){continue;} + groupMetricPO.getMetrics().put(metric, Float.parseFloat(value.toString())); } return groupMetricPO; @@ -192,12 +193,13 @@ public class GroupMetricESDAO extends BaseMetricESDAO { try { if (null != esBucket.getUnusedMap().get(KEY)) { Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString(); + Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); + if(value == null){return;} MetricPointVO metricPoint = new MetricPointVO(); metricPoint.setAggType(aggType); metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value); + metricPoint.setValue(value.toString()); metricPoint.setName(metric); metricPoints.add(metricPoint); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java index e9089c17..e70f2656 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java @@ -337,12 +337,13 @@ public class TopicMetricESDAO extends BaseMetricESDAO { try { if (null != esBucket.getUnusedMap().get(KEY)) { Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString(); + Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); + if(value == null){return;} MetricPointVO metricPoint = new MetricPointVO(); metricPoint.setAggType(aggType); metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value); + metricPoint.setValue(value.toString()); metricPoint.setName(metric); metricPoints.add(metricPoint); diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java index c69f7129..d0f96bff 100644 --- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java +++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java @@ -5,13 +5,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchPage; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchRange; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.persistence.es.dao.ClusterMetricESDAO; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; public class ClusterMetricESDAOTest extends KnowStreamApplicationTest { @@ -28,6 +28,53 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest { clusterMetricESDAO.listClusterMetricsByClusterIds(metrics, "avg", clusterIds, startTime, endTime); } + /** + * 测试 + * 获取集群 clusterPhyId 中每个 metric 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值 + */ + @Test + public void getClusterMetricsPointTest(){ + Long clusterId = 1L; + List metrics = Arrays.asList( + "Connections", "BytesIn_min_15", "PartitionURP", + "HealthScore_Topics", "EventQueueSize", "ActiveControllerCount", + "GroupDeads", "BytesIn_min_5", "HealthCheckTotal_Topics", + "Partitions", "BytesOut", "Groups", + "BytesOut_min_15", "TotalRequestQueueSize", "HealthCheckPassed_Groups", + "TotalProduceRequests", "HealthCheckPassed", "TotalLogSize", + "GroupEmptys", "PartitionNoLeader", "HealthScore_Brokers", + "Messages", "Topics", "PartitionMinISR_E", + "HealthCheckTotal", "Brokers", "Replicas", + "HealthCheckTotal_Groups", "GroupRebalances", "MessageIn", + "HealthScore", "HealthCheckPassed_Topics", "HealthCheckTotal_Brokers", + "PartitionMinISR_S", "BytesIn", "BytesOut_min_5", + "GroupActives", "MessagesIn", "GroupReBalances", + "HealthCheckPassed_Brokers", "HealthScore_Groups", "TotalResponseQueueSize", + "Zookeepers", "LeaderMessages", "HealthScore_Cluster", + "HealthCheckPassed_Cluster", "HealthCheckTotal_Cluster"); + Long endTime = System.currentTimeMillis(); + Long startTime = endTime - 4 * 60 * 60 * 1000; + + Map metricPointVOS = clusterMetricESDAO.getClusterMetricsPoint( + clusterId, metrics, "avg", startTime, endTime); + + assert null != metricPointVOS; + } + + /** + * 测试 + * 获取集群 clusterId 最新的统计指标 + */ + @Test + public void getClusterLatestMetricsTest(){ + Long clusterId = 1L; + List metrics = Collections.emptyList(); + + ClusterMetricPO clusterLatestMetrics = clusterMetricESDAO.getClusterLatestMetrics(clusterId, metrics); + + assert null != clusterLatestMetrics; + } + @Test public void pagingClusterWithLatestMetricsTest(){ List clusterIds = new ArrayList<>(); diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java index 6fe0ab4e..98224a3d 100644 --- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java +++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java @@ -2,11 +2,14 @@ package com.xiaojukeji.know.streaming.km.persistence.es; import com.xiaojukeji.know.streaming.km.KnowStreamApplicationTest; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.persistence.es.dao.ReplicationMetricESDAO; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest { @@ -15,7 +18,7 @@ public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest { @Test public void getReplicationLatestMetricsTest(){ - Long clusterPhyId = 2l; + Long clusterPhyId = 2L; Integer brokerId = 1; String topic = "know-streaming-test-251"; Integer partitionId = 1; @@ -24,4 +27,22 @@ public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest { assert null != replicationMetricPO; } + + /** + * 测试 + * 获取集群 clusterPhyId 中每个 metric 的指定 partitionId 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值 + */ + @Test + public void getReplicationMetricsPointTest(){ + Long clusterPhyId = 2L; + Integer brokerId = 1; + String topic = "know-streaming-test-251"; + Integer partitionId = 1; + Long endTime = System.currentTimeMillis(); + Long startTime = endTime - 4 * 60 * 60 * 1000; + Map metricPointVOMap = replicationMetricESDAO.getReplicationMetricsPoint( + clusterPhyId, topic, brokerId, partitionId, Collections.emptyList(), "avg", startTime, endTime); + + assert null != metricPointVOMap; + } } diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java index e3da2f2c..09db0971 100644 --- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java +++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java @@ -15,7 +15,6 @@ import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; public class TopicMetricESDAOTest extends KnowStreamApplicationTest { @@ -38,14 +37,20 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest { @Test public void getTopicsAggsMetricsValueTest(){ Long clusterId = 2L; - String topic = "know-streaming-test-251"; - String topic1 = "topic_test01"; - List metrics = Arrays.asList("BytesIn", "BytesIn_min_5"); + List topicList = Arrays.asList("know-streaming-test-251", "topic_test01"); + List metrics = Arrays.asList( + "Messages", "BytesIn_min_15", "BytesRejected", + "PartitionURP", "HealthCheckTotal", "ReplicationCount", + "CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5", + "HealthScore", "LogSize", "BytesOut", + "FailedProduceRequests", "BytesOut_min_15", "BytesIn", + "BytesOut_min_5", "MessagesIn", "TotalProduceRequests", + "HealthCheckPassed"); Long endTime = System.currentTimeMillis(); Long startTime = endTime - 4 * 60 * 60 * 1000; Table ret = topicMetricESDAO.getTopicsAggsMetricsValue( - clusterId, Arrays.asList(topic, topic1), metrics, "max", startTime, endTime); + clusterId, topicList, metrics, "max", startTime, endTime); assert null != ret; } @@ -90,7 +95,14 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest { String topic = "know-streaming-test-251"; String topic1 = "know-streaming-123"; String topic2 = "1209test"; - List metrics = Arrays.asList("BytesIn", "BytesIn_min_5"); + List metrics = Arrays.asList( + "Messages", "BytesIn_min_15", "BytesRejected", + "PartitionURP", "HealthCheckTotal", "ReplicationCount", + "CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5", + "HealthScore", "LogSize", "BytesOut", + "FailedProduceRequests", "BytesOut_min_15", "BytesIn", + "BytesOut_min_5", "MessagesIn", "TotalProduceRequests", + "HealthCheckPassed"); List topicMetricPO = topicMetricESDAO.listTopicLatestMetric(clusterId, Arrays.asList(topic,topic1,topic2), metrics); @@ -101,7 +113,14 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest { @Test public void listBrokerMetricsByTopicsTest(){ Long clusterId = 2L; - List metrics = Arrays.asList("BytesIn", "BytesIn_min_5"); + List metrics = Arrays.asList( + "Messages", "BytesIn_min_15", "BytesRejected", + "PartitionURP", "HealthCheckTotal", "ReplicationCount", + "CollectMetricsCostTimeUnitSec", "FailedFetchRequests", "BytesIn_min_5", + "HealthScore", "LogSize", "BytesOut", + "FailedProduceRequests", "BytesOut_min_15", "BytesIn", + "BytesOut_min_5", "MessagesIn", "TotalProduceRequests", + "HealthCheckPassed"); List topics = Arrays.asList("QAtest_1_13", "__consumer_offsets"); Long endTime = System.currentTimeMillis(); Long startTime = endTime - 4 * 60 * 60 * 1000;