From 346aee8fe7f4963d99df72703eb4650013176bb1 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 14:14:41 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8DTopic=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E5=A4=A7=E7=9B=98=E8=8E=B7=E5=8F=96TopN=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E5=AD=98=E5=9C=A8=E9=94=99=E8=AF=AF=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98(#896)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、将ES排序调整为基于本地cache的排序; 2、将database的本地cache从core模块移动到persistence模块; --- .../km/core/flusher/DatabaseDataFlusher.java | 2 +- .../impl/ClusterMetricServiceImpl.java | 2 +- .../impl/HealthCheckResultServiceImpl.java | 2 +- .../partition/impl/PartitionServiceImpl.java | 2 +- .../topic/impl/TopicMetricServiceImpl.java | 2 +- .../cache/DataBaseDataLocalCache.java | 61 +++++++++++++------ .../persistence/es/dao/TopicMetricESDAO.java | 42 ++++++++----- 7 files changed, 76 insertions(+), 37 deletions(-) rename {km-core/src/main/java/com/xiaojukeji/know/streaming/km/core => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence}/cache/DataBaseDataLocalCache.java (59%) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java index 8f71a5ea..b91e27c1 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java @@ -10,7 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; -import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; +import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java index 9d2a7f70..d8c1387f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java @@ -33,7 +33,7 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; import com.xiaojukeji.know.streaming.km.common.utils.*; -import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; +import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java index 4f0640c2..0d15561f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java @@ -13,7 +13,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; -import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; +import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.config.PlatformClusterConfigService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java index f4688729..1c5ad2e5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java @@ -22,7 +22,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import com.xiaojukeji.know.streaming.km.common.utils.Triple; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; -import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; +import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap; import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java index d4a12b41..bc275821 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java @@ -28,7 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache; -import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; +import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/cache/DataBaseDataLocalCache.java similarity index 59% rename from km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/cache/DataBaseDataLocalCache.java index 5412988c..5c6a1fbf 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/cache/DataBaseDataLocalCache.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.core.cache; +package com.xiaojukeji.know.streaming.km.persistence.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -7,31 +7,58 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +@Component public class DataBaseDataLocalCache { - private static final Cache> topicLatestMetricsCache = Caffeine.newBuilder() - .expireAfterWrite(360, TimeUnit.SECONDS) - .maximumSize(500) - .build(); + @Value(value = "${cache.metric.topic-size:2000}") + private Long topicLatestMetricsCacheSize; - private static final Cache clusterLatestMetricsCache = Caffeine.newBuilder() - .expireAfterWrite(180, TimeUnit.SECONDS) - .maximumSize(500) - .build(); + @Value(value = "${cache.metric.cluster-size:2000}") + private Long clusterLatestMetricsCacheSize; - private static final Cache>> partitionsCache = Caffeine.newBuilder() - .expireAfterWrite(60, TimeUnit.SECONDS) - .maximumSize(500) - .build(); + @Value(value = "${cache.metadata.partition-size:2000}") + private Long partitionsCacheSize; - private static final Cache>> healthCheckResultCache = Caffeine.newBuilder() - .expireAfterWrite(90, TimeUnit.SECONDS) - .maximumSize(1000) - .build(); + @Value(value = "${cache.metadata.health-check-result-size:10000}") + private Long healthCheckResultCacheSize; + + private static Cache> topicLatestMetricsCache; + + private static Cache clusterLatestMetricsCache; + + private static Cache>> partitionsCache; + + private static Cache>> healthCheckResultCache; + + @PostConstruct + private void init() { + topicLatestMetricsCache = Caffeine.newBuilder() + .expireAfterWrite(360, TimeUnit.SECONDS) + .maximumSize(topicLatestMetricsCacheSize) + .build(); + + clusterLatestMetricsCache = Caffeine.newBuilder() + .expireAfterWrite(180, TimeUnit.SECONDS) + .maximumSize(clusterLatestMetricsCacheSize) + .build(); + + partitionsCache = Caffeine.newBuilder() + .expireAfterWrite(60, TimeUnit.SECONDS) + .maximumSize(partitionsCacheSize) + .build(); + + healthCheckResultCache = Caffeine.newBuilder() + .expireAfterWrite(90, TimeUnit.SECONDS) + .maximumSize(healthCheckResultCacheSize) + .build(); + } public static Map getTopicMetrics(Long clusterPhyId) { return topicLatestMetricsCache.getIfPresent(clusterPhyId); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java index 88c0a82f..5c35f221 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java @@ -4,14 +4,19 @@ import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResp import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchFuzzy; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchShould; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; +import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -308,25 +313,32 @@ public class TopicMetricESDAO extends BaseMetricESDAO { return table; } - //public for test - public Map> getTopNTopics(Long clusterPhyId, List metrics, - String aggType, int topN, - Long startTime, Long endTime){ - //1、获取需要查下的索引 - String realIndex = realIndex(startTime, endTime); + public Map> getTopNTopics(Long clusterPhyId, + List metricNameList, + String aggType, + int topN, + Long startTime, + Long endTime) { + Map metricsMap = DataBaseDataLocalCache.getTopicMetrics(clusterPhyId); + if (metricsMap == null) { + return new HashMap<>(); + } - //2、根据查询的时间区间大小来确定指标点的聚合区间大小 - String interval = MetricsUtils.getInterval(endTime - startTime); + List metricsList = new ArrayList<>(metricsMap.values()); - //3、构造agg查询条件 - String aggDsl = buildAggsDSL(metrics, aggType); + Map> resultMap = new HashMap<>(); + for (String metricName: metricNameList) { + metricsList = PaginationMetricsUtil.sortMetrics( + metricsList.stream().map(elem -> (BaseMetrics)elem).collect(Collectors.toList()), + metricName, + "topic", + SortTypeEnum.DESC.getSortType() + ).stream().map(elem -> (TopicMetrics)elem).collect(Collectors.toList()); - //4、查询es - String dsl = dslLoaderUtil.getFormatDslByFileName( - DslConstant.GET_TOPIC_AGG_TOP_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); + resultMap.put(metricName, metricsList.subList(0, Math.min(topN, metricsList.size())).stream().map(elem -> elem.getTopic()).collect(Collectors.toList())); + } - return esOpClient.performRequest(realIndex, dsl, - s -> handleTopTopicESQueryResponse(s, metrics, topN), 3); + return resultMap; } /**************************************************** private method ****************************************************/