mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Bugfix]修复Topic指标大盘获取TopN指标存在错误的问题(#896)
1、将ES排序调整为基于本地cache的排序; 2、将database的本地cache从core模块移动到persistence模块;
This commit is contained in:
@@ -10,7 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
|
||||||
import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
|
import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
|
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||||
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
|
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.*;
|
import com.xiaojukeji.know.streaming.km.common.utils.*;
|
||||||
import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
|
import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
|
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
|
import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.config.PlatformClusterConfigService;
|
import com.xiaojukeji.know.streaming.km.core.service.config.PlatformClusterConfigService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
|
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
|
import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache;
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap;
|
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap;
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState;
|
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
|
import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
|
||||||
import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
|
import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
|
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
|
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.xiaojukeji.know.streaming.km.core.cache;
|
package com.xiaojukeji.know.streaming.km.persistence.cache;
|
||||||
|
|
||||||
import com.github.benmanes.caffeine.cache.Cache;
|
import com.github.benmanes.caffeine.cache.Cache;
|
||||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
@@ -7,31 +7,58 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@Component
|
||||||
public class DataBaseDataLocalCache {
|
public class DataBaseDataLocalCache {
|
||||||
private static final Cache<Long, Map<String, TopicMetrics>> topicLatestMetricsCache = Caffeine.newBuilder()
|
@Value(value = "${cache.metric.topic-size:2000}")
|
||||||
.expireAfterWrite(360, TimeUnit.SECONDS)
|
private Long topicLatestMetricsCacheSize;
|
||||||
.maximumSize(500)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
private static final Cache<Long, ClusterMetrics> clusterLatestMetricsCache = Caffeine.newBuilder()
|
@Value(value = "${cache.metric.cluster-size:2000}")
|
||||||
.expireAfterWrite(180, TimeUnit.SECONDS)
|
private Long clusterLatestMetricsCacheSize;
|
||||||
.maximumSize(500)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
private static final Cache<Long, Map<String, List<Partition>>> partitionsCache = Caffeine.newBuilder()
|
@Value(value = "${cache.metadata.partition-size:2000}")
|
||||||
.expireAfterWrite(60, TimeUnit.SECONDS)
|
private Long partitionsCacheSize;
|
||||||
.maximumSize(500)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
private static final Cache<Long, Map<String, List<HealthCheckResultPO>>> healthCheckResultCache = Caffeine.newBuilder()
|
@Value(value = "${cache.metadata.health-check-result-size:10000}")
|
||||||
.expireAfterWrite(90, TimeUnit.SECONDS)
|
private Long healthCheckResultCacheSize;
|
||||||
.maximumSize(1000)
|
|
||||||
.build();
|
private static Cache<Long, Map<String, TopicMetrics>> topicLatestMetricsCache;
|
||||||
|
|
||||||
|
private static Cache<Long, ClusterMetrics> clusterLatestMetricsCache;
|
||||||
|
|
||||||
|
private static Cache<Long, Map<String, List<Partition>>> partitionsCache;
|
||||||
|
|
||||||
|
private static Cache<Long, Map<String, List<HealthCheckResultPO>>> healthCheckResultCache;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
private void init() {
|
||||||
|
topicLatestMetricsCache = Caffeine.newBuilder()
|
||||||
|
.expireAfterWrite(360, TimeUnit.SECONDS)
|
||||||
|
.maximumSize(topicLatestMetricsCacheSize)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
clusterLatestMetricsCache = Caffeine.newBuilder()
|
||||||
|
.expireAfterWrite(180, TimeUnit.SECONDS)
|
||||||
|
.maximumSize(clusterLatestMetricsCacheSize)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
partitionsCache = Caffeine.newBuilder()
|
||||||
|
.expireAfterWrite(60, TimeUnit.SECONDS)
|
||||||
|
.maximumSize(partitionsCacheSize)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
healthCheckResultCache = Caffeine.newBuilder()
|
||||||
|
.expireAfterWrite(90, TimeUnit.SECONDS)
|
||||||
|
.maximumSize(healthCheckResultCacheSize)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
public static Map<String, TopicMetrics> getTopicMetrics(Long clusterPhyId) {
|
public static Map<String, TopicMetrics> getTopicMetrics(Long clusterPhyId) {
|
||||||
return topicLatestMetricsCache.getIfPresent(clusterPhyId);
|
return topicLatestMetricsCache.getIfPresent(clusterPhyId);
|
||||||
@@ -4,14 +4,19 @@ import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResp
|
|||||||
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
|
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
|
||||||
import com.google.common.collect.HashBasedTable;
|
import com.google.common.collect.HashBasedTable;
|
||||||
import com.google.common.collect.Table;
|
import com.google.common.collect.Table;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchFuzzy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchFuzzy;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchShould;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchShould;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache;
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
|
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
@@ -308,25 +313,32 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
|
|||||||
return table;
|
return table;
|
||||||
}
|
}
|
||||||
|
|
||||||
//public for test
|
public Map<String, List<String>> getTopNTopics(Long clusterPhyId,
|
||||||
public Map<String, List<String>> getTopNTopics(Long clusterPhyId, List<String> metrics,
|
List<String> metricNameList,
|
||||||
String aggType, int topN,
|
String aggType,
|
||||||
Long startTime, Long endTime){
|
int topN,
|
||||||
//1、获取需要查下的索引
|
Long startTime,
|
||||||
String realIndex = realIndex(startTime, endTime);
|
Long endTime) {
|
||||||
|
Map<String, TopicMetrics> metricsMap = DataBaseDataLocalCache.getTopicMetrics(clusterPhyId);
|
||||||
|
if (metricsMap == null) {
|
||||||
|
return new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
|
List<TopicMetrics> metricsList = new ArrayList<>(metricsMap.values());
|
||||||
String interval = MetricsUtils.getInterval(endTime - startTime);
|
|
||||||
|
|
||||||
//3、构造agg查询条件
|
Map<String, List<String>> resultMap = new HashMap<>();
|
||||||
String aggDsl = buildAggsDSL(metrics, aggType);
|
for (String metricName: metricNameList) {
|
||||||
|
metricsList = PaginationMetricsUtil.sortMetrics(
|
||||||
|
metricsList.stream().map(elem -> (BaseMetrics)elem).collect(Collectors.toList()),
|
||||||
|
metricName,
|
||||||
|
"topic",
|
||||||
|
SortTypeEnum.DESC.getSortType()
|
||||||
|
).stream().map(elem -> (TopicMetrics)elem).collect(Collectors.toList());
|
||||||
|
|
||||||
//4、查询es
|
resultMap.put(metricName, metricsList.subList(0, Math.min(topN, metricsList.size())).stream().map(elem -> elem.getTopic()).collect(Collectors.toList()));
|
||||||
String dsl = dslLoaderUtil.getFormatDslByFileName(
|
}
|
||||||
DslConstant.GET_TOPIC_AGG_TOP_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
|
|
||||||
|
|
||||||
return esOpClient.performRequest(realIndex, dsl,
|
return resultMap;
|
||||||
s -> handleTopTopicESQueryResponse(s, metrics, topN), 3);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**************************************************** private method ****************************************************/
|
/**************************************************** private method ****************************************************/
|
||||||
|
|||||||
Reference in New Issue
Block a user