From c1031a492a26454fbbd719969990963d51ec7118 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 8 Nov 2022 10:29:54 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=E5=A2=9E=E5=8A=A0ES=E7=B4=A2?= =?UTF-8?q?=E5=BC=95=E5=88=A0=E9=99=A4=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/persistence/es/ESOpClient.java | 67 ++++++++++++++++++- .../persistence/es/dao/BaseMetricESDAO.java | 27 ++++++++ km-rest/src/main/resources/application.yml | 2 + 3 files changed, 93 insertions(+), 3 deletions(-) diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index c70a4df6..1efa01d6 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -11,6 +11,9 @@ import com.didiglobal.logi.elasticsearch.client.request.batch.ESBatchRequest; import com.didiglobal.logi.elasticsearch.client.request.query.query.ESQueryRequest; import com.didiglobal.logi.elasticsearch.client.response.batch.ESBatchResponse; import com.didiglobal.logi.elasticsearch.client.response.batch.IndexResultItemNode; +import com.didiglobal.logi.elasticsearch.client.response.indices.catindices.CatIndexResult; +import com.didiglobal.logi.elasticsearch.client.response.indices.catindices.ESIndicesCatIndicesResponse; +import com.didiglobal.logi.elasticsearch.client.response.indices.deleteindex.ESIndicesDeleteIndexResponse; import com.didiglobal.logi.elasticsearch.client.response.indices.gettemplate.ESIndicesGetTemplateResponse; import com.didiglobal.logi.elasticsearch.client.response.indices.putindex.ESIndicesPutIndexResponse; import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESIndicesPutTemplateResponse; @@ -37,6 +40,7 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; @Component public class ESOpClient { @@ -77,6 +81,8 @@ public class ESOpClient { */ private LinkedBlockingQueue esClientPool; + private static final Integer ES_OPERATE_TIMEOUT = 30; + @PostConstruct public void init(){ esClientPool = new LinkedBlockingQueue<>( clientCnt ); @@ -380,7 +386,7 @@ public class ESOpClient { if (client != null) { try { ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute() - .actionGet(30, TimeUnit.SECONDS); + .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); return response.getAcknowledged(); } catch (Exception e){ LOGGER.warn( "msg=create index fail||indexName={}", indexName, e); @@ -400,7 +406,7 @@ public class ESOpClient { // 获取es中原来index template的配置 ESIndicesGetTemplateResponse getTemplateResponse = - esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( 30, TimeUnit.SECONDS ); + esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS ); TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig(); @@ -433,7 +439,7 @@ public class ESOpClient { // 创建新的模板 ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName ) - .setTemplateConfig( config ).execute().actionGet( 30, TimeUnit.SECONDS ); + .setTemplateConfig( config ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS ); return response.getAcknowledged(); } catch (Exception e) { @@ -449,6 +455,61 @@ public class ESOpClient { return false; } + /** + * 根据索引模板获取所有的索引 + * @param indexName + * @return + */ + public List listIndexByName(String indexName){ + ESClient esClient = null; + + try { + esClient = this.getESClientFromPool(); + + ESIndicesCatIndicesResponse response = esClient.admin().indices().prepareCatIndices(indexName + "*").execute() + .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); + + if(null != response){ + return response.getCatIndexResults().stream().map(CatIndexResult::getIndex).collect(Collectors.toList()); + } + } catch (Exception e) { + LOGGER.warn( "method=listIndexByTemplate||indexName={}||msg=exception!", + indexName, e); + } finally { + if (esClient != null) { + this.returnESClientToPool(esClient); + } + } + + return new ArrayList<>(); + } + + /** + * 删除索引 + * @param indexRealName + * @return + */ + public boolean delIndexByName(String indexRealName){ + ESClient esClient = null; + + try { + esClient = this.getESClientFromPool(); + + ESIndicesDeleteIndexResponse response = esClient.admin().indices().prepareDeleteIndex(indexRealName).execute() + .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); + return response.getAcknowledged(); + } catch (Exception e) { + LOGGER.warn( "method=delIndexByName||indexRealName={}||msg=exception!", + indexRealName, e); + } finally { + if (esClient != null) { + this.returnESClientToPool(esClient); + } + } + + return false; + } + /**************************************************** private method ****************************************************/ /** diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java index faeb64cb..39c9ef44 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java @@ -10,10 +10,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; +import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils; import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.util.CollectionUtils; @@ -42,6 +44,12 @@ public class BaseMetricESDAO extends BaseESDAO { */ private static Map ariusStatsEsDaoMap = Maps.newConcurrentMap(); + /** + * es 地址 + */ + @Value("${es.index.expire:60}") + private int indexExpireDays; + /** * 检查 es 索引是否存在,不存在则创建索引 */ @@ -62,6 +70,25 @@ public class BaseMetricESDAO extends BaseESDAO { } } + @Scheduled(cron = "0 30/45 * * * ?") + public void delExpireIndex(){ + List indexList = esOpClient.listIndexByName(indexName); + if(CollectionUtils.isEmpty(indexList)){return;} + + indexList.sort((o1, o2) -> -o1.compareTo(o2)); + + int size = indexList.size(); + if(size > indexExpireDays){ + if(!EnvUtil.isOnline()){ + LOGGER.info("method=delExpireIndex||indexExpireDays={}||delIndex={}", + indexExpireDays, indexList.subList(indexExpireDays, size)); + } + + indexList.subList(indexExpireDays, size).stream().forEach( + s -> esOpClient.delIndexByName(s)); + } + } + public static BaseMetricESDAO getByStatsType(String statsType) { return ariusStatsEsDaoMap.get(statsType); } diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index 4a4b7f1c..c9cd3c5e 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -88,6 +88,8 @@ es: client-cnt: 10 # 创建的ES客户端数 io-thread-cnt: 2 max-retry-cnt: 5 + index: + expire: 60 # 索引过期天数,60表示超过60天的索引会被KS过期删除 # 普罗米修斯指标导出相关配置 management: