diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java index c647b222..72b5b31e 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java @@ -2,7 +2,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -11,7 +10,6 @@ import lombok.NoArgsConstructor; */ @Data @NoArgsConstructor -@AllArgsConstructor @ApiModel(description = "指标点") public class MetricPointVO implements Comparable { @ApiModelProperty(value = "指标名", example = "HealthScore") @@ -26,6 +24,13 @@ public class MetricPointVO implements Comparable { @ApiModelProperty(value = "指标值聚合方式:avg、max、min、sum") private String aggType; + public MetricPointVO(String name, Long timeStamp, String value, String aggType) { + this.name = name; + this.timeStamp = timeStamp; + this.value = value; + this.aggType = aggType; + } + @Override public int compareTo(MetricPointVO o) { if(null == o){return 0;} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseMetricService.java index 82f841e1..c6cc8275 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseMetricService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseMetricService.java @@ -38,40 +38,41 @@ public abstract class BaseMetricService extends BaseKafkaVersionControlService { protected abstract void initRegisterVCHandler(); - protected List metricMap2VO(Long clusterId, - Map>> map){ - List multiLinesVOS = new ArrayList<>(); - if (map == null || map.isEmpty()) { + protected List metricMap2VO(Long clusterId, Map>> metricsMap ){ + List lineVOList = new ArrayList<>(); + if (metricsMap == null || metricsMap.isEmpty()) { // 如果为空,则直接返回 - return multiLinesVOS; + return lineVOList; } - for(String metric : map.keySet()){ + for(Map.Entry>> entry : metricsMap.entrySet()){ try { MetricMultiLinesVO multiLinesVO = new MetricMultiLinesVO(); - multiLinesVO.setMetricName(metric); + multiLinesVO.setMetricName(entry.getKey()); - List metricLines = new ArrayList<>(); - - Map> metricPointMap = map.get(metric); - if(null == metricPointMap || metricPointMap.isEmpty()){continue;} - for(Map.Entry> entry : metricPointMap.entrySet()){ - MetricLineVO metricLineVO = new MetricLineVO(); - metricLineVO.setName(entry.getKey().toString()); - metricLineVO.setMetricName(metric); - metricLineVO.setMetricPoints(entry.getValue()); - - metricLines.add(metricLineVO); + if(null == entry.getValue() || entry.getValue().isEmpty()){ + continue; } + List metricLines = new ArrayList<>(); + entry.getValue().entrySet().forEach(resNameAndMetricsEntry -> { + MetricLineVO metricLineVO = new MetricLineVO(); + metricLineVO.setName(resNameAndMetricsEntry.getKey().toString()); + metricLineVO.setMetricName(entry.getKey()); + metricLineVO.setMetricPoints(resNameAndMetricsEntry.getValue()); + + metricLines.add(metricLineVO); + }); + multiLinesVO.setMetricLines(metricLines); - multiLinesVOS.add(multiLinesVO); - }catch (Exception e){ - LOGGER.error("method=metricMap2VO||cluster={}||msg=exception!", clusterId, e); + + lineVOList.add(multiLinesVO); + } catch (Exception e){ + LOGGER.error("method=metricMap2VO||clusterId={}||msg=exception!", clusterId, e); } } - return multiLinesVOS; + return lineVOList; } /** diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESTPService.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESTPService.java new file mode 100644 index 00000000..194f2458 --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESTPService.java @@ -0,0 +1,41 @@ +package com.xiaojukeji.know.streaming.km.persistence.es; + +import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; +import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +/** + * 处理ES请求的线程池 + */ +@Service +@NoArgsConstructor +public class ESTPService { + @Value("${thread-pool.es.search.thread-num:10}") + private Integer esSearchThreadCnt; + + @Value("${thread-pool.es.search.queue-size:5000}") + private Integer esSearchThreadQueueSize; + + private FutureWaitUtil searchESTP; + + @PostConstruct + private void init() { + searchESTP = FutureWaitUtil.init( + "SearchESTP", + esSearchThreadCnt, + esSearchThreadCnt, + esSearchThreadQueueSize + ); + } + + public void submitSearchTask(String taskName, Integer timeoutUnisMs, Runnable runnable) { + searchESTP.runnableTask(taskName, timeoutUnisMs, runnable); + } + + public void waitExecute() { + searchESTP.waitExecute(); + } +} diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java index 609b63d0..48651dd7 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java @@ -12,7 +12,9 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPoint import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO; +import com.xiaojukeji.know.streaming.km.persistence.es.ESTPService; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; import com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateLoaderUtil; import lombok.NoArgsConstructor; @@ -48,6 +50,9 @@ public class BaseMetricESDAO extends BaseESDAO { @Autowired private TemplateLoaderUtil templateLoaderUtil; + @Autowired + protected ESTPService esTPService; + /** * es 地址 */ @@ -364,21 +369,16 @@ public class BaseMetricESDAO extends BaseESDAO { sb.append(str, 1, str.length() - 1); } - protected Map checkBucketsAndHitsOfResponseAggs(ESQueryResponse response){ - if(null == response || null == response.getAggs()){ + protected Map checkBucketsAndHitsOfResponseAggs(ESQueryResponse response) { + if(null == response + || null == response.getAggs() + || null == response.getAggs().getEsAggrMap() + || null == response.getAggs().getEsAggrMap().get(HIST) + || ValidateUtils.isEmptyList(response.getAggs().getEsAggrMap().get(HIST).getBucketList())) { return null; } - Map esAggrMap = response.getAggs().getEsAggrMap(); - if (null == esAggrMap || null == esAggrMap.get(HIST)) { - return null; - } - - if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){ - return null; - } - - return esAggrMap; + return response.getAggs().getEsAggrMap(); } protected int handleESQueryResponseCount(ESQueryResponse response){ diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java index d3d52f9e..d6b47c73 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java @@ -6,12 +6,10 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import java.util.*; @@ -29,8 +27,6 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { register( this); } - protected FutureWaitUtil queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500); - /** * 获取集群 clusterId 中 brokerId 最新的统计指标 */ @@ -140,7 +136,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { aggDsl ); - queryFuture.runnableTask( + esTPService.submitSearchTask( String.format("class=BrokerMetricESDAO||method=listBrokerMetricsByBrokerIds||ClusterPhyId=%d", clusterPhyId), 5000, () -> { @@ -163,7 +159,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { } } - queryFuture.waitExecute(); + esTPService.waitExecute(); return table; } @@ -220,106 +216,86 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { return metricMap; } - private Map> handleListESQueryResponse(ESQueryResponse response, List metrics, String aggType){ + private Map> handleListESQueryResponse(ESQueryResponse response, List metricNameList, String aggType){ Map> metricMap = new HashMap<>(); - if(null == response || null == response.getAggs()){ + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); + if (esAggrMap == null) { return metricMap; } - Map esAggrMap = response.getAggs().getEsAggrMap(); - if (null == esAggrMap || null == esAggrMap.get(HIST)) { - return metricMap; - } - - if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){ - return metricMap; - } - - for(String metric : metrics){ + for(String metricName : metricNameList){ List metricPoints = new ArrayList<>(); - esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + esAggrMap.get(HIST).getBucketList().forEach(esBucket -> { try { - if (null != esBucket.getUnusedMap().get(KEY)) { - Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); - if(null == value){return;} - - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value.toString()); - metricPoint.setName(metric); - - metricPoints.add(metricPoint); - }else { - LOGGER.info(""); + if (null == esBucket.getUnusedMap().get(KEY)) { + return; } - }catch (Exception e){ - LOGGER.error("metric={}||errMsg=exception!", metric, e); + + Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); + Object value = esBucket.getAggrMap().get(metricName).getUnusedMap().get(VALUE); + if(null == value) { + return; + } + + metricPoints.add(new MetricPointVO(metricName, timestamp, value.toString(), aggType)); + } catch (Exception e){ + LOGGER.error("method=handleListESQueryResponse||metricName={}||errMsg=exception!", metricName, e); } } ); - metricMap.put(metric, optimizeMetricPoints(metricPoints)); + metricMap.put(metricName, optimizeMetricPoints(metricPoints)); } return metricMap; } - private Map> handleTopBrokerESQueryResponse(ESQueryResponse response, List metrics, int topN){ + private Map> handleTopBrokerESQueryResponse(ESQueryResponse response, List metricNameList, int topN) { Map> ret = new HashMap<>(); - if(null == response || null == response.getAggs()){ + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); + if (esAggrMap == null) { return ret; } - Map esAggrMap = response.getAggs().getEsAggrMap(); - if (null == esAggrMap || null == esAggrMap.get(HIST)) { - return ret; - } - - if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){ - return ret; - } - - Map>> metricBrokerValueMap = new HashMap<>(); + Map>> metricNameBrokerValueMap = new HashMap<>(); //1、先获取每个指标对应的所有brokerIds以及指标的值 - for(String metric : metrics) { - esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + for(String metricName : metricNameList) { + esAggrMap.get(HIST).getBucketList().forEach(esBucket -> { try { - if (null != esBucket.getUnusedMap().get(KEY)) { - Long brokerId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap() - .get(metric).getUnusedMap().get(VALUE); - if(null == value){return;} - - List> brokerValue = (null == metricBrokerValueMap.get(metric)) ? - new ArrayList<>() : metricBrokerValueMap.get(metric); - - brokerValue.add(new Tuple<>(brokerId, Double.valueOf(value.toString()))); - metricBrokerValueMap.put(metric, brokerValue); + if (null == esBucket.getUnusedMap().get(KEY)) { + return; } - }catch (Exception e){ - LOGGER.error("metrice={}||errMsg=exception!", metric, e); + + Long brokerId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); + Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap().get(metricName).getUnusedMap().get(VALUE); + if(null == value) { + return; + } + + metricNameBrokerValueMap.putIfAbsent(metricName, new ArrayList<>()); + metricNameBrokerValueMap.get(metricName).add(new Tuple<>(brokerId, Double.valueOf(value.toString()))); + } catch (Exception e) { + LOGGER.error("method=handleTopBrokerESQueryResponse||metric={}||errMsg=exception!", metricName, e); } - } ); + }); } //2、对每个指标的broker按照指标值排序,并截取前topN个brokerIds - for(String metric : metricBrokerValueMap.keySet()){ - List> brokerValue = metricBrokerValueMap.get(metric); + for(Map.Entry>> entry : metricNameBrokerValueMap.entrySet()){ + entry.getValue().sort((o1, o2) -> { + if(null == o1 || null == o2){ + return 0; + } - brokerValue.sort((o1, o2) -> { - if(null == o1 || null == o2){return 0;} return o2.getV2().compareTo(o1.getV2()); } ); - List> temp = (brokerValue.size() > topN) ? brokerValue.subList(0, topN) : brokerValue; - List brokerIds = temp.stream().map(t -> t.getV1()).collect( Collectors.toList()); - - ret.put(metric, brokerIds); + // 获取TopN的Broker + List brokerIdList = entry.getValue().subList(0, Math.min(topN, entry.getValue().size())).stream().map(elem -> elem.getV1()).collect(Collectors.toList()); + ret.put(entry.getKey(), brokerIdList); } return ret; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java index 5d53aa16..ea6d3852 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java @@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchRange; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; import org.springframework.stereotype.Component; @@ -35,8 +34,6 @@ public class ClusterMetricESDAO extends BaseMetricESDAO { register(this); } - protected FutureWaitUtil queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500); - /** * 获取集群 clusterId 最新的统计指标 */ @@ -127,30 +124,39 @@ public class ClusterMetricESDAO extends BaseMetricESDAO { //4、构造dsl查询条件,开始查询 for(Long clusterPhyId : clusterPhyIds){ try { - queryFuture.runnableTask( + esTPService.submitSearchTask( String.format("class=ClusterMetricESDAO||method=listClusterMetricsByClusterIds||ClusterPhyId=%d", clusterPhyId), 5000, () -> { String dsl = dslLoaderUtil.getFormatDslByFileName( - DslConstant.GET_CLUSTER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); + DslConstant.GET_CLUSTER_AGG_LIST_METRICS, + clusterPhyId, + startTime, + endTime, + interval, + aggDsl + ); Map> metricMap = esOpClient.performRequestWithRouting( - String.valueOf(clusterPhyId), realIndex, dsl, - s -> handleListESQueryResponse(s, metrics, aggType), 3); + String.valueOf(clusterPhyId), + realIndex, + dsl, + s -> handleListESQueryResponse(s, metrics, aggType), + DEFAULT_RETRY_TIME + ); synchronized (table){ - for(String metric : metricMap.keySet()){ - table.put(metric, clusterPhyId, metricMap.get(metric)); + for(Map.Entry> entry : metricMap.entrySet()){ + table.put(entry.getKey(), clusterPhyId, entry.getValue()); } } }); }catch (Exception e){ - LOGGER.error("method=listClusterMetricsByClusterIds||clusterPhyId={}||errMsg=exception!", - clusterPhyId, e); + LOGGER.error("method=listClusterMetricsByClusterIds||clusterPhyId={}||errMsg=exception!", clusterPhyId, e); } } - queryFuture.waitExecute(); + esTPService.waitExecute(); return table; } @@ -182,35 +188,33 @@ public class ClusterMetricESDAO extends BaseMetricESDAO { return metricMap; } - private Map> handleListESQueryResponse(ESQueryResponse response, List metrics, String aggType){ - Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); - if(null == esAggrMap){return new HashMap<>();} + private Map> handleListESQueryResponse(ESQueryResponse response, List metricNameList, String aggType){ + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); + if(null == esAggrMap) { + return new HashMap<>(); + } Map> metricMap = new HashMap<>(); - for(String metric : metrics){ + for(String metricName : metricNameList) { List metricPoints = new ArrayList<>(); esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { try { if (null != esBucket.getUnusedMap().get(KEY)) { Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); - if(null == value){return;} + Object value = esBucket.getAggrMap().get(metricName).getUnusedMap().get(VALUE); + if(null == value) { + return; + } - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value.toString()); - metricPoint.setName(metric); - - metricPoints.add(metricPoint); + metricPoints.add(new MetricPointVO(metricName, timestamp, value.toString(), aggType)); } - }catch (Exception e){ - LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e); + } catch (Exception e){ + LOGGER.error("method=handleListESQueryResponse||metricName={}||errMsg=exception!", metricName, e); } } ); - metricMap.put(metric, optimizeMetricPoints(metricPoints)); + metricMap.put(metricName, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java index 26ebdea1..0b3ab0e6 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java @@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionK import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.GroupMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; -import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; import org.springframework.stereotype.Component; @@ -35,8 +34,6 @@ public class GroupMetricESDAO extends BaseMetricESDAO { register(this); } - protected FutureWaitUtil queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500); - public List listLatestMetricsAggByGroupTopic(Long clusterPhyId, List groupTopicList, List metrics, AggTypeEnum aggType){ Long latestTime = getLatestMetricTime(); Long startTime = latestTime - FIVE_MIN; @@ -49,7 +46,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO { List groupMetricPOS = new CopyOnWriteArrayList<>(); for(GroupTopic groupTopic : groupTopicList){ - queryFuture.runnableTask( + esTPService.submitSearchTask( String.format("class=GroupMetricESDAO||method=listLatestMetricsAggByGroupTopic||ClusterPhyId=%d||groupName=%s||topicName=%s", clusterPhyId, groupTopic.getGroupName(), groupTopic.getTopicName()), 5000, @@ -73,7 +70,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO { }); } - queryFuture.waitExecute(); + esTPService.waitExecute(); return groupMetricPOS; } @@ -126,13 +123,26 @@ public class GroupMetricESDAO extends BaseMetricESDAO { Integer partition = tp.getPartition(); String dsl = dslLoaderUtil.getFormatDslByFileName( - DslConstant.LIST_GROUP_METRICS, clusterId, groupName, topic, partition, startTime, endTime, interval, aggDsl); + DslConstant.LIST_GROUP_METRICS, + clusterId, + groupName, + topic, + partition, + startTime, + endTime, + interval, + aggDsl + ); - Map> metricMap = esOpClient.performRequest(realIndex, dsl, - s -> handleGroupMetrics(s, aggType, metrics), 3); + Map> metricMap = esOpClient.performRequest( + realIndex, + dsl, + s -> handleGroupMetrics(s, aggType, metrics), + DEFAULT_RETRY_TIME + ); - for(String metric : metricMap.keySet()){ - table.put(metric, topic + "&" + partition, metricMap.get(metric)); + for(Map.Entry> entry: metricMap.entrySet()){ + table.put(entry.getKey(), topic + "&" + partition, entry.getValue()); } } @@ -188,23 +198,27 @@ public class GroupMetricESDAO extends BaseMetricESDAO { for(String metric : metrics){ List metricPoints = new ArrayList<>(); - esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + esAggrMap.get(HIST).getBucketList().forEach(esBucket -> { try { - if (null != esBucket.getUnusedMap().get(KEY)) { - Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); - if(value == null){return;} - - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value.toString()); - metricPoint.setName(metric); - - metricPoints.add(metricPoint); + if (null == esBucket.getUnusedMap().get(KEY)) { + return; } + + Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); + Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); + if(value == null) { + return; + } + + MetricPointVO metricPoint = new MetricPointVO(); + metricPoint.setAggType(aggType); + metricPoint.setTimeStamp(timestamp); + metricPoint.setValue(value.toString()); + metricPoint.setName(metric); + + metricPoints.add(metricPoint); }catch (Exception e){ - LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e); + LOGGER.error("method=handleGroupMetrics||metric={}||errMsg=exception!", metric, e); } } ); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java index 49749160..88c0a82f 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java @@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; @@ -26,7 +25,6 @@ import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateC @Component public class TopicMetricESDAO extends BaseMetricESDAO { - @PostConstruct public void init() { super.indexName = TOPIC_INDEX; @@ -34,8 +32,6 @@ public class TopicMetricESDAO extends BaseMetricESDAO { register(this); } - protected FutureWaitUtil queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500); - public List listTopicMaxMinMetrics(Long clusterPhyId, List topics, String metric, boolean max, Long startTime, Long endTime){ //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); @@ -128,8 +124,13 @@ public class TopicMetricESDAO extends BaseMetricESDAO { ? dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_TOPIC_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl) : dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_TOPIC_NOT_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl); - return esOpClient.performRequestWithRouting(topic, realIndex, dsl, - s -> handleESQueryResponseCount(s), 3); + return esOpClient.performRequestWithRouting( + topic, + realIndex, + dsl, + s -> handleESQueryResponseCount(s), + DEFAULT_RETRY_TIME + ); } /** @@ -207,38 +208,52 @@ public class TopicMetricESDAO extends BaseMetricESDAO { * 获取每个 metric 的 topN 个 topic 的指标,如果获取不到 topN 的topics, 则默认返回 defaultTopics 的指标 */ public Table> listTopicMetricsByTopN(Long clusterPhyId, - List defaultTopics, - List metrics, + List defaultTopicNameList, + List metricNameList, String aggType, int topN, Long startTime, Long endTime){ //1、获取topN要查询的topic,每一个指标的topN的topic可能不一样 - Map> metricTopics = this.getTopNTopics(clusterPhyId, metrics, aggType, topN, startTime, endTime); + Map> metricTopicsMap = this.getTopNTopics(clusterPhyId, metricNameList, aggType, topN, startTime, endTime); - Table> table = HashBasedTable.create(); + //2、获取topics列表 + Set topicNameSet = new HashSet<>(defaultTopicNameList); + metricTopicsMap.values().forEach(elem -> topicNameSet.addAll(elem)); - for(String metric : metrics) { - table.putAll(this.listTopicMetricsByTopics( - clusterPhyId, - Arrays.asList(metric), - aggType, - metricTopics.getOrDefault(metric, defaultTopics), - startTime, - endTime) - ); + //3、批量获取信息 + Table> allMetricsTable = this.listTopicMetricsByTopics( + clusterPhyId, + metricNameList, + aggType, + new ArrayList<>(topicNameSet), + startTime, + endTime + ); + + //4、获取Top-Metric + Table> metricsTable = HashBasedTable.create(); + for(String metricName: metricNameList) { + for (String topicName: metricTopicsMap.getOrDefault(metricName, defaultTopicNameList)) { + List voList = allMetricsTable.get(metricName, topicName); + if (voList == null) { + continue; + } + + metricsTable.put(metricName, topicName, voList); + } } - return table; + return metricsTable; } /** * 获取每个 metric 指定个 topic 的指标 */ public Table> listTopicMetricsByTopics(Long clusterPhyId, - List metrics, + List metricNameList, String aggType, - List topics, + List topicNameList, Long startTime, Long endTime){ //1、获取需要查下的索引 @@ -248,37 +263,47 @@ public class TopicMetricESDAO extends BaseMetricESDAO { String interval = MetricsUtils.getInterval(endTime - startTime); //3、构造agg查询条件 - String aggDsl = buildAggsDSL(metrics, aggType); + String aggDsl = buildAggsDSL(metricNameList, aggType); final Table> table = HashBasedTable.create(); //4、构造dsl查询条件 - for(String topic : topics){ + for(String topicName : topicNameList){ try { - queryFuture.runnableTask( - String.format("class=TopicMetricESDAO||method=listTopicMetricsByTopics||ClusterPhyId=%d||topicName=%s", - clusterPhyId, topic), + esTPService.submitSearchTask( + String.format("class=TopicMetricESDAO||method=listTopicMetricsByTopics||ClusterPhyId=%d||topicName=%s", clusterPhyId, topicName), 3000, () -> { String dsl = dslLoaderUtil.getFormatDslByFileName( - DslConstant.GET_TOPIC_AGG_LIST_METRICS, clusterPhyId, topic, startTime, endTime, interval, aggDsl); + DslConstant.GET_TOPIC_AGG_LIST_METRICS, + clusterPhyId, + topicName, + startTime, + endTime, + interval, + aggDsl + ); - Map> metricMap = esOpClient.performRequestWithRouting(topic, realIndex, dsl, - s -> handleListESQueryResponse(s, metrics, aggType), 3); + Map> metricMap = esOpClient.performRequestWithRouting( + topicName, + realIndex, + dsl, + s -> handleListESQueryResponse(s, metricNameList, aggType), + DEFAULT_RETRY_TIME + ); synchronized (table){ - for(String metric : metricMap.keySet()){ - table.put(metric, topic, metricMap.get(metric)); + for(Map.Entry> entry: metricMap.entrySet()){ + table.put(entry.getKey(), topicName, entry.getValue()); } } }); }catch (Exception e){ - LOGGER.error("method=listBrokerMetricsByBrokerIds||clusterPhyId={}||brokerId{}||errMsg=exception!", - clusterPhyId, topic, e); + LOGGER.error("method=listTopicMetricsByTopics||clusterPhyId={}||topicName={}||errMsg=exception!", clusterPhyId, topicName, e); } } - queryFuture.waitExecute(); + esTPService.waitExecute(); return table; } @@ -339,7 +364,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO { private Map> handleListESQueryResponse(ESQueryResponse response, List metrics, String aggType){ Map> metricMap = new HashMap<>(); - Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); if(null == esAggrMap){return metricMap;} for(String metric : metrics){ @@ -352,15 +377,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO { Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); if(value == null){return;} - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value.toString()); - metricPoint.setName(metric); - - metricPoints.add(metricPoint); - }else { - LOGGER.info(""); + metricPoints.add(new MetricPointVO(metric, timestamp, value.toString(), aggType)); } }catch (Exception e){ LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e); @@ -373,7 +390,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO { return metricMap; } - private Map> handleTopTopicESQueryResponse(ESQueryResponse response, List metrics, int topN){ + private Map> handleTopTopicESQueryResponse(ESQueryResponse response, List metricNameList, int topN){ Map> ret = new HashMap<>(); Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); @@ -382,57 +399,37 @@ public class TopicMetricESDAO extends BaseMetricESDAO { Map>> metricsTopicValueMap = new HashMap<>(); //1、先获取每个指标对应的所有 topic 以及指标的值 - for(String metric : metrics) { + for(String metricName: metricNameList) { esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { try { if (null != esBucket.getUnusedMap().get(KEY)) { String topic = esBucket.getUnusedMap().get(KEY).toString(); - Double value = Double.valueOf(esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap() - .get(metric).getUnusedMap().get(VALUE).toString()); + Double value = Double.valueOf( + esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap().get(metricName).getUnusedMap().get(VALUE).toString() + ); - List> brokerValue = (null == metricsTopicValueMap.get(metric)) ? - new ArrayList<>() : metricsTopicValueMap.get(metric); - - brokerValue.add(new Tuple<>(topic, value)); - metricsTopicValueMap.put(metric, brokerValue); + metricsTopicValueMap.putIfAbsent(metricName, new ArrayList<>()); + metricsTopicValueMap.get(metricName).add(new Tuple<>(topic, value)); } }catch (Exception e){ - LOGGER.error("method=handleTopBrokerESQueryResponse||metric={}||errMsg=exception!", metric, e); + LOGGER.error("method=handleTopTopicESQueryResponse||metricName={}||errMsg=exception!", metricName, e); } } ); } //2、对每个指标的broker按照指标值排序,并截取前topN个brokerIds - for(String metric : metricsTopicValueMap.keySet()){ - List> brokerValue = metricsTopicValueMap.get(metric); + for(Map.Entry>> entry: metricsTopicValueMap.entrySet()){ + entry.getValue().sort((o1, o2) -> { + if(null == o1 || null == o2) { + return 0; + } - brokerValue.sort((o1, o2) -> { - if(null == o1 || null == o2){return 0;} return o2.getV2().compareTo(o1.getV2()); } ); - List> temp = (brokerValue.size() > topN) ? brokerValue.subList(0, topN) : brokerValue; - List topics = temp.stream().map(t -> t.getV1()).collect(Collectors.toList()); + List topicNameList = entry.getValue().subList(0, Math.min(entry.getValue().size(), topN)).stream().map(t -> t.getV1()).collect(Collectors.toList()); - ret.put(metric, topics); - } - - return ret; - } - - private Map>> topicMetricMap2MetricTopicMap( - Map>> topicMetricMap){ - Map>> ret = new HashMap<>(); - - for(String topic : topicMetricMap.keySet()){ - Map> metricMap = topicMetricMap.get(topic); - - for(String metric : metricMap.keySet()){ - Map> brokerMap = (null == ret.get(metric)) ? new HashMap<>() : ret.get(metric); - - brokerMap.put(topic, metricMap.get(metric)); - ret.put(metric, brokerMap); - } + ret.put(entry.getKey(), topicNameList); } return ret; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectClusterMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectClusterMetricESDAO.java index 07394f51..31256efe 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectClusterMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectClusterMetricESDAO.java @@ -5,13 +5,11 @@ import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import java.util.*; @@ -29,8 +27,6 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO { register( this); } - protected FutureWaitUtil queryFuture = FutureWaitUtil.init("ConnectClusterMetricESDAO", 4,8, 500); - /** * 获取集群 clusterPhyId 中每个 metric 的 topN 的 connectCluster 在指定时间[startTime、endTime]区间内所有的指标 * topN 按照[startTime, endTime] 时间段内最后一个值来排序 @@ -97,7 +93,7 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO { aggDsl ); - queryFuture.runnableTask( + esTPService.submitSearchTask( String.format("class=ConnectClusterMetricESDAO||method=listMetricsByConnectClusterIdList||ClusterPhyId=%d", clusterPhyId), 5000, () -> { @@ -106,24 +102,24 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO { realIndex, dsl, s -> handleListESQueryResponse(s, metricNameList, aggType), - 3 + DEFAULT_RETRY_TIME ); synchronized (table) { - for(String metric : metricMap.keySet()){ - table.put(metric, connectClusterId, metricMap.get(metric)); + for(Map.Entry> entry : metricMap.entrySet()){ + table.put(entry.getKey(), connectClusterId, entry.getValue()); } } }); } catch (Exception e) { LOGGER.error( - "class=ConnectClusterMetricESDAO||method=listMetricsByConnectClusterIdList||clusterPhyId={}||connectClusterId{}||errMsg=exception!", + "method=listMetricsByConnectClusterIdList||clusterPhyId={}||connectClusterId={}||errMsg=exception!", clusterPhyId, connectClusterId, e ); } } - queryFuture.waitExecute(); + esTPService.waitExecute(); return table; } @@ -167,107 +163,89 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO { /**************************************************** private method ****************************************************/ - private Map> handleListESQueryResponse(ESQueryResponse response, List metrics, String aggType){ + private Map> handleListESQueryResponse(ESQueryResponse response, List metricNameList, String aggType){ Map> metricMap = new HashMap<>(); - if(null == response || null == response.getAggs()){ + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); + if(null == esAggrMap) { return metricMap; } - Map esAggrMap = response.getAggs().getEsAggrMap(); - if (null == esAggrMap || null == esAggrMap.get(HIST)) { - return metricMap; - } - - if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){ - return metricMap; - } - - for(String metric : metrics){ + for(String metricName : metricNameList){ List metricPoints = new ArrayList<>(); - esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + esAggrMap.get(HIST).getBucketList().forEach(esBucket -> { try { if (null != esBucket.getUnusedMap().get(KEY)) { Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); - if(null == value){return;} + Object value = esBucket.getAggrMap().get(metricName).getUnusedMap().get(VALUE); + if(null == value) { + return; + } - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value.toString()); - metricPoint.setName(metric); - - metricPoints.add(metricPoint); - }else { - LOGGER.info(""); + metricPoints.add(new MetricPointVO(metricName, timestamp, value.toString(), aggType)); } }catch (Exception e){ - LOGGER.error("metric={}||errMsg=exception!", metric, e); + LOGGER.error("method=handleListESQueryResponse||metricName={}||errMsg=exception!", metricName, e); } } ); - metricMap.put(metric, optimizeMetricPoints(metricPoints)); + metricMap.put(metricName, optimizeMetricPoints(metricPoints)); } return metricMap; } - private Map> handleTopConnectClusterESQueryResponse(ESQueryResponse response, List metrics, int topN){ + private Map> handleTopConnectClusterESQueryResponse(ESQueryResponse response, List metricNameList, int topN){ Map> ret = new HashMap<>(); - if(null == response || null == response.getAggs()){ + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); + if(null == esAggrMap) { return ret; } - Map esAggrMap = response.getAggs().getEsAggrMap(); - if (null == esAggrMap || null == esAggrMap.get(HIST)) { - return ret; - } - - if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){ - return ret; - } - - Map>> metricBrokerValueMap = new HashMap<>(); + Map>> metricConnectClusterValueMap = new HashMap<>(); //1、先获取每个指标对应的所有brokerIds以及指标的值 - for(String metric : metrics) { - esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + for(String metricName : metricNameList) { + esAggrMap.get(HIST).getBucketList().forEach(esBucket -> { try { - if (null != esBucket.getUnusedMap().get(KEY)) { - Long connectorClusterId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); - Object value = esBucket.getAggrMap().get(HIST).getBucketList() - .get(0).getAggrMap().get(metric).getUnusedMap().get(VALUE); - - if(null == value){return;} - - List> connectorClusterValue = (null == metricBrokerValueMap.get(metric)) ? - new ArrayList<>() : metricBrokerValueMap.get(metric); - - connectorClusterValue.add(new Tuple<>(connectorClusterId, Double.valueOf(value.toString()))); - metricBrokerValueMap.put(metric, connectorClusterValue); + if (null == esBucket.getUnusedMap().get(KEY)) { + return; } + + Long connectorClusterId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); + Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap().get(metricName).getUnusedMap().get(VALUE); + if(null == value) { + return; + } + + metricConnectClusterValueMap.putIfAbsent(metricName, new ArrayList<>()); + metricConnectClusterValueMap.get(metricName).add(new Tuple<>(connectorClusterId, Double.valueOf(value.toString()))); }catch (Exception e){ - LOGGER.error("metric={}||errMsg=exception!", metric, e); + LOGGER.error("method=handleTopConnectClusterESQueryResponse||metricName={}||errMsg=exception!", metricName, e); } } ); } - //2、对每个指标的broker按照指标值排序,并截取前topN个brokerIds - for(String metric : metricBrokerValueMap.keySet()){ - List> connectorClusterValue = metricBrokerValueMap.get(metric); + //2、对每个指标的connect按照指标值排序,并截取前topN个connectIds + for(Map.Entry>> entry : metricConnectClusterValueMap.entrySet()){ + + entry.getValue().sort((o1, o2) -> { + if(null == o1 || null == o2) { + return 0; + } - connectorClusterValue.sort((o1, o2) -> { - if(null == o1 || null == o2){return 0;} return o2.getV2().compareTo(o1.getV2()); - } ); + }); - List> temp = (connectorClusterValue.size() > topN) ? connectorClusterValue.subList(0, topN) : connectorClusterValue; - List connectorClusterIds = temp.stream().map(t -> t.getV1()).collect(Collectors.toList()); + List connectorClusterIdList = entry.getValue() + .subList(0, Math.min(entry.getValue().size(), topN)) + .stream() + .map(t -> t.getV1()) + .collect(Collectors.toList()); - ret.put(metric, connectorClusterIds); + ret.put(entry.getKey(), connectorClusterIdList); } return ret; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectorMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectorMetricESDAO.java index 060f57be..1c47e862 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectorMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectorMetricESDAO.java @@ -8,7 +8,6 @@ import com.google.common.collect.Table; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.ConnectorMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; import com.xiaojukeji.know.streaming.km.common.utils.Triple; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; @@ -33,43 +32,55 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { register( this); } - protected FutureWaitUtil queryFuture = FutureWaitUtil.init("ConnectorMetricESDAO", 4,8, 500); - - /** - * 获取每个 metric 的 topN 个 connector 的指标,如果获取不到 topN 的topics, 则默认返回 defaultTopics 的指标 + * 获取每个 metric 的 topN 个 connector 的指标,如果获取不到 topN 的 connectors, 则默认返回 defaultTopics 的指标 */ public Table, List> listMetricsByTopN(Long clusterPhyId, - List> defaultConnectorList, - List metricNameList, - String aggType, - int topN, - Long startTime, - Long endTime){ + List> defaultConnectorList, + List metricNameList, + String aggType, + int topN, + Long startTime, + Long endTime) { //1、获取topN要查询的topic,每一个指标的topN的topic可能不一样 Map>> metricsMap = this.getTopNConnectors(clusterPhyId, metricNameList, aggType, topN, startTime, endTime); - Table, List> table = HashBasedTable.create(); + //2、获取connector列表 + Set> connectorSet = new HashSet<>(defaultConnectorList); + metricsMap.values().forEach(elem -> connectorSet.addAll(elem)); - for(String metricName : metricNameList){ - table.putAll(this.listMetricsByConnectors( - clusterPhyId, - Arrays.asList(metricName), - aggType, - metricsMap.getOrDefault(metricName, defaultConnectorList), - startTime, - endTime) - ); + //3、批量获取信息 + Table, List> allMetricsTable = this.listMetricsByConnectors( + clusterPhyId, + metricNameList, + aggType, + new ArrayList<>(connectorSet), + startTime, + endTime + ); + + //4、获取Top-Metric + Table, List> metricTable = HashBasedTable.create(); + for (String metricName: metricNameList) { + for (Tuple connector: metricsMap.getOrDefault(metricName, defaultConnectorList)) { + List voList = allMetricsTable.get(metricName, connector); + if (voList == null) { + continue; + } + + metricTable.put(metricName, connector, voList); + } } - return table; + // 返回结果 + return metricTable; } public List getConnectorLatestMetric(Long clusterPhyId, List> connectClusterIdAndConnectorNameList, List metricsNames){ List connectorMetricPOS = new CopyOnWriteArrayList<>(); for(Tuple connectClusterIdAndConnectorName : connectClusterIdAndConnectorNameList){ - queryFuture.runnableTask( + esTPService.submitSearchTask( "getConnectorLatestMetric", 30000, () -> { @@ -78,7 +89,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { }); } - queryFuture.waitExecute(); + esTPService.waitExecute(); return connectorMetricPOS; } @@ -112,11 +123,11 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { * 获取每个 metric 指定个 topic 的指标 */ public Table, List> listMetricsByConnectors(Long clusterPhyId, - List metrics, - String aggType, - List> connectorList, - Long startTime, - Long endTime) { + List metricNameList, + String aggType, + List> connectorList, + Long startTime, + Long endTime) { //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); @@ -124,17 +135,15 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { String interval = MetricsUtils.getInterval(endTime - startTime); //3、构造agg查询条件 - String aggDsl = buildAggsDSL(metrics, aggType); + String aggDsl = buildAggsDSL(metricNameList, aggType); final Table, List> table = HashBasedTable.create(); //4、构造dsl查询条件 for(Tuple connector : connectorList) { try { - queryFuture.runnableTask( - String.format( - "method=listConnectorMetricsByConnectors||ClusterPhyId=%d||connectorName=%s", - clusterPhyId, connector.getV2() ), + esTPService.submitSearchTask( + String.format("class=ConnectorMetricESDAO||method=listMetricsByConnectors||ClusterPhyId=%d||connectorName=%s", clusterPhyId, connector.getV2()), 3000, () -> { String dsl = dslLoaderUtil.getFormatDslByFileName( @@ -152,25 +161,25 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { connector.getV1().toString(), realIndex, dsl, - s -> handleListESQueryResponse(s, metrics, aggType), + s -> handleListESQueryResponse(s, metricNameList, aggType), 3 ); - synchronized (table){ - for(String metric : metricMap.keySet()){ - table.put(metric, connector, metricMap.get(metric)); + synchronized (table) { + for(Map.Entry> entry: metricMap.entrySet()){ + table.put(entry.getKey(), connector, entry.getValue()); } } }); } catch (Exception e) { LOGGER.error( - "method=listConnectorMetricsByConnectors||clusterPhyId={}||connectorName{}||errMsg=exception!", - clusterPhyId, connector.getV2(), e + "method=listMetricsByConnectors||clusterPhyId={}||connectClusterId={}||connectorName{}||errMsg=exception!", + clusterPhyId, connector.getV1(), connector.getV2(), e ); } } - queryFuture.waitExecute(); + esTPService.waitExecute(); return table; } @@ -181,7 +190,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { String aggType, int topN, Long startTime, - Long endTime){ + Long endTime) { //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); @@ -209,46 +218,19 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { ); } + /**************************************************** private method ****************************************************/ - private Table handleSingleESQueryResponse(ESQueryResponse response, List metrics, String aggType){ - Table table = HashBasedTable.create(); - - Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); - if(null == esAggrMap){return table;} - - for(String metric : metrics){ - esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { - try { - if (null != esBucket.getUnusedMap().get(KEY)) { - String topic = esBucket.getUnusedMap().get(KEY).toString(); - String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString(); - - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setValue(value); - metricPoint.setName(metric); - - table.put(topic, metric, metricPoint); - }else { - LOGGER.debug("method=handleListESQueryResponse||metric={}||errMsg=get topic is null!", metric); - } - }catch (Exception e){ - LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e); - } - }); - } - - return table; - } private Map> handleListESQueryResponse(ESQueryResponse response, List metrics, String aggType){ Map> metricMap = new HashMap<>(); - Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); - if(null == esAggrMap){return metricMap;} + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); + if(null == esAggrMap) { + return metricMap; + } - for(String metric : metrics){ + for(String metric : metrics) { List metricPoints = new ArrayList<>(); esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { @@ -256,17 +238,11 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { if (null != esBucket.getUnusedMap().get(KEY)) { Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE); - if(value == null){return;} + if(value == null){ + return; + } - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setTimeStamp(timestamp); - metricPoint.setValue(value.toString()); - metricPoint.setName(metric); - - metricPoints.add(metricPoint); - }else { - LOGGER.info(""); + metricPoints.add(new MetricPointVO(metric, timestamp, value.toString(), aggType)); } }catch (Exception e){ LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e); @@ -279,10 +255,12 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { return metricMap; } - private Map>> handleTopConnectorESQueryResponse(ESQueryResponse response, List metricNameList, int topN){ + private Map>> handleTopConnectorESQueryResponse(ESQueryResponse response, + List metricNameList, + int topN) { Map>> ret = new HashMap<>(); - Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); + Map esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response); if(null == esAggrMap) { return ret; } @@ -291,7 +269,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { // 1、先获取每个指标对应的所有 connector 以及指标的值 for(String metricName : metricNameList) { - esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + esAggrMap.get(HIST).getBucketList().forEach(esBucket -> { try { if (null != esBucket.getUnusedMap().get(KEY)) { String connectorNameAndClusterId = esBucket.getUnusedMap().get(KEY).toString(); @@ -338,24 +316,6 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { return ret; } - private Map>> topicMetricMap2MetricTopicMap( - Map>> topicMetricMap){ - Map>> ret = new HashMap<>(); - - for(String topic : topicMetricMap.keySet()){ - Map> metricMap = topicMetricMap.get(topic); - - for(String metric : metricMap.keySet()){ - Map> brokerMap = (null == ret.get(metric)) ? new HashMap<>() : ret.get(metric); - - brokerMap.put(topic, metricMap.get(metric)); - ret.put(metric, brokerMap); - } - } - - return ret; - } - private Tuple splitConnectorNameAndClusterId(String connectorNameAndClusterId){ String[] ss = connectorNameAndClusterId.split("#"); if(null == ss || ss.length != 2){return null;} diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index c9753b7e..291c6741 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -50,7 +50,6 @@ logging: thread-pool: scheduled: thread-num: 2 # @Scheduled任务的线程池大小,默认是一个 - collector: # 采集模块的配置 future-util: # 采集模块线程池配置 num: 3 # 线程池个数 @@ -58,7 +57,6 @@ thread-pool: queue-size: 10000 # 每个线程池队列大小 select-suitable-enable: true # 任务是否自动选择合适的线程池,非主要,可不修改 suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改 - task: # 任务模块的配置 metrics: # metrics采集任务配置 thread-num: 18 # metrics采集任务线程池核心线程数 @@ -69,6 +67,10 @@ thread-pool: common: # 剩余其他任务配置 thread-num: 15 # 剩余其他任务线程池核心线程数 queue-size: 150 # 剩余其他任务线程池队列大小 + es: + search: # es查询线程池 + thread-num: 10 # 线程池大小 + queue-size: 5000 # 队列大小 # 客户端池大小相关配置