From 6e058240b334f66aa5b5af995fc5fd76e3de3f84 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 31 Aug 2022 17:15:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=87=E6=A0=87=E9=87=87=E9=9B=86=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=97=B6=E9=97=B4=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../enums/version/VersionItemTypeEnum.java | 2 + .../cache/CollectedMetricsLocalCache.java | 53 ++++++++++--------- .../broker/impl/BrokerMetricServiceImpl.java | 7 +-- .../impl/ClusterMetricServiceImpl.java | 7 +-- .../impl/PartitionMetricServiceImpl.java | 11 ++-- .../impl/ReplicaMetricServiceImpl.java | 17 +++--- .../topic/impl/TopicMetricServiceImpl.java | 11 ++-- .../metrics/ClusterMetricVersionItems.java | 1 + 8 files changed, 53 insertions(+), 56 deletions(-) diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java index 270999b4..15f13175 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java @@ -31,9 +31,11 @@ public enum VersionItemTypeEnum { SERVICE_OP_PARTITION(320, "service_partition_operation"), + SERVICE_OP_PARTITION_LEADER(321, "service_partition-leader_operation"), SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"), + /** * 前端操作 */ diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java index c0469fb6..bc5b1c34 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java @@ -10,13 +10,13 @@ import java.util.concurrent.TimeUnit; public class CollectedMetricsLocalCache { private static final Cache brokerMetricsCache = Caffeine.newBuilder() - .expireAfterWrite(60, TimeUnit.SECONDS) - .maximumSize(2000) + .expireAfterWrite(90, TimeUnit.SECONDS) + .maximumSize(10000) .build(); private static final Cache> topicMetricsCache = Caffeine.newBuilder() .expireAfterWrite(90, TimeUnit.SECONDS) - .maximumSize(5000) + .maximumSize(10000) .build(); private static final Cache> partitionMetricsCache = Caffeine.newBuilder() @@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache { .maximumSize(20000) .build(); - public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) { - return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName)); + public static Float getBrokerMetrics(String brokerMetricKey) { + return brokerMetricsCache.getIfPresent(brokerMetricKey); } - public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) { + public static void putBrokerMetrics(String brokerMetricKey, Float value) { if (value == null) { return; } - brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value); + + brokerMetricsCache.put(brokerMetricKey, value); } - public static List getTopicMetrics(Long clusterPhyId, String topicName, String metricName) { - return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName)); + public static List getTopicMetrics(String topicMetricKey) { + return topicMetricsCache.getIfPresent(topicMetricKey); } - public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List metricsList) { + public static void putTopicMetrics(String topicMetricKey, List metricsList) { if (metricsList == null) { return; } - topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList); + + topicMetricsCache.put(topicMetricKey, metricsList); } - public static List getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) { - return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName)); + public static List getPartitionMetricsList(String partitionMetricKey) { + return partitionMetricsCache.getIfPresent(partitionMetricKey); } - public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List metricsList) { + public static void putPartitionMetricsList(String partitionMetricsKey, List metricsList) { if (metricsList == null) { return; } - partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList); + partitionMetricsCache.put(partitionMetricsKey, metricsList); } - public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) { - return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName)); + public static Float getReplicaMetrics(String replicaMetricsKey) { + return replicaMetricsValueCache.getIfPresent(replicaMetricsKey); } - public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) { + public static void putReplicaMetrics(String replicaMetricsKey, Float value) { if (value == null) { return; } - replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value); + replicaMetricsValueCache.put(replicaMetricsKey, value); } - - /**************************************************** private method ****************************************************/ - - - private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) { + public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) { return clusterPhyId + "@" + brokerId + "@" + metricName; } - private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) { + public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) { return clusterPhyId + "@" + topicName + "@" + metricName; } - private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) { + public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) { return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName; } + + /**************************************************** private method ****************************************************/ + } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java index 6c7dec0e..93c343ff 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java @@ -110,9 +110,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker } @Override - public Result collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){ + public Result collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) { + String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric); - Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric); + Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey); if(null != keyValue) { BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId); brokerMetrics.putMetric(metric, keyValue); @@ -124,7 +125,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker Map metricsMap = ret.getData().getMetrics(); for(Map.Entry metricNameAndValueEntry : metricsMap.entrySet()){ - CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue()); + CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue()); } return ret; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java index fee8fb0e..075c53c2 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java @@ -126,7 +126,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust private TopicMetricService topicMetricService; @Autowired - private TopicService topicService; + private TopicService topicService; @Autowired private PartitionService partitionService; @@ -728,13 +728,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust Long clusterId = param.getClusterId(); //1、获取jmx的属性信息 - VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric); - if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);} - List brokers = brokerService.listAliveBrokersFromDB(clusterId); float metricVale = 0f; - for(Broker broker : brokers){ + for(Broker broker : brokers) { Result ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric); if(null == ret || ret.failed() || null == ret.getData()){continue;} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java index 9e354634..9104b398 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java @@ -75,7 +75,9 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par @Override public Result> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) { - List metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName); + String partitionMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName); + + List metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(partitionMetricsKey); if(null != metricsList) { return Result.buildSuc(metricsList); } @@ -88,12 +90,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par // 更新cache PartitionMetrics metrics = metricsResult.getData().get(0); metrics.getMetrics().entrySet().forEach( - metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList( - clusterPhyId, - metrics.getTopic(), - metricEntry.getKey(), - metricsResult.getData() - ) + metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(partitionMetricsKey, metricsResult.getData()) ); return metricsResult; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java index 5240e8b9..460e6520 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java @@ -77,9 +77,14 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli } @Override - public Result collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic, - Integer brokerId, Integer partitionId, String metric){ - Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric); + public Result collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, + String topic, + Integer brokerId, + Integer partitionId, + String metric) { + String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric); + + Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey); if(null != keyValue){ ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId); replicationMetrics.putMetric(metric, keyValue); @@ -92,11 +97,7 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli // 更新cache ret.getData().getMetrics().entrySet().stream().forEach( metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics( - clusterPhyId, - brokerId, - topic, - partitionId, - metricNameAndValueEntry.getKey(), + replicaMetricsKey, metricNameAndValueEntry.getValue() ) ); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java index d7cca017..478c142b 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java @@ -120,7 +120,9 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe @Override public Result> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) { - List metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName); + String topicMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName); + + List metricsList = CollectedMetricsLocalCache.getTopicMetrics(topicMetricsKey); if(null != metricsList) { return Result.buildSuc(metricsList); } @@ -133,12 +135,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe // 更新cache TopicMetrics metrics = metricsResult.getData().get(0); metrics.getMetrics().entrySet().forEach( - metricEntry -> CollectedMetricsLocalCache.putTopicMetrics( - clusterPhyId, - metrics.getTopic(), - metricEntry.getKey(), - metricsResult.getData() - ) + metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(topicMetricsKey, metricsResult.getData()) ); return metricsResult; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java index d4c58d69..d3357ab4 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java @@ -35,6 +35,7 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric { public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster"; public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster"; + public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize"; public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize"; public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";