ZK-指标采集入ES

This commit is contained in:
zengqiao
2022-10-08 15:31:59 +08:00
parent 7d781712c9
commit b4cc31c459
26 changed files with 1126 additions and 42 deletions

View File

@@ -40,8 +40,7 @@ public class BaseMetricESDAO extends BaseESDAO {
/**
* 不同维度 kafka 监控数据
*/
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps
.newConcurrentMap();
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps.newConcurrentMap();
/**
* 检查 es 索引是否存在,不存在则创建索引

View File

@@ -0,0 +1,106 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao;
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.constant.ESConstant;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_TEMPLATE;
@Component
public class ZookeeperMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public void init() {
super.indexName = ZOOKEEPER_INDEX;
super.indexTemplate = ZOOKEEPER_TEMPLATE;
checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this);
}
/**
* 获取指定集群,指定指标,一段时间内的值
*/
public Map<String/*metricName*/, List<MetricPointVO>> listMetricsByClusterPhyId(Long clusterPhyId,
List<String> metricNameList,
String aggType,
Long startTime,
Long endTime) {
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metricNameList, aggType);
//4、构造dsl查询条件开始查询
try {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_ZOOKEEPER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
return esOpClient.performRequestWithRouting(
String.valueOf(clusterPhyId),
realIndex,
dsl,
s -> handleListESQueryResponse(s, metricNameList, aggType),
ESConstant.DEFAULT_RETRY_TIME
);
} catch (Exception e){
LOGGER.error("class=ZookeeperMetricESDAO||method=listMetricsByClusterPhyId||clusterPhyId={}||errMsg=exception!",
clusterPhyId, e
);
}
return new HashMap<>();
}
/**************************************************** private method ****************************************************/
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return new HashMap<>();
}
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
for(String metric : metrics){
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value);
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}
}catch (Exception e){
LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e);
}
} );
metricMap.put(metric, optimizeMetricPoints(metricPoints));
}
return metricMap;
}
}

View File

@@ -80,4 +80,6 @@ public class DslsConstant {
public static final String COUNT_GROUP_NOT_METRIC_VALUE = "GroupMetricESDAO/countGroupNotMetricValue";
/**************************************************** Zookeeper ****************************************************/
public static final String GET_ZOOKEEPER_AGG_LIST_METRICS = "ZookeeperMetricESDAO/getAggListZookeeperMetrics";
}

View File

@@ -12,5 +12,7 @@ import javax.management.ObjectName;
public interface JmxDAO {
Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig);
}

View File

@@ -19,24 +19,28 @@ public class JmxDAOImpl implements JmxDAO {
@Override
public Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
return this.getJmxValue(null, null, jmxHost, jmxPort, jmxConfig, objectName, attribute);
return this.getJmxValue(null, jmxHost, jmxPort, jmxConfig, objectName, attribute);
}
@Override
public Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
public Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
JmxConnectorWrap jmxConnectorWrap = null;
try {
jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, brokerId, null, jmxHost, jmxPort, jmxConfig);
jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, null, null, jmxHost, jmxPort, jmxConfig);
if (!jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) {
log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed",
clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig);
log.error(
"method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed",
clusterPhyId, jmxHost, jmxPort, jmxConfig
);
return null;
}
return jmxConnectorWrap.getAttribute(objectName, attribute);
} catch (Exception e) {
log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg={}",
clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e);
log.error(
"method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg=exception!",
clusterPhyId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e
);
} finally {
if (jmxConnectorWrap != null) {
jmxConnectorWrap.close();
@@ -45,4 +49,27 @@ public class JmxDAOImpl implements JmxDAO {
return null;
}
@Override
public Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig) {
try {
Object object = this.getJmxValue(
clusterPhyId,
jmxHost,
jmxPort,
jmxConfig,
new ObjectName("java.lang:type=Runtime"),
"StartTime"
);
return object == null? null: (Long) object;
} catch (Exception e) {
log.error(
"class=JmxDAOImpl||method=getServerStartTime||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMsg=exception!",
clusterPhyId, jmxHost, jmxPort, jmxConfig, e
);
}
return null;
}
}

View File

@@ -0,0 +1,44 @@
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"clusterPhyId": {
"value": %d
}
}
},
{
"term": {
"brokerId": {
"value": %d
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
]
}
},
"aggs": {
"hist": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "%s",
"time_zone": "Asia/Shanghai",
"min_doc_count": 0
},
"aggs": {
%s
}
}
}
}