mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Optimize]增加ES索引删除的功能
This commit is contained in:
@@ -11,6 +11,9 @@ import com.didiglobal.logi.elasticsearch.client.request.batch.ESBatchRequest;
|
||||
import com.didiglobal.logi.elasticsearch.client.request.query.query.ESQueryRequest;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.batch.ESBatchResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.batch.IndexResultItemNode;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.catindices.CatIndexResult;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.catindices.ESIndicesCatIndicesResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.deleteindex.ESIndicesDeleteIndexResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.gettemplate.ESIndicesGetTemplateResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.putindex.ESIndicesPutIndexResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESIndicesPutTemplateResponse;
|
||||
@@ -37,6 +40,7 @@ import java.util.List;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
public class ESOpClient {
|
||||
@@ -77,6 +81,8 @@ public class ESOpClient {
|
||||
*/
|
||||
private LinkedBlockingQueue<ESClient> esClientPool;
|
||||
|
||||
private static final Integer ES_OPERATE_TIMEOUT = 30;
|
||||
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
esClientPool = new LinkedBlockingQueue<>( clientCnt );
|
||||
@@ -380,7 +386,7 @@ public class ESOpClient {
|
||||
if (client != null) {
|
||||
try {
|
||||
ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute()
|
||||
.actionGet(30, TimeUnit.SECONDS);
|
||||
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
|
||||
return response.getAcknowledged();
|
||||
} catch (Exception e){
|
||||
LOGGER.warn( "msg=create index fail||indexName={}", indexName, e);
|
||||
@@ -400,7 +406,7 @@ public class ESOpClient {
|
||||
|
||||
// 获取es中原来index template的配置
|
||||
ESIndicesGetTemplateResponse getTemplateResponse =
|
||||
esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( 30, TimeUnit.SECONDS );
|
||||
esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS );
|
||||
|
||||
TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig();
|
||||
|
||||
@@ -433,7 +439,7 @@ public class ESOpClient {
|
||||
|
||||
// 创建新的模板
|
||||
ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName )
|
||||
.setTemplateConfig( config ).execute().actionGet( 30, TimeUnit.SECONDS );
|
||||
.setTemplateConfig( config ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS );
|
||||
|
||||
return response.getAcknowledged();
|
||||
} catch (Exception e) {
|
||||
@@ -449,6 +455,61 @@ public class ESOpClient {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据索引模板获取所有的索引
|
||||
* @param indexName
|
||||
* @return
|
||||
*/
|
||||
public List<String> listIndexByName(String indexName){
|
||||
ESClient esClient = null;
|
||||
|
||||
try {
|
||||
esClient = this.getESClientFromPool();
|
||||
|
||||
ESIndicesCatIndicesResponse response = esClient.admin().indices().prepareCatIndices(indexName + "*").execute()
|
||||
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
|
||||
|
||||
if(null != response){
|
||||
return response.getCatIndexResults().stream().map(CatIndexResult::getIndex).collect(Collectors.toList());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn( "method=listIndexByTemplate||indexName={}||msg=exception!",
|
||||
indexName, e);
|
||||
} finally {
|
||||
if (esClient != null) {
|
||||
this.returnESClientToPool(esClient);
|
||||
}
|
||||
}
|
||||
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除索引
|
||||
* @param indexRealName
|
||||
* @return
|
||||
*/
|
||||
public boolean delIndexByName(String indexRealName){
|
||||
ESClient esClient = null;
|
||||
|
||||
try {
|
||||
esClient = this.getESClientFromPool();
|
||||
|
||||
ESIndicesDeleteIndexResponse response = esClient.admin().indices().prepareDeleteIndex(indexRealName).execute()
|
||||
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
|
||||
return response.getAcknowledged();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn( "method=delIndexByName||indexRealName={}||msg=exception!",
|
||||
indexRealName, e);
|
||||
} finally {
|
||||
if (esClient != null) {
|
||||
this.returnESClientToPool(esClient);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
/**
|
||||
|
||||
@@ -10,10 +10,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@@ -42,6 +44,12 @@ public class BaseMetricESDAO extends BaseESDAO {
|
||||
*/
|
||||
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps.newConcurrentMap();
|
||||
|
||||
/**
|
||||
* es 地址
|
||||
*/
|
||||
@Value("${es.index.expire:60}")
|
||||
private int indexExpireDays;
|
||||
|
||||
/**
|
||||
* 检查 es 索引是否存在,不存在则创建索引
|
||||
*/
|
||||
@@ -62,6 +70,25 @@ public class BaseMetricESDAO extends BaseESDAO {
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(cron = "0 30/45 * * * ?")
|
||||
public void delExpireIndex(){
|
||||
List<String> indexList = esOpClient.listIndexByName(indexName);
|
||||
if(CollectionUtils.isEmpty(indexList)){return;}
|
||||
|
||||
indexList.sort((o1, o2) -> -o1.compareTo(o2));
|
||||
|
||||
int size = indexList.size();
|
||||
if(size > indexExpireDays){
|
||||
if(!EnvUtil.isOnline()){
|
||||
LOGGER.info("method=delExpireIndex||indexExpireDays={}||delIndex={}",
|
||||
indexExpireDays, indexList.subList(indexExpireDays, size));
|
||||
}
|
||||
|
||||
indexList.subList(indexExpireDays, size).stream().forEach(
|
||||
s -> esOpClient.delIndexByName(s));
|
||||
}
|
||||
}
|
||||
|
||||
public static BaseMetricESDAO getByStatsType(String statsType) {
|
||||
return ariusStatsEsDaoMap.get(statsType);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user