From f3c4133cd258c28a1470e87f99ea5c00181ffd07 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 7 Dec 2022 16:09:54 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E5=88=86=E6=89=B9=E4=BB=8EES=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2Topic=E6=9C=80=E8=BF=91=E4=B8=80=E6=9D=A1=E6=8C=87?= =?UTF-8?q?=E6=A0=87(#817)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/common/constant/ESConstant.java | 5 ++++ .../km/core/flusher/DatabaseDataFlusher.java | 29 +++++++++++++------ .../service/topic/TopicMetricService.java | 5 +--- .../topic/impl/TopicMetricServiceImpl.java | 13 +++++++-- .../km/persistence/es/ESOpClient.java | 12 +++++--- 5 files changed, 45 insertions(+), 19 deletions(-) diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java index 1b8a7740..85bf2084 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java @@ -36,6 +36,11 @@ public class ESConstant { public static final Integer DEFAULT_RETRY_TIME = 3; + /** + * 获取Topic-Latest指标时,单次允许的Topic数 + */ + public static final int SEARCH_LATEST_TOPIC_METRIC_CNT_PER_REQUEST = 500; + private ESConstant() { } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java index 70d138be..8f71a5ea 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java @@ -130,20 +130,31 @@ public class DatabaseDataFlusher { private void flushTopicLatestMetricsCache() { for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) { FutureUtil.quickStartupFutureUtil.submitTask(() -> { - try { + List topicNameList = topicService.listTopicsFromCacheFirst(clusterPhy.getId()).stream().map(Topic::getTopicName).collect(Collectors.toList()); - List topicNameList = topicService.listTopicsFromCacheFirst(clusterPhy.getId()).stream().map(Topic::getTopicName).collect(Collectors.toList()); + for (int i = 0; i < 3; ++i) { + try { + List metricsList = topicMetricService.listTopicLatestMetricsFromES( + clusterPhy.getId(), + topicNameList, + Collections.emptyList() + ); - List metricsList = topicMetricService.listTopicLatestMetricsFromES(clusterPhy.getId(), topicNameList, Collections.emptyList()); + if (!topicNameList.isEmpty() && metricsList.isEmpty()) { + // 没有指标时,重试 + continue; + } - Map metricsMap = metricsList - .stream() - .collect(Collectors.toMap(TopicMetrics::getTopic, Function.identity())); + Map metricsMap = metricsList + .stream() + .collect(Collectors.toMap(TopicMetrics::getTopic, Function.identity())); - DataBaseDataLocalCache.putTopicMetrics(clusterPhy.getId(), metricsMap); + DataBaseDataLocalCache.putTopicMetrics(clusterPhy.getId(), metricsMap); - } catch (Exception e) { - LOGGER.error("method=flushTopicLatestMetricsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + break; + } catch (Exception e) { + LOGGER.error("method=flushTopicLatestMetricsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + } } }); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java index 014b460a..f9318bcd 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java @@ -37,12 +37,9 @@ public interface TopicMetricService { /** * 获取Topic维度最新的一条指标 - * @param clusterPhyId - * @param topicNames - * @param metricNameList - * @return */ List listTopicLatestMetricsFromES(Long clusterPhyId, List topicNames, List metricNameList); + /** * 获取Topic维度最新的一条指标 * @param clusterPhyId diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java index fe680901..731bc548 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java @@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInf import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.constant.ESConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; @@ -152,9 +153,17 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe @Override public List listTopicLatestMetricsFromES(Long clusterPhyId, List topicNames, List metricNames) { - List topicMetricPOs = topicMetricESDAO.listTopicLatestMetric(clusterPhyId, topicNames, metricNames); + List poList = new ArrayList<>(); - return ConvertUtil.list2List(topicMetricPOs, TopicMetrics.class); + for (int i = 0; i < topicNames.size(); i += ESConstant.SEARCH_LATEST_TOPIC_METRIC_CNT_PER_REQUEST) { + poList.addAll(topicMetricESDAO.listTopicLatestMetric( + clusterPhyId, + topicNames.subList(i, Math.min(i + ESConstant.SEARCH_LATEST_TOPIC_METRIC_CNT_PER_REQUEST, topicNames.size())), + Collections.emptyList()) + ); + } + + return ConvertUtil.list2List(poList, TopicMetrics.class); } @Override diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index 822c44e1..5764dfd6 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -150,8 +150,7 @@ public class ESOpClient { } public List performRequest(String indexName, String queryDsl, Class clzz) { - ESQueryResponse esQueryResponse = doQuery( - new ESQueryRequest().indices(indexName).source(queryDsl).clazz(clzz)); + ESQueryResponse esQueryResponse = this.doQuery(new ESQueryRequest().indices(indexName).source(queryDsl).clazz(clzz)); if (esQueryResponse == null) { return new ArrayList<>(); } @@ -447,8 +446,13 @@ public class ESOpClient { return response; } catch (Exception e) { - LOGGER.error( "method=doQuery||indexName={}||queryDsl={}||errMsg=query error. ", - request.indices(), bytesReferenceConvertDsl(request.source()), e); + LOGGER.error( + "method=doQuery||indexName={}||queryDsl={}||errMsg=query error. ", + request.indices(), + bytesReferenceConvertDsl(request.source()), + e + ); + return null; } }