mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Bugfix]分批从ES查询Topic最近一条指标(#817)
This commit is contained in:
@@ -36,6 +36,11 @@ public class ESConstant {
|
|||||||
|
|
||||||
public static final Integer DEFAULT_RETRY_TIME = 3;
|
public static final Integer DEFAULT_RETRY_TIME = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取Topic-Latest指标时,单次允许的Topic数
|
||||||
|
*/
|
||||||
|
public static final int SEARCH_LATEST_TOPIC_METRIC_CNT_PER_REQUEST = 500;
|
||||||
|
|
||||||
private ESConstant() {
|
private ESConstant() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -130,20 +130,31 @@ public class DatabaseDataFlusher {
|
|||||||
private void flushTopicLatestMetricsCache() {
|
private void flushTopicLatestMetricsCache() {
|
||||||
for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) {
|
for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) {
|
||||||
FutureUtil.quickStartupFutureUtil.submitTask(() -> {
|
FutureUtil.quickStartupFutureUtil.submitTask(() -> {
|
||||||
try {
|
List<String> topicNameList = topicService.listTopicsFromCacheFirst(clusterPhy.getId()).stream().map(Topic::getTopicName).collect(Collectors.toList());
|
||||||
|
|
||||||
List<String> topicNameList = topicService.listTopicsFromCacheFirst(clusterPhy.getId()).stream().map(Topic::getTopicName).collect(Collectors.toList());
|
for (int i = 0; i < 3; ++i) {
|
||||||
|
try {
|
||||||
|
List<TopicMetrics> metricsList = topicMetricService.listTopicLatestMetricsFromES(
|
||||||
|
clusterPhy.getId(),
|
||||||
|
topicNameList,
|
||||||
|
Collections.emptyList()
|
||||||
|
);
|
||||||
|
|
||||||
List<TopicMetrics> metricsList = topicMetricService.listTopicLatestMetricsFromES(clusterPhy.getId(), topicNameList, Collections.emptyList());
|
if (!topicNameList.isEmpty() && metricsList.isEmpty()) {
|
||||||
|
// 没有指标时,重试
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
Map<String, TopicMetrics> metricsMap = metricsList
|
Map<String, TopicMetrics> metricsMap = metricsList
|
||||||
.stream()
|
.stream()
|
||||||
.collect(Collectors.toMap(TopicMetrics::getTopic, Function.identity()));
|
.collect(Collectors.toMap(TopicMetrics::getTopic, Function.identity()));
|
||||||
|
|
||||||
DataBaseDataLocalCache.putTopicMetrics(clusterPhy.getId(), metricsMap);
|
DataBaseDataLocalCache.putTopicMetrics(clusterPhy.getId(), metricsMap);
|
||||||
|
|
||||||
} catch (Exception e) {
|
break;
|
||||||
LOGGER.error("method=flushTopicLatestMetricsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
|
} catch (Exception e) {
|
||||||
|
LOGGER.error("method=flushTopicLatestMetricsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,12 +37,9 @@ public interface TopicMetricService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取Topic维度最新的一条指标
|
* 获取Topic维度最新的一条指标
|
||||||
* @param clusterPhyId
|
|
||||||
* @param topicNames
|
|
||||||
* @param metricNameList
|
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
List<TopicMetrics> listTopicLatestMetricsFromES(Long clusterPhyId, List<String> topicNames, List<String> metricNameList);
|
List<TopicMetrics> listTopicLatestMetricsFromES(Long clusterPhyId, List<String> topicNames, List<String> metricNameList);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取Topic维度最新的一条指标
|
* 获取Topic维度最新的一条指标
|
||||||
* @param clusterPhyId
|
* @param clusterPhyId
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInf
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
|
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.constant.ESConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||||
@@ -152,9 +153,17 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<TopicMetrics> listTopicLatestMetricsFromES(Long clusterPhyId, List<String> topicNames, List<String> metricNames) {
|
public List<TopicMetrics> listTopicLatestMetricsFromES(Long clusterPhyId, List<String> topicNames, List<String> metricNames) {
|
||||||
List<TopicMetricPO> topicMetricPOs = topicMetricESDAO.listTopicLatestMetric(clusterPhyId, topicNames, metricNames);
|
List<TopicMetricPO> poList = new ArrayList<>();
|
||||||
|
|
||||||
return ConvertUtil.list2List(topicMetricPOs, TopicMetrics.class);
|
for (int i = 0; i < topicNames.size(); i += ESConstant.SEARCH_LATEST_TOPIC_METRIC_CNT_PER_REQUEST) {
|
||||||
|
poList.addAll(topicMetricESDAO.listTopicLatestMetric(
|
||||||
|
clusterPhyId,
|
||||||
|
topicNames.subList(i, Math.min(i + ESConstant.SEARCH_LATEST_TOPIC_METRIC_CNT_PER_REQUEST, topicNames.size())),
|
||||||
|
Collections.emptyList())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ConvertUtil.list2List(poList, TopicMetrics.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -150,8 +150,7 @@ public class ESOpClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public <T> List<T> performRequest(String indexName, String queryDsl, Class<T> clzz) {
|
public <T> List<T> performRequest(String indexName, String queryDsl, Class<T> clzz) {
|
||||||
ESQueryResponse esQueryResponse = doQuery(
|
ESQueryResponse esQueryResponse = this.doQuery(new ESQueryRequest().indices(indexName).source(queryDsl).clazz(clzz));
|
||||||
new ESQueryRequest().indices(indexName).source(queryDsl).clazz(clzz));
|
|
||||||
if (esQueryResponse == null) {
|
if (esQueryResponse == null) {
|
||||||
return new ArrayList<>();
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
@@ -447,8 +446,13 @@ public class ESOpClient {
|
|||||||
|
|
||||||
return response;
|
return response;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.error( "method=doQuery||indexName={}||queryDsl={}||errMsg=query error. ",
|
LOGGER.error(
|
||||||
request.indices(), bytesReferenceConvertDsl(request.source()), e);
|
"method=doQuery||indexName={}||queryDsl={}||errMsg=query error. ",
|
||||||
|
request.indices(),
|
||||||
|
bytesReferenceConvertDsl(request.source()),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user