mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
自动创建ES索引 & 主动填补指标历史曲线缺少的点
This commit is contained in:
@@ -8,7 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
/**
|
||||
* 直接操作es集群的dao
|
||||
*/
|
||||
public class BaseESDAO {
|
||||
public abstract class BaseESDAO {
|
||||
protected static final ILog LOGGER = LogFactory.getLog("ES_LOGGER");
|
||||
|
||||
/**
|
||||
|
||||
@@ -11,7 +11,11 @@ import com.didiglobal.logi.elasticsearch.client.request.batch.ESBatchRequest;
|
||||
import com.didiglobal.logi.elasticsearch.client.request.query.query.ESQueryRequest;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.batch.ESBatchResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.batch.IndexResultItemNode;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.gettemplate.ESIndicesGetTemplateResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.putindex.ESIndicesPutIndexResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESIndicesPutTemplateResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.setting.template.TemplateConfig;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.google.common.collect.Lists;
|
||||
@@ -340,7 +344,94 @@ public class ESOpClient {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据表达式判断索引是否已存在
|
||||
*/
|
||||
public boolean indexExist(String indexName) {
|
||||
ESClient esClient = null;
|
||||
try {
|
||||
esClient = this.getESClientFromPool();
|
||||
if (esClient == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查索引是否存在
|
||||
return esClient.admin().indices().prepareExists(indexName).execute().actionGet(30, TimeUnit.SECONDS).isExists();
|
||||
} catch (Exception e){
|
||||
LOGGER.warn("class=ESOpClient||method=indexExist||indexName={}||msg=exception!", indexName, e);
|
||||
} finally {
|
||||
if (esClient != null) {
|
||||
returnESClientToPool(esClient);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建索引
|
||||
*/
|
||||
public boolean createIndex(String indexName) {
|
||||
if (indexExist(indexName)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
ESClient client = getESClientFromPool();
|
||||
if (client != null) {
|
||||
try {
|
||||
ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute()
|
||||
.actionGet(30, TimeUnit.SECONDS);
|
||||
return response.getAcknowledged();
|
||||
} catch (Exception e){
|
||||
LOGGER.warn( "msg=create index fail||indexName={}", indexName, e);
|
||||
} finally {
|
||||
returnESClientToPool(client);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建索引模板
|
||||
*/
|
||||
public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) {
|
||||
ESClient esClient = null;
|
||||
|
||||
try {
|
||||
esClient = this.getESClientFromPool();
|
||||
|
||||
// 获取es中原来index template的配置
|
||||
ESIndicesGetTemplateResponse getTemplateResponse =
|
||||
esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( 30, TimeUnit.SECONDS );
|
||||
|
||||
TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig();
|
||||
|
||||
if (null != templateConfig) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 创建新的模板
|
||||
ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName )
|
||||
.setTemplateConfig( config ).execute().actionGet( 30, TimeUnit.SECONDS );
|
||||
|
||||
return response.getAcknowledged();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn(
|
||||
"class=ESOpClient||method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!",
|
||||
indexTemplateName, config, e
|
||||
);
|
||||
} finally {
|
||||
if (esClient != null) {
|
||||
this.returnESClientToPool(esClient);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
/**
|
||||
* 执行查询
|
||||
* @param request
|
||||
|
||||
@@ -8,11 +8,12 @@ import com.google.common.collect.Maps;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.*;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.*;
|
||||
@@ -25,7 +26,8 @@ public class BaseMetricESDAO extends BaseESDAO {
|
||||
/**
|
||||
* 操作的索引名称
|
||||
*/
|
||||
protected String indexName;
|
||||
protected String indexName;
|
||||
protected String indexTemplate;
|
||||
|
||||
protected static final Long ONE_MIN = 60 * 1000L;
|
||||
protected static final Long FIVE_MIN = 5 * ONE_MIN;
|
||||
@@ -35,10 +37,24 @@ public class BaseMetricESDAO extends BaseESDAO {
|
||||
/**
|
||||
* 不同维度 kafka 监控数据
|
||||
*/
|
||||
private static Map<KafkaMetricIndexEnum, BaseMetricESDAO> ariusStatsEsDaoMap = Maps
|
||||
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps
|
||||
.newConcurrentMap();
|
||||
|
||||
public static BaseMetricESDAO getByStatsType(KafkaMetricIndexEnum statsType) {
|
||||
/**
|
||||
* 检查 es 索引是否存在,不存在则创建索引
|
||||
*/
|
||||
@Scheduled(cron = "0 3/5 * * * ?")
|
||||
public void checkCurrentDayIndexExist(){
|
||||
String realIndex = IndexNameUtils.genCurrentDailyIndexName(indexName);
|
||||
|
||||
if(esOpClient.indexExist(realIndex)){return;}
|
||||
|
||||
if(esOpClient.createIndexTemplateIfNotExist(indexName, indexTemplate)){
|
||||
esOpClient.createIndex(realIndex);
|
||||
}
|
||||
}
|
||||
|
||||
public static BaseMetricESDAO getByStatsType(String statsType) {
|
||||
return ariusStatsEsDaoMap.get(statsType);
|
||||
}
|
||||
|
||||
@@ -48,7 +64,7 @@ public class BaseMetricESDAO extends BaseESDAO {
|
||||
* @param statsType
|
||||
* @param baseAriusStatsEsDao
|
||||
*/
|
||||
public static void register(KafkaMetricIndexEnum statsType, BaseMetricESDAO baseAriusStatsEsDao) {
|
||||
public static void register(String statsType, BaseMetricESDAO baseAriusStatsEsDao) {
|
||||
ariusStatsEsDaoMap.put(statsType, baseAriusStatsEsDao);
|
||||
}
|
||||
|
||||
@@ -358,7 +374,50 @@ public class BaseMetricESDAO extends BaseESDAO {
|
||||
String dsl = dslLoaderUtil.getFormatDslByFileName(DslsConstant.GET_LATEST_METRIC_TIME, startTime, endTime, appendQueryDsl);
|
||||
String realIndexName = IndexNameUtils.genDailyIndexName(indexName, startTime, endTime);
|
||||
|
||||
return esOpClient.performRequest(realIndexName, dsl, s -> s.getHits().getHits().isEmpty()
|
||||
? System.currentTimeMillis() : ((Map<String, Long>)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP), 3);
|
||||
return esOpClient.performRequest(
|
||||
realIndexName,
|
||||
dsl,
|
||||
s -> s == null || s.getHits().getHits().isEmpty() ? System.currentTimeMillis() : ((Map<String, Long>)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP),
|
||||
3
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 对 metricPointVOS 进行缺点优化
|
||||
*/
|
||||
protected List<MetricPointVO> optimizeMetricPoints(List<MetricPointVO> metricPointVOS){
|
||||
if(CollectionUtils.isEmpty(metricPointVOS)){return metricPointVOS;}
|
||||
|
||||
int size = metricPointVOS.size();
|
||||
if(size < 2){return metricPointVOS;}
|
||||
|
||||
Collections.sort(metricPointVOS);
|
||||
|
||||
List<MetricPointVO> rets = new ArrayList<>();
|
||||
for(int first = 0, second = first + 1; second < size; first++, second++){
|
||||
MetricPointVO firstPoint = metricPointVOS.get(first);
|
||||
MetricPointVO secondPoint = metricPointVOS.get(second);
|
||||
|
||||
if(null != firstPoint && null != secondPoint){
|
||||
rets.add(firstPoint);
|
||||
|
||||
//说明有空点,那就增加一个点
|
||||
if(secondPoint.getTimeStamp() - firstPoint.getTimeStamp() > ONE_MIN){
|
||||
MetricPointVO addPoint = new MetricPointVO();
|
||||
addPoint.setName(firstPoint.getName());
|
||||
addPoint.setAggType(firstPoint.getAggType());
|
||||
addPoint.setValue(firstPoint.getValue());
|
||||
addPoint.setTimeStamp(firstPoint.getTimeStamp() + ONE_MIN);
|
||||
|
||||
rets.add(addPoint);
|
||||
}
|
||||
|
||||
if(second == size - 1){
|
||||
rets.add(secondPoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rets;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,14 +18,16 @@ import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.BROKER_INFO;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
|
||||
|
||||
@Component
|
||||
public class BrokerMetricESDAO extends BaseMetricESDAO {
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.indexName = BROKER_INFO.getIndex();
|
||||
BaseMetricESDAO.register(BROKER_INFO, this);
|
||||
super.indexName = BROKER_INDEX;
|
||||
super.indexTemplate = BROKER_TEMPLATE;
|
||||
checkCurrentDayIndexExist();
|
||||
BaseMetricESDAO.register(indexName, this);
|
||||
}
|
||||
|
||||
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500);
|
||||
@@ -258,7 +260,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
|
||||
}
|
||||
} );
|
||||
|
||||
metricMap.put(metric, metricPoints);
|
||||
metricMap.put(metric, optimizeMetricPoints(metricPoints));
|
||||
}
|
||||
|
||||
return metricMap;
|
||||
|
||||
@@ -23,15 +23,17 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.CLUSTER_INFO;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
|
||||
|
||||
@Component
|
||||
public class ClusterMetricESDAO extends BaseMetricESDAO {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.indexName = CLUSTER_INFO.getIndex();
|
||||
BaseMetricESDAO.register(CLUSTER_INFO, this);
|
||||
super.indexName = CLUSTER_INDEX;
|
||||
super.indexTemplate = CLUSTER_TEMPLATE;
|
||||
checkCurrentDayIndexExist();
|
||||
BaseMetricESDAO.register(indexName, this);
|
||||
}
|
||||
|
||||
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500);
|
||||
@@ -207,7 +209,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO {
|
||||
}
|
||||
} );
|
||||
|
||||
metricMap.put(metric, metricPoints);
|
||||
metricMap.put(metric, optimizeMetricPoints(metricPoints));
|
||||
}
|
||||
|
||||
return metricMap;
|
||||
|
||||
@@ -23,16 +23,17 @@ import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.Constant.ZERO;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.KEY;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.GROUP_INFO;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
|
||||
|
||||
@Component
|
||||
public class GroupMetricESDAO extends BaseMetricESDAO {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.indexName = GROUP_INFO.getIndex();
|
||||
BaseMetricESDAO.register(GROUP_INFO, this);
|
||||
super.indexName = GROUP_INDEX;
|
||||
super.indexTemplate = GROUP_TEMPLATE;
|
||||
checkCurrentDayIndexExist();
|
||||
BaseMetricESDAO.register(indexName, this);
|
||||
}
|
||||
|
||||
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500);
|
||||
@@ -206,7 +207,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO {
|
||||
}
|
||||
} );
|
||||
|
||||
metricMap.put(metric, metricPoints);
|
||||
metricMap.put(metric, optimizeMetricPoints(metricPoints));
|
||||
}
|
||||
|
||||
return metricMap;
|
||||
|
||||
@@ -8,7 +8,7 @@ import javax.annotation.PostConstruct;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.PARTITION_INFO;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
@@ -18,8 +18,10 @@ public class PartitionMetricESDAO extends BaseMetricESDAO {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.indexName = PARTITION_INFO.getIndex();
|
||||
BaseMetricESDAO.register(PARTITION_INFO, this);
|
||||
super.indexName = PARTITION_INDEX;
|
||||
super.indexTemplate = PARTITION_TEMPLATE;
|
||||
checkCurrentDayIndexExist();
|
||||
BaseMetricESDAO.register(indexName, this);
|
||||
}
|
||||
|
||||
public PartitionMetricPO getPartitionLatestMetrics(Long clusterPhyId, String topic,
|
||||
|
||||
@@ -14,7 +14,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.REPLICATION_INFO;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
@@ -24,8 +24,10 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.indexName = REPLICATION_INFO.getIndex();
|
||||
BaseMetricESDAO.register(REPLICATION_INFO, this);
|
||||
super.indexName = REPLICATION_INDEX;
|
||||
super.indexTemplate = REPLICATION_TEMPLATE;
|
||||
checkCurrentDayIndexExist();
|
||||
BaseMetricESDAO.register(indexName, this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -22,15 +22,17 @@ import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.TOPIC_INFO;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
|
||||
|
||||
@Component
|
||||
public class TopicMetricESDAO extends BaseMetricESDAO {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.indexName = TOPIC_INFO.getIndex();
|
||||
BaseMetricESDAO.register(TOPIC_INFO, this);
|
||||
super.indexName = TOPIC_INDEX;
|
||||
super.indexTemplate = TOPIC_TEMPLATE;
|
||||
checkCurrentDayIndexExist();
|
||||
BaseMetricESDAO.register(indexName, this);
|
||||
}
|
||||
|
||||
protected FutureWaitUtil<Void> queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500);
|
||||
@@ -352,7 +354,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
|
||||
}
|
||||
} );
|
||||
|
||||
metricMap.put(metric, metricPoints);
|
||||
metricMap.put(metric, optimizeMetricPoints(metricPoints));
|
||||
}
|
||||
|
||||
return metricMap;
|
||||
|
||||
Reference in New Issue
Block a user