[Optimize]Overview页面的TopN查询ES流程优化(#823)

1、复用线程池,同时支持线程池的线程数可配置;
2、优化查询TopN指标时,可能会出现重复查询的问题;
3、处理代码扫描(SonarLint)反馈的问题;
This commit is contained in:
zengqiao
2022-12-09 14:34:58 +08:00
committed by EricZeng
parent c4fb18a73c
commit b2f0f69365
11 changed files with 398 additions and 420 deletions

View File

@@ -2,7 +2,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -11,7 +10,6 @@ import lombok.NoArgsConstructor;
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ApiModel(description = "指标点")
public class MetricPointVO implements Comparable<MetricPointVO> {
@ApiModelProperty(value = "指标名", example = "HealthScore")
@@ -26,6 +24,13 @@ public class MetricPointVO implements Comparable<MetricPointVO> {
@ApiModelProperty(value = "指标值聚合方式avg、max、min、sum")
private String aggType;
public MetricPointVO(String name, Long timeStamp, String value, String aggType) {
this.name = name;
this.timeStamp = timeStamp;
this.value = value;
this.aggType = aggType;
}
@Override
public int compareTo(MetricPointVO o) {
if(null == o){return 0;}

View File

@@ -38,40 +38,41 @@ public abstract class BaseMetricService extends BaseKafkaVersionControlService {
protected abstract void initRegisterVCHandler();
protected <T> List<MetricMultiLinesVO> metricMap2VO(Long clusterId,
Map<String/*metric*/, Map<T, List<MetricPointVO>>> map){
List<MetricMultiLinesVO> multiLinesVOS = new ArrayList<>();
if (map == null || map.isEmpty()) {
protected <T> List<MetricMultiLinesVO> metricMap2VO(Long clusterId, Map<String/*metric*/, Map<T, List<MetricPointVO>>> metricsMap ){
List<MetricMultiLinesVO> lineVOList = new ArrayList<>();
if (metricsMap == null || metricsMap.isEmpty()) {
// 如果为空,则直接返回
return multiLinesVOS;
return lineVOList;
}
for(String metric : map.keySet()){
for(Map.Entry<String/*metric*/, Map<T, List<MetricPointVO>>> entry : metricsMap.entrySet()){
try {
MetricMultiLinesVO multiLinesVO = new MetricMultiLinesVO();
multiLinesVO.setMetricName(metric);
multiLinesVO.setMetricName(entry.getKey());
List<MetricLineVO> metricLines = new ArrayList<>();
Map<T, List<MetricPointVO>> metricPointMap = map.get(metric);
if(null == metricPointMap || metricPointMap.isEmpty()){continue;}
for(Map.Entry<T, List<MetricPointVO>> entry : metricPointMap.entrySet()){
MetricLineVO metricLineVO = new MetricLineVO();
metricLineVO.setName(entry.getKey().toString());
metricLineVO.setMetricName(metric);
metricLineVO.setMetricPoints(entry.getValue());
metricLines.add(metricLineVO);
if(null == entry.getValue() || entry.getValue().isEmpty()){
continue;
}
List<MetricLineVO> metricLines = new ArrayList<>();
entry.getValue().entrySet().forEach(resNameAndMetricsEntry -> {
MetricLineVO metricLineVO = new MetricLineVO();
metricLineVO.setName(resNameAndMetricsEntry.getKey().toString());
metricLineVO.setMetricName(entry.getKey());
metricLineVO.setMetricPoints(resNameAndMetricsEntry.getValue());
metricLines.add(metricLineVO);
});
multiLinesVO.setMetricLines(metricLines);
multiLinesVOS.add(multiLinesVO);
}catch (Exception e){
LOGGER.error("method=metricMap2VO||cluster={}||msg=exception!", clusterId, e);
lineVOList.add(multiLinesVO);
} catch (Exception e){
LOGGER.error("method=metricMap2VO||clusterId={}||msg=exception!", clusterId, e);
}
}
return multiLinesVOS;
return lineVOList;
}
/**

View File

@@ -0,0 +1,41 @@
package com.xiaojukeji.know.streaming.km.persistence.es;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* 处理ES请求的线程池
*/
@Service
@NoArgsConstructor
public class ESTPService {
@Value("${thread-pool.es.search.thread-num:10}")
private Integer esSearchThreadCnt;
@Value("${thread-pool.es.search.queue-size:5000}")
private Integer esSearchThreadQueueSize;
private FutureWaitUtil<Object> searchESTP;
@PostConstruct
private void init() {
searchESTP = FutureWaitUtil.init(
"SearchESTP",
esSearchThreadCnt,
esSearchThreadCnt,
esSearchThreadQueueSize
);
}
public void submitSearchTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
searchESTP.runnableTask(taskName, timeoutUnisMs, runnable);
}
public void waitExecute() {
searchESTP.waitExecute();
}
}

View File

@@ -12,7 +12,9 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPoint
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.ESTPService;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateLoaderUtil;
import lombok.NoArgsConstructor;
@@ -48,6 +50,9 @@ public class BaseMetricESDAO extends BaseESDAO {
@Autowired
private TemplateLoaderUtil templateLoaderUtil;
@Autowired
protected ESTPService esTPService;
/**
* es 地址
*/
@@ -364,21 +369,16 @@ public class BaseMetricESDAO extends BaseESDAO {
sb.append(str, 1, str.length() - 1);
}
protected Map<String, ESAggr> checkBucketsAndHitsOfResponseAggs(ESQueryResponse response){
if(null == response || null == response.getAggs()){
protected Map<String, ESAggr> checkBucketsAndHitsOfResponseAggs(ESQueryResponse response) {
if(null == response
|| null == response.getAggs()
|| null == response.getAggs().getEsAggrMap()
|| null == response.getAggs().getEsAggrMap().get(HIST)
|| ValidateUtils.isEmptyList(response.getAggs().getEsAggrMap().get(HIST).getBucketList())) {
return null;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap || null == esAggrMap.get(HIST)) {
return null;
}
if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){
return null;
}
return esAggrMap;
return response.getAggs().getEsAggrMap();
}
protected int handleESQueryResponseCount(ESQueryResponse response){

View File

@@ -6,12 +6,10 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.*;
@@ -29,8 +27,6 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
register( this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500);
/**
* 获取集群 clusterId 中 brokerId 最新的统计指标
*/
@@ -140,7 +136,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
aggDsl
);
queryFuture.runnableTask(
esTPService.submitSearchTask(
String.format("class=BrokerMetricESDAO||method=listBrokerMetricsByBrokerIds||ClusterPhyId=%d", clusterPhyId),
5000,
() -> {
@@ -163,7 +159,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
}
}
queryFuture.waitExecute();
esTPService.waitExecute();
return table;
}
@@ -220,106 +216,86 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
return metricMap;
}
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metricNameList, String aggType){
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
if(null == response || null == response.getAggs()){
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if (esAggrMap == null) {
return metricMap;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap || null == esAggrMap.get(HIST)) {
return metricMap;
}
if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){
return metricMap;
}
for(String metric : metrics){
for(String metricName : metricNameList){
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
esAggrMap.get(HIST).getBucketList().forEach(esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(null == value){return;}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}else {
LOGGER.info("");
if (null == esBucket.getUnusedMap().get(KEY)) {
return;
}
}catch (Exception e){
LOGGER.error("metric={}||errMsg=exception!", metric, e);
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metricName).getUnusedMap().get(VALUE);
if(null == value) {
return;
}
metricPoints.add(new MetricPointVO(metricName, timestamp, value.toString(), aggType));
} catch (Exception e){
LOGGER.error("method=handleListESQueryResponse||metricName={}||errMsg=exception!", metricName, e);
}
} );
metricMap.put(metric, optimizeMetricPoints(metricPoints));
metricMap.put(metricName, optimizeMetricPoints(metricPoints));
}
return metricMap;
}
private Map<String, List<Long>> handleTopBrokerESQueryResponse(ESQueryResponse response, List<String> metrics, int topN){
private Map<String, List<Long>> handleTopBrokerESQueryResponse(ESQueryResponse response, List<String> metricNameList, int topN) {
Map<String, List<Long>> ret = new HashMap<>();
if(null == response || null == response.getAggs()){
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if (esAggrMap == null) {
return ret;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap || null == esAggrMap.get(HIST)) {
return ret;
}
if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){
return ret;
}
Map<String, List<Tuple<Long, Double>>> metricBrokerValueMap = new HashMap<>();
Map<String, List<Tuple<Long, Double>>> metricNameBrokerValueMap = new HashMap<>();
//1、先获取每个指标对应的所有brokerIds以及指标的值
for(String metric : metrics) {
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
for(String metricName : metricNameList) {
esAggrMap.get(HIST).getBucketList().forEach(esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long brokerId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap()
.get(metric).getUnusedMap().get(VALUE);
if(null == value){return;}
List<Tuple<Long, Double>> brokerValue = (null == metricBrokerValueMap.get(metric)) ?
new ArrayList<>() : metricBrokerValueMap.get(metric);
brokerValue.add(new Tuple<>(brokerId, Double.valueOf(value.toString())));
metricBrokerValueMap.put(metric, brokerValue);
if (null == esBucket.getUnusedMap().get(KEY)) {
return;
}
}catch (Exception e){
LOGGER.error("metrice={}||errMsg=exception!", metric, e);
Long brokerId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap().get(metricName).getUnusedMap().get(VALUE);
if(null == value) {
return;
}
metricNameBrokerValueMap.putIfAbsent(metricName, new ArrayList<>());
metricNameBrokerValueMap.get(metricName).add(new Tuple<>(brokerId, Double.valueOf(value.toString())));
} catch (Exception e) {
LOGGER.error("method=handleTopBrokerESQueryResponse||metric={}||errMsg=exception!", metricName, e);
}
} );
});
}
//2、对每个指标的broker按照指标值排序并截取前topN个brokerIds
for(String metric : metricBrokerValueMap.keySet()){
List<Tuple<Long, Double>> brokerValue = metricBrokerValueMap.get(metric);
for(Map.Entry<String, List<Tuple<Long, Double>>> entry : metricNameBrokerValueMap.entrySet()){
entry.getValue().sort((o1, o2) -> {
if(null == o1 || null == o2){
return 0;
}
brokerValue.sort((o1, o2) -> {
if(null == o1 || null == o2){return 0;}
return o2.getV2().compareTo(o1.getV2());
} );
List<Tuple<Long, Double>> temp = (brokerValue.size() > topN) ? brokerValue.subList(0, topN) : brokerValue;
List<Long> brokerIds = temp.stream().map(t -> t.getV1()).collect( Collectors.toList());
ret.put(metric, brokerIds);
// 获取TopN的Broker
List<Long> brokerIdList = entry.getValue().subList(0, Math.min(topN, entry.getValue().size())).stream().map(elem -> elem.getV1()).collect(Collectors.toList());
ret.put(entry.getKey(), brokerIdList);
}
return ret;

View File

@@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchRange;
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
@@ -35,8 +34,6 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
register(this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500);
/**
* 获取集群 clusterId 最新的统计指标
*/
@@ -127,30 +124,39 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
//4、构造dsl查询条件开始查询
for(Long clusterPhyId : clusterPhyIds){
try {
queryFuture.runnableTask(
esTPService.submitSearchTask(
String.format("class=ClusterMetricESDAO||method=listClusterMetricsByClusterIds||ClusterPhyId=%d", clusterPhyId),
5000,
() -> {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_CLUSTER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
DslConstant.GET_CLUSTER_AGG_LIST_METRICS,
clusterPhyId,
startTime,
endTime,
interval,
aggDsl
);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(
String.valueOf(clusterPhyId), realIndex, dsl,
s -> handleListESQueryResponse(s, metrics, aggType), 3);
String.valueOf(clusterPhyId),
realIndex,
dsl,
s -> handleListESQueryResponse(s, metrics, aggType),
DEFAULT_RETRY_TIME
);
synchronized (table){
for(String metric : metricMap.keySet()){
table.put(metric, clusterPhyId, metricMap.get(metric));
for(Map.Entry<String/*metric*/, List<MetricPointVO>> entry : metricMap.entrySet()){
table.put(entry.getKey(), clusterPhyId, entry.getValue());
}
}
});
}catch (Exception e){
LOGGER.error("method=listClusterMetricsByClusterIds||clusterPhyId={}||errMsg=exception!",
clusterPhyId, e);
LOGGER.error("method=listClusterMetricsByClusterIds||clusterPhyId={}||errMsg=exception!", clusterPhyId, e);
}
}
queryFuture.waitExecute();
esTPService.waitExecute();
return table;
}
@@ -182,35 +188,33 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
return metricMap;
}
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap){return new HashMap<>();}
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metricNameList, String aggType){
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return new HashMap<>();
}
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
for(String metric : metrics){
for(String metricName : metricNameList) {
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(null == value){return;}
Object value = esBucket.getAggrMap().get(metricName).getUnusedMap().get(VALUE);
if(null == value) {
return;
}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
metricPoints.add(new MetricPointVO(metricName, timestamp, value.toString(), aggType));
}
}catch (Exception e){
LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e);
} catch (Exception e){
LOGGER.error("method=handleListESQueryResponse||metricName={}||errMsg=exception!", metricName, e);
}
} );
metricMap.put(metric, optimizeMetricPoints(metricPoints));
metricMap.put(metricName, optimizeMetricPoints(metricPoints));
}
return metricMap;

View File

@@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionK
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.GroupMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
@@ -35,8 +34,6 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
register(this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500);
public List<GroupMetricPO> listLatestMetricsAggByGroupTopic(Long clusterPhyId, List<GroupTopic> groupTopicList, List<String> metrics, AggTypeEnum aggType){
Long latestTime = getLatestMetricTime();
Long startTime = latestTime - FIVE_MIN;
@@ -49,7 +46,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
List<GroupMetricPO> groupMetricPOS = new CopyOnWriteArrayList<>();
for(GroupTopic groupTopic : groupTopicList){
queryFuture.runnableTask(
esTPService.submitSearchTask(
String.format("class=GroupMetricESDAO||method=listLatestMetricsAggByGroupTopic||ClusterPhyId=%d||groupName=%s||topicName=%s",
clusterPhyId, groupTopic.getGroupName(), groupTopic.getTopicName()),
5000,
@@ -73,7 +70,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
});
}
queryFuture.waitExecute();
esTPService.waitExecute();
return groupMetricPOS;
}
@@ -126,13 +123,26 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
Integer partition = tp.getPartition();
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.LIST_GROUP_METRICS, clusterId, groupName, topic, partition, startTime, endTime, interval, aggDsl);
DslConstant.LIST_GROUP_METRICS,
clusterId,
groupName,
topic,
partition,
startTime,
endTime,
interval,
aggDsl
);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequest(realIndex, dsl,
s -> handleGroupMetrics(s, aggType, metrics), 3);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequest(
realIndex,
dsl,
s -> handleGroupMetrics(s, aggType, metrics),
DEFAULT_RETRY_TIME
);
for(String metric : metricMap.keySet()){
table.put(metric, topic + "&" + partition, metricMap.get(metric));
for(Map.Entry<String/*metric*/, List<MetricPointVO>> entry: metricMap.entrySet()){
table.put(entry.getKey(), topic + "&" + partition, entry.getValue());
}
}
@@ -188,23 +198,27 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
for(String metric : metrics){
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
esAggrMap.get(HIST).getBucketList().forEach(esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(value == null){return;}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
if (null == esBucket.getUnusedMap().get(KEY)) {
return;
}
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(value == null) {
return;
}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}catch (Exception e){
LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e);
LOGGER.error("method=handleGroupMetrics||metric={}||errMsg=exception!", metric, e);
}
} );

View File

@@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm;
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
@@ -26,7 +25,6 @@ import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateC
@Component
public class TopicMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public void init() {
super.indexName = TOPIC_INDEX;
@@ -34,8 +32,6 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
register(this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500);
public List<TopicMetricPO> listTopicMaxMinMetrics(Long clusterPhyId, List<String> topics, String metric, boolean max, Long startTime, Long endTime){
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
@@ -128,8 +124,13 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
? dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_TOPIC_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl)
: dslLoaderUtil.getFormatDslByFileName( DslConstant.COUNT_TOPIC_NOT_METRIC_VALUE, clusterPhyId, topic, startTime, endTime, termDsl);
return esOpClient.performRequestWithRouting(topic, realIndex, dsl,
s -> handleESQueryResponseCount(s), 3);
return esOpClient.performRequestWithRouting(
topic,
realIndex,
dsl,
s -> handleESQueryResponseCount(s),
DEFAULT_RETRY_TIME
);
}
/**
@@ -207,38 +208,52 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
* 获取每个 metric 的 topN 个 topic 的指标,如果获取不到 topN 的topics, 则默认返回 defaultTopics 的指标
*/
public Table<String/*metric*/, String/*topics*/, List<MetricPointVO>> listTopicMetricsByTopN(Long clusterPhyId,
List<String> defaultTopics,
List<String> metrics,
List<String> defaultTopicNameList,
List<String> metricNameList,
String aggType,
int topN,
Long startTime,
Long endTime){
//1、获取topN要查询的topic每一个指标的topN的topic可能不一样
Map<String, List<String>> metricTopics = this.getTopNTopics(clusterPhyId, metrics, aggType, topN, startTime, endTime);
Map<String, List<String>> metricTopicsMap = this.getTopNTopics(clusterPhyId, metricNameList, aggType, topN, startTime, endTime);
Table<String, String, List<MetricPointVO>> table = HashBasedTable.create();
//2、获取topics列表
Set<String> topicNameSet = new HashSet<>(defaultTopicNameList);
metricTopicsMap.values().forEach(elem -> topicNameSet.addAll(elem));
for(String metric : metrics) {
table.putAll(this.listTopicMetricsByTopics(
clusterPhyId,
Arrays.asList(metric),
aggType,
metricTopics.getOrDefault(metric, defaultTopics),
startTime,
endTime)
);
//3、批量获取信息
Table<String, String, List<MetricPointVO>> allMetricsTable = this.listTopicMetricsByTopics(
clusterPhyId,
metricNameList,
aggType,
new ArrayList<>(topicNameSet),
startTime,
endTime
);
//4、获取Top-Metric
Table<String, String, List<MetricPointVO>> metricsTable = HashBasedTable.create();
for(String metricName: metricNameList) {
for (String topicName: metricTopicsMap.getOrDefault(metricName, defaultTopicNameList)) {
List<MetricPointVO> voList = allMetricsTable.get(metricName, topicName);
if (voList == null) {
continue;
}
metricsTable.put(metricName, topicName, voList);
}
}
return table;
return metricsTable;
}
/**
* 获取每个 metric 指定个 topic 的指标
*/
public Table<String/*metric*/, String/*topics*/, List<MetricPointVO>> listTopicMetricsByTopics(Long clusterPhyId,
List<String> metrics,
List<String> metricNameList,
String aggType,
List<String> topics,
List<String> topicNameList,
Long startTime,
Long endTime){
//1、获取需要查下的索引
@@ -248,37 +263,47 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metrics, aggType);
String aggDsl = buildAggsDSL(metricNameList, aggType);
final Table<String, String, List<MetricPointVO>> table = HashBasedTable.create();
//4、构造dsl查询条件
for(String topic : topics){
for(String topicName : topicNameList){
try {
queryFuture.runnableTask(
String.format("class=TopicMetricESDAO||method=listTopicMetricsByTopics||ClusterPhyId=%d||topicName=%s",
clusterPhyId, topic),
esTPService.submitSearchTask(
String.format("class=TopicMetricESDAO||method=listTopicMetricsByTopics||ClusterPhyId=%d||topicName=%s", clusterPhyId, topicName),
3000,
() -> {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslConstant.GET_TOPIC_AGG_LIST_METRICS, clusterPhyId, topic, startTime, endTime, interval, aggDsl);
DslConstant.GET_TOPIC_AGG_LIST_METRICS,
clusterPhyId,
topicName,
startTime,
endTime,
interval,
aggDsl
);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(topic, realIndex, dsl,
s -> handleListESQueryResponse(s, metrics, aggType), 3);
Map<String/*metric*/, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(
topicName,
realIndex,
dsl,
s -> handleListESQueryResponse(s, metricNameList, aggType),
DEFAULT_RETRY_TIME
);
synchronized (table){
for(String metric : metricMap.keySet()){
table.put(metric, topic, metricMap.get(metric));
for(Map.Entry<String/*metric*/, List<MetricPointVO>> entry: metricMap.entrySet()){
table.put(entry.getKey(), topicName, entry.getValue());
}
}
});
}catch (Exception e){
LOGGER.error("method=listBrokerMetricsByBrokerIds||clusterPhyId={}||brokerId{}||errMsg=exception!",
clusterPhyId, topic, e);
LOGGER.error("method=listTopicMetricsByTopics||clusterPhyId={}||topicName={}||errMsg=exception!", clusterPhyId, topicName, e);
}
}
queryFuture.waitExecute();
esTPService.waitExecute();
return table;
}
@@ -339,7 +364,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap){return metricMap;}
for(String metric : metrics){
@@ -352,15 +377,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(value == null){return;}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}else {
LOGGER.info("");
metricPoints.add(new MetricPointVO(metric, timestamp, value.toString(), aggType));
}
}catch (Exception e){
LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e);
@@ -373,7 +390,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
return metricMap;
}
private Map<String, List<String>> handleTopTopicESQueryResponse(ESQueryResponse response, List<String> metrics, int topN){
private Map<String, List<String>> handleTopTopicESQueryResponse(ESQueryResponse response, List<String> metricNameList, int topN){
Map<String, List<String>> ret = new HashMap<>();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
@@ -382,57 +399,37 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
Map<String, List<Tuple<String, Double>>> metricsTopicValueMap = new HashMap<>();
//1、先获取每个指标对应的所有 topic 以及指标的值
for(String metric : metrics) {
for(String metricName: metricNameList) {
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
String topic = esBucket.getUnusedMap().get(KEY).toString();
Double value = Double.valueOf(esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap()
.get(metric).getUnusedMap().get(VALUE).toString());
Double value = Double.valueOf(
esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap().get(metricName).getUnusedMap().get(VALUE).toString()
);
List<Tuple<String, Double>> brokerValue = (null == metricsTopicValueMap.get(metric)) ?
new ArrayList<>() : metricsTopicValueMap.get(metric);
brokerValue.add(new Tuple<>(topic, value));
metricsTopicValueMap.put(metric, brokerValue);
metricsTopicValueMap.putIfAbsent(metricName, new ArrayList<>());
metricsTopicValueMap.get(metricName).add(new Tuple<>(topic, value));
}
}catch (Exception e){
LOGGER.error("method=handleTopBrokerESQueryResponse||metric={}||errMsg=exception!", metric, e);
LOGGER.error("method=handleTopTopicESQueryResponse||metricName={}||errMsg=exception!", metricName, e);
}
} );
}
//2、对每个指标的broker按照指标值排序并截取前topN个brokerIds
for(String metric : metricsTopicValueMap.keySet()){
List<Tuple<String, Double>> brokerValue = metricsTopicValueMap.get(metric);
for(Map.Entry<String, List<Tuple<String, Double>>> entry: metricsTopicValueMap.entrySet()){
entry.getValue().sort((o1, o2) -> {
if(null == o1 || null == o2) {
return 0;
}
brokerValue.sort((o1, o2) -> {
if(null == o1 || null == o2){return 0;}
return o2.getV2().compareTo(o1.getV2());
} );
List<Tuple<String, Double>> temp = (brokerValue.size() > topN) ? brokerValue.subList(0, topN) : brokerValue;
List<String> topics = temp.stream().map(t -> t.getV1()).collect(Collectors.toList());
List<String> topicNameList = entry.getValue().subList(0, Math.min(entry.getValue().size(), topN)).stream().map(t -> t.getV1()).collect(Collectors.toList());
ret.put(metric, topics);
}
return ret;
}
private Map<String/*metric*/, Map<String/*topic*/, List<MetricPointVO>>> topicMetricMap2MetricTopicMap(
Map<String/*topic*/, Map<String/*metric*/, List<MetricPointVO>>> topicMetricMap){
Map<String/*metric*/, Map<String/*topic*/, List<MetricPointVO>>> ret = new HashMap<>();
for(String topic : topicMetricMap.keySet()){
Map<String/*metric*/, List<MetricPointVO>> metricMap = topicMetricMap.get(topic);
for(String metric : metricMap.keySet()){
Map<String/*topic*/, List<MetricPointVO>> brokerMap = (null == ret.get(metric)) ? new HashMap<>() : ret.get(metric);
brokerMap.put(topic, metricMap.get(metric));
ret.put(metric, brokerMap);
}
ret.put(entry.getKey(), topicNameList);
}
return ret;

View File

@@ -5,13 +5,11 @@ import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.*;
@@ -29,8 +27,6 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO {
register( this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ConnectClusterMetricESDAO", 4,8, 500);
/**
* 获取集群 clusterPhyId 中每个 metric 的 topN 的 connectCluster 在指定时间[startTime、endTime]区间内所有的指标
* topN 按照[startTime, endTime] 时间段内最后一个值来排序
@@ -97,7 +93,7 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO {
aggDsl
);
queryFuture.runnableTask(
esTPService.submitSearchTask(
String.format("class=ConnectClusterMetricESDAO||method=listMetricsByConnectClusterIdList||ClusterPhyId=%d", clusterPhyId),
5000,
() -> {
@@ -106,24 +102,24 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO {
realIndex,
dsl,
s -> handleListESQueryResponse(s, metricNameList, aggType),
3
DEFAULT_RETRY_TIME
);
synchronized (table) {
for(String metric : metricMap.keySet()){
table.put(metric, connectClusterId, metricMap.get(metric));
for(Map.Entry<String, List<MetricPointVO>> entry : metricMap.entrySet()){
table.put(entry.getKey(), connectClusterId, entry.getValue());
}
}
});
} catch (Exception e) {
LOGGER.error(
"class=ConnectClusterMetricESDAO||method=listMetricsByConnectClusterIdList||clusterPhyId={}||connectClusterId{}||errMsg=exception!",
"method=listMetricsByConnectClusterIdList||clusterPhyId={}||connectClusterId={}||errMsg=exception!",
clusterPhyId, connectClusterId, e
);
}
}
queryFuture.waitExecute();
esTPService.waitExecute();
return table;
}
@@ -167,107 +163,89 @@ public class ConnectClusterMetricESDAO extends BaseMetricESDAO {
/**************************************************** private method ****************************************************/
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metricNameList, String aggType){
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
if(null == response || null == response.getAggs()){
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return metricMap;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap || null == esAggrMap.get(HIST)) {
return metricMap;
}
if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){
return metricMap;
}
for(String metric : metrics){
for(String metricName : metricNameList){
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
esAggrMap.get(HIST).getBucketList().forEach(esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(null == value){return;}
Object value = esBucket.getAggrMap().get(metricName).getUnusedMap().get(VALUE);
if(null == value) {
return;
}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}else {
LOGGER.info("");
metricPoints.add(new MetricPointVO(metricName, timestamp, value.toString(), aggType));
}
}catch (Exception e){
LOGGER.error("metric={}||errMsg=exception!", metric, e);
LOGGER.error("method=handleListESQueryResponse||metricName={}||errMsg=exception!", metricName, e);
}
} );
metricMap.put(metric, optimizeMetricPoints(metricPoints));
metricMap.put(metricName, optimizeMetricPoints(metricPoints));
}
return metricMap;
}
private Map<String, List<Long>> handleTopConnectClusterESQueryResponse(ESQueryResponse response, List<String> metrics, int topN){
private Map<String, List<Long>> handleTopConnectClusterESQueryResponse(ESQueryResponse response, List<String> metricNameList, int topN){
Map<String, List<Long>> ret = new HashMap<>();
if(null == response || null == response.getAggs()){
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return ret;
}
Map<String, ESAggr> esAggrMap = response.getAggs().getEsAggrMap();
if (null == esAggrMap || null == esAggrMap.get(HIST)) {
return ret;
}
if(CollectionUtils.isEmpty(esAggrMap.get(HIST).getBucketList())){
return ret;
}
Map<String, List<Tuple<Long, Double>>> metricBrokerValueMap = new HashMap<>();
Map<String, List<Tuple<Long, Double>>> metricConnectClusterValueMap = new HashMap<>();
//1、先获取每个指标对应的所有brokerIds以及指标的值
for(String metric : metrics) {
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
for(String metricName : metricNameList) {
esAggrMap.get(HIST).getBucketList().forEach(esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long connectorClusterId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(HIST).getBucketList()
.get(0).getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(null == value){return;}
List<Tuple<Long, Double>> connectorClusterValue = (null == metricBrokerValueMap.get(metric)) ?
new ArrayList<>() : metricBrokerValueMap.get(metric);
connectorClusterValue.add(new Tuple<>(connectorClusterId, Double.valueOf(value.toString())));
metricBrokerValueMap.put(metric, connectorClusterValue);
if (null == esBucket.getUnusedMap().get(KEY)) {
return;
}
Long connectorClusterId = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(HIST).getBucketList().get(0).getAggrMap().get(metricName).getUnusedMap().get(VALUE);
if(null == value) {
return;
}
metricConnectClusterValueMap.putIfAbsent(metricName, new ArrayList<>());
metricConnectClusterValueMap.get(metricName).add(new Tuple<>(connectorClusterId, Double.valueOf(value.toString())));
}catch (Exception e){
LOGGER.error("metric={}||errMsg=exception!", metric, e);
LOGGER.error("method=handleTopConnectClusterESQueryResponse||metricName={}||errMsg=exception!", metricName, e);
}
} );
}
//2、对每个指标的broker按照指标值排序并截取前topN个brokerIds
for(String metric : metricBrokerValueMap.keySet()){
List<Tuple<Long, Double>> connectorClusterValue = metricBrokerValueMap.get(metric);
//2、对每个指标的connect按照指标值排序并截取前topN个connectIds
for(Map.Entry<String, List<Tuple<Long, Double>>> entry : metricConnectClusterValueMap.entrySet()){
entry.getValue().sort((o1, o2) -> {
if(null == o1 || null == o2) {
return 0;
}
connectorClusterValue.sort((o1, o2) -> {
if(null == o1 || null == o2){return 0;}
return o2.getV2().compareTo(o1.getV2());
} );
});
List<Tuple<Long, Double>> temp = (connectorClusterValue.size() > topN) ? connectorClusterValue.subList(0, topN) : connectorClusterValue;
List<Long> connectorClusterIds = temp.stream().map(t -> t.getV1()).collect(Collectors.toList());
List<Long> connectorClusterIdList = entry.getValue()
.subList(0, Math.min(entry.getValue().size(), topN))
.stream()
.map(t -> t.getV1())
.collect(Collectors.toList());
ret.put(metric, connectorClusterIds);
ret.put(entry.getKey(), connectorClusterIdList);
}
return ret;

View File

@@ -8,7 +8,6 @@ import com.google.common.collect.Table;
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.ConnectorMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
@@ -33,43 +32,55 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
register( this);
}
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ConnectorMetricESDAO", 4,8, 500);
/**
* 获取每个 metric 的 topN 个 connector 的指标,如果获取不到 topN 的topics, 则默认返回 defaultTopics 的指标
* 获取每个 metric 的 topN 个 connector 的指标,如果获取不到 topN 的 connectors, 则默认返回 defaultTopics 的指标
*/
public Table<String/*metric*/, Tuple<Long, String>, List<MetricPointVO>> listMetricsByTopN(Long clusterPhyId,
List<Tuple<Long, String>> defaultConnectorList,
List<String> metricNameList,
String aggType,
int topN,
Long startTime,
Long endTime){
List<Tuple<Long, String>> defaultConnectorList,
List<String> metricNameList,
String aggType,
int topN,
Long startTime,
Long endTime) {
//1、获取topN要查询的topic每一个指标的topN的topic可能不一样
Map<String, List<Tuple<Long, String>>> metricsMap = this.getTopNConnectors(clusterPhyId, metricNameList, aggType, topN, startTime, endTime);
Table<String, Tuple<Long, String>, List<MetricPointVO>> table = HashBasedTable.create();
//2、获取connector列表
Set<Tuple<Long, String>> connectorSet = new HashSet<>(defaultConnectorList);
metricsMap.values().forEach(elem -> connectorSet.addAll(elem));
for(String metricName : metricNameList){
table.putAll(this.listMetricsByConnectors(
clusterPhyId,
Arrays.asList(metricName),
aggType,
metricsMap.getOrDefault(metricName, defaultConnectorList),
startTime,
endTime)
);
//3、批量获取信息
Table<String, Tuple<Long, String>, List<MetricPointVO>> allMetricsTable = this.listMetricsByConnectors(
clusterPhyId,
metricNameList,
aggType,
new ArrayList<>(connectorSet),
startTime,
endTime
);
//4、获取Top-Metric
Table<String, Tuple<Long, String>, List<MetricPointVO>> metricTable = HashBasedTable.create();
for (String metricName: metricNameList) {
for (Tuple<Long, String> connector: metricsMap.getOrDefault(metricName, defaultConnectorList)) {
List<MetricPointVO> voList = allMetricsTable.get(metricName, connector);
if (voList == null) {
continue;
}
metricTable.put(metricName, connector, voList);
}
}
return table;
// 返回结果
return metricTable;
}
public List<ConnectorMetricPO> getConnectorLatestMetric(Long clusterPhyId, List<Tuple<Long, String>> connectClusterIdAndConnectorNameList, List<String> metricsNames){
List<ConnectorMetricPO> connectorMetricPOS = new CopyOnWriteArrayList<>();
for(Tuple<Long, String> connectClusterIdAndConnectorName : connectClusterIdAndConnectorNameList){
queryFuture.runnableTask(
esTPService.submitSearchTask(
"getConnectorLatestMetric",
30000,
() -> {
@@ -78,7 +89,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
});
}
queryFuture.waitExecute();
esTPService.waitExecute();
return connectorMetricPOS;
}
@@ -112,11 +123,11 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
* 获取每个 metric 指定个 topic 的指标
*/
public Table<String/*metric*/, Tuple<Long, String>, List<MetricPointVO>> listMetricsByConnectors(Long clusterPhyId,
List<String> metrics,
String aggType,
List<Tuple<Long, String>> connectorList,
Long startTime,
Long endTime) {
List<String> metricNameList,
String aggType,
List<Tuple<Long, String>> connectorList,
Long startTime,
Long endTime) {
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
@@ -124,17 +135,15 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metrics, aggType);
String aggDsl = buildAggsDSL(metricNameList, aggType);
final Table<String, Tuple<Long, String>, List<MetricPointVO>> table = HashBasedTable.create();
//4、构造dsl查询条件
for(Tuple<Long, String> connector : connectorList) {
try {
queryFuture.runnableTask(
String.format(
"method=listConnectorMetricsByConnectors||ClusterPhyId=%d||connectorName=%s",
clusterPhyId, connector.getV2() ),
esTPService.submitSearchTask(
String.format("class=ConnectorMetricESDAO||method=listMetricsByConnectors||ClusterPhyId=%d||connectorName=%s", clusterPhyId, connector.getV2()),
3000,
() -> {
String dsl = dslLoaderUtil.getFormatDslByFileName(
@@ -152,25 +161,25 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
connector.getV1().toString(),
realIndex,
dsl,
s -> handleListESQueryResponse(s, metrics, aggType),
s -> handleListESQueryResponse(s, metricNameList, aggType),
3
);
synchronized (table){
for(String metric : metricMap.keySet()){
table.put(metric, connector, metricMap.get(metric));
synchronized (table) {
for(Map.Entry<String/*metricName*/, List<MetricPointVO>> entry: metricMap.entrySet()){
table.put(entry.getKey(), connector, entry.getValue());
}
}
});
} catch (Exception e) {
LOGGER.error(
"method=listConnectorMetricsByConnectors||clusterPhyId={}||connectorName{}||errMsg=exception!",
clusterPhyId, connector.getV2(), e
"method=listMetricsByConnectors||clusterPhyId={}||connectClusterId={}||connectorName{}||errMsg=exception!",
clusterPhyId, connector.getV1(), connector.getV2(), e
);
}
}
queryFuture.waitExecute();
esTPService.waitExecute();
return table;
}
@@ -181,7 +190,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
String aggType,
int topN,
Long startTime,
Long endTime){
Long endTime) {
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
@@ -209,46 +218,19 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
);
}
/**************************************************** private method ****************************************************/
private Table<String/*topic*/, String/*metric*/, MetricPointVO> handleSingleESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Table<String, String, MetricPointVO> table = HashBasedTable.create();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap){return table;}
for(String metric : metrics){
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
String topic = esBucket.getUnusedMap().get(KEY).toString();
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setValue(value);
metricPoint.setName(metric);
table.put(topic, metric, metricPoint);
}else {
LOGGER.debug("method=handleListESQueryResponse||metric={}||errMsg=get topic is null!", metric);
}
}catch (Exception e){
LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e);
}
});
}
return table;
}
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap){return metricMap;}
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return metricMap;
}
for(String metric : metrics){
for(String metric : metrics) {
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
@@ -256,17 +238,11 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
Object value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE);
if(value == null){return;}
if(value == null){
return;
}
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value.toString());
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}else {
LOGGER.info("");
metricPoints.add(new MetricPointVO(metric, timestamp, value.toString(), aggType));
}
}catch (Exception e){
LOGGER.error("method=handleListESQueryResponse||metric={}||errMsg=exception!", metric, e);
@@ -279,10 +255,12 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
return metricMap;
}
private Map<String, List<Tuple<Long, String>>> handleTopConnectorESQueryResponse(ESQueryResponse response, List<String> metricNameList, int topN){
private Map<String, List<Tuple<Long, String>>> handleTopConnectorESQueryResponse(ESQueryResponse response,
List<String> metricNameList,
int topN) {
Map<String, List<Tuple<Long, String>>> ret = new HashMap<>();
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
Map<String, ESAggr> esAggrMap = this.checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return ret;
}
@@ -291,7 +269,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
// 1、先获取每个指标对应的所有 connector 以及指标的值
for(String metricName : metricNameList) {
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
esAggrMap.get(HIST).getBucketList().forEach(esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
String connectorNameAndClusterId = esBucket.getUnusedMap().get(KEY).toString();
@@ -338,24 +316,6 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
return ret;
}
private Map<String/*metric*/, Map<String/*topic*/, List<MetricPointVO>>> topicMetricMap2MetricTopicMap(
Map<String/*topic*/, Map<String/*metric*/, List<MetricPointVO>>> topicMetricMap){
Map<String/*metric*/, Map<String/*topic*/, List<MetricPointVO>>> ret = new HashMap<>();
for(String topic : topicMetricMap.keySet()){
Map<String/*metric*/, List<MetricPointVO>> metricMap = topicMetricMap.get(topic);
for(String metric : metricMap.keySet()){
Map<String/*topic*/, List<MetricPointVO>> brokerMap = (null == ret.get(metric)) ? new HashMap<>() : ret.get(metric);
brokerMap.put(topic, metricMap.get(metric));
ret.put(metric, brokerMap);
}
}
return ret;
}
private Tuple<String, Long> splitConnectorNameAndClusterId(String connectorNameAndClusterId){
String[] ss = connectorNameAndClusterId.split("#");
if(null == ss || ss.length != 2){return null;}

View File

@@ -50,7 +50,6 @@ logging:
thread-pool:
scheduled:
thread-num: 2 # @Scheduled任务的线程池大小默认是一个
collector: # 采集模块的配置
future-util: # 采集模块线程池配置
num: 3 # 线程池个数
@@ -58,7 +57,6 @@ thread-pool:
queue-size: 10000 # 每个线程池队列大小
select-suitable-enable: true # 任务是否自动选择合适的线程池,非主要,可不修改
suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改
task: # 任务模块的配置
metrics: # metrics采集任务配置
thread-num: 18 # metrics采集任务线程池核心线程数
@@ -69,6 +67,10 @@ thread-pool:
common: # 剩余其他任务配置
thread-num: 15 # 剩余其他任务线程池核心线程数
queue-size: 150 # 剩余其他任务线程池队列大小
es:
search: # es查询线程池
thread-num: 10 # 线程池大小
queue-size: 5000 # 队列大小
# 客户端池大小相关配置