[Optimize]优化日志输出 & 本地缓存统一管理(#800)

This commit is contained in:
zengqiao
2022-12-05 14:04:19 +08:00
parent 7066246e8f
commit 0f8be4fadc
11 changed files with 221 additions and 231 deletions

View File

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.elasticsearch.client.ESClient;
import com.didiglobal.logi.elasticsearch.client.gateway.document.ESIndexRequest;
import com.didiglobal.logi.elasticsearch.client.gateway.document.ESIndexResponse;
import com.didiglobal.logi.elasticsearch.client.model.exception.ESIndexNotFoundException;
import com.didiglobal.logi.elasticsearch.client.model.type.ESVersion;
import com.didiglobal.logi.elasticsearch.client.request.batch.BatchNode;
import com.didiglobal.logi.elasticsearch.client.request.batch.BatchType;
@@ -20,10 +21,11 @@ import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESI
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
import com.didiglobal.logi.elasticsearch.client.response.setting.template.TemplateConfig;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.google.common.collect.Lists;
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.LoggerUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
@@ -36,6 +38,7 @@ import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -43,7 +46,7 @@ import java.util.stream.Collectors;
@Component
public class ESOpClient {
private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER");
private static final ILog LOGGER = LoggerUtil.getESLogger();
/**
* es 地址
@@ -90,40 +93,27 @@ public class ESOpClient {
ESClient esClient = this.buildEsClient(esAddress, esPass, "", "");
if (esClient != null) {
this.esClientPool.add(esClient);
LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress);
LOGGER.info("method=init||esAddress={}||msg=add new es client", esAddress);
}
}
}
/**
* 从更新es http 客户端连接池找那个获取
*
* @return
* 获取ES客户端
*/
public ESClient getESClientFromPool() {
if (ValidateUtils.isEmptyList(esClientPool)) {
return null;
}
return esClientPool.get((int)(System.currentTimeMillis() % clientCnt));
}
/**
* 归还到es http 客户端连接池
* @param esClient
*/
public void returnESClientToPool(ESClient esClient) {
// 已不需要进行归还,后续再删除该代码
}
/**
* 查询并获取第一个元素
*
* @param indexName
* @param queryDsl
* @param clzz
* @param <T>
* @return
*/
public <T> T performRequestAndTakeFirst(String indexName, String queryDsl, Class<T> clzz) {
List<T> hits = performRequest(indexName, queryDsl, clzz);
public <T> T performRequestAndTakeFirst(String indexName, String queryDsl, Class<T> clazz) {
List<T> hits = this.performRequest(indexName, queryDsl, clazz);
if (CollectionUtils.isEmpty(hits)) {
return null;
}
@@ -133,31 +123,20 @@ public class ESOpClient {
/**
* 查询并获取第一个元素
*
* @param indexName
* @param queryDsl
* @param clazz
* @param <T>
* @return
*/
public <T> T performRequestAndTakeFirst(String routingValue, String indexName,
String queryDsl, Class<T> clazz) {
List<T> hits = performRequestWithRouting(routingValue, indexName, queryDsl, clazz);
if (CollectionUtils.isEmpty(hits)) {return null;}
public <T> T performRequestAndTakeFirst(String routingValue, String indexName, String queryDsl, Class<T> clazz) {
List<T> hits = this.performRequestWithRouting(routingValue, indexName, queryDsl, clazz);
if (CollectionUtils.isEmpty(hits)) {
return null;
}
return hits.get(0);
}
/**
* 执行查询
*
* @param indexName
* @param queryDsl
* @return
* @throws IOException
*/
public ESQueryResponse performRequest(String indexName,String queryDsl) {
public ESQueryResponse performRequest(String indexName, String queryDsl) {
return doQuery(new ESQueryRequest().indices(indexName).source(queryDsl));
}
@@ -170,7 +149,7 @@ public class ESOpClient {
return func.apply(esQueryResponse);
}
public <T> List<T> performRequest(String indexName, String queryDsl, Class<T> clzz) {
public <T> List<T> performRequest(String indexName, String queryDsl, Class<T> clzz) {
ESQueryResponse esQueryResponse = doQuery(
new ESQueryRequest().indices(indexName).source(queryDsl).clazz(clzz));
if (esQueryResponse == null) {
@@ -210,8 +189,7 @@ public class ESOpClient {
return hits;
}
public <R> R performRequestWithRouting(String routingValue, String indexName,
String queryDsl, Function<ESQueryResponse, R> func, int tryTimes) {
public <R> R performRequestWithRouting(String routingValue, String indexName, String queryDsl, Function<ESQueryResponse, R> func, int tryTimes) {
ESQueryResponse esQueryResponse;
do {
esQueryResponse = doQuery(new ESQueryRequest().routing(routingValue).indices(indexName).source(queryDsl));
@@ -222,16 +200,12 @@ public class ESOpClient {
/**
* 写入单条
*
* @param source
* @return
*/
public boolean index(String indexName, String id, String source) {
ESClient esClient = null;
ESIndexResponse response = null;
try {
esClient = getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
if (esClient == null) {
return false;
}
@@ -250,20 +224,11 @@ public class ESOpClient {
return response.getRestStatus().getStatus() == HttpStatus.SC_OK
|| response.getRestStatus().getStatus() == HttpStatus.SC_CREATED;
}
} catch (Exception e) {
LOGGER.warn(
"class=ESOpClient||method=index||indexName={}||id={}||source={}||errMsg=index doc error. ",
indexName, id, source, e);
if (response != null) {
LOGGER.warn(
"class=ESOpClient||method=index||indexName={}||id={}||source={}||errMsg=response {}",
indexName, id, source, JSON.toJSONString(response));
}
} finally {
if (esClient != null) {
returnESClientToPool(esClient);
}
LOGGER.error(
"method=index||indexName={}||id={}||source={}||response={}||errMsg=index failed",
indexName, id, source, ConvertUtil.obj2Json(response), e
);
}
return false;
@@ -271,19 +236,15 @@ public class ESOpClient {
/**
* 批量写入
*
* @param indexName
* @return
*/
public boolean batchInsert(String indexName, List<? extends BaseESPO> pos) {
if (CollectionUtils.isEmpty(pos)) {
return true;
}
ESClient esClient = null;
ESBatchResponse response = null;
try {
esClient = getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
if (esClient == null) {
return false;
}
@@ -312,16 +273,10 @@ public class ESOpClient {
return response.getRestStatus().getStatus() == HttpStatus.SC_OK && !response.getErrors();
}
} catch (Exception e) {
LOGGER.warn(
"method=batchInsert||indexName={}||errMsg=batch insert error. ", indexName, e);
if (response != null) {
LOGGER.warn("method=batchInsert||indexName={}||errMsg=response {}", indexName, JSON.toJSONString(response));
}
} finally {
if (esClient != null) {
returnESClientToPool(esClient);
}
LOGGER.error(
"method=batchInsert||indexName={}||response={}||errMsg=batch index failed",
indexName, ConvertUtil.obj2Json(response), e
);
}
return false;
@@ -331,9 +286,8 @@ public class ESOpClient {
* 根据表达式判断索引是否已存在
*/
public boolean indexExist(String indexName) {
ESClient esClient = null;
try {
esClient = this.getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
if (esClient == null) {
return false;
}
@@ -341,11 +295,7 @@ public class ESOpClient {
// 检查索引是否存在
return esClient.admin().indices().prepareExists(indexName).execute().actionGet(30, TimeUnit.SECONDS).isExists();
} catch (Exception e){
LOGGER.warn("class=ESOpClient||method=indexExist||indexName={}||msg=exception!", indexName, e);
} finally {
if (esClient != null) {
returnESClientToPool(esClient);
}
LOGGER.error("method=indexExist||indexName={}||msg=exception!", indexName, e);
}
return false;
@@ -355,48 +305,45 @@ public class ESOpClient {
* 创建索引
*/
public boolean createIndex(String indexName) {
if (indexExist(indexName)) {
if (this.indexExist(indexName)) {
return true;
}
ESClient client = getESClientFromPool();
if (client != null) {
try {
ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute()
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
return response.getAcknowledged();
} catch (Exception e){
LOGGER.warn( "msg=create index fail||indexName={}", indexName, e);
} finally {
returnESClientToPool(client);
}
ESClient client = this.getESClientFromPool();
try {
ESIndicesPutIndexResponse response = client
.admin()
.indices()
.preparePutIndex(indexName)
.execute()
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
return response.getAcknowledged();
} catch (Exception e){
LOGGER.error( "method=createIndex||indexName={}||errMsg=exception!", indexName, e);
}
return false;
}
public boolean templateExist(String indexTemplateName){
ESClient esClient = null;
try {
esClient = this.getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
// 获取es中原来index template的配置
ESIndicesGetTemplateResponse getTemplateResponse =
esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS );
ESIndicesGetTemplateResponse getTemplateResponse = esClient
.admin()
.indices()
.prepareGetTemplate(indexTemplateName)
.execute()
.actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS );
TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig();
if (null != templateConfig) {
return true;
}
} catch (Exception e) {
LOGGER.warn( "method=templateExist||indexTemplateName={}||msg=exception!",
indexTemplateName, e);
} finally {
if (esClient != null) {
this.returnESClientToPool(esClient);
}
LOGGER.error( "method=templateExist||indexTemplateName={}||msg=exception!", indexTemplateName, e);
}
return false;
@@ -406,27 +353,29 @@ public class ESOpClient {
* 创建索引模板
*/
public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) {
ESClient esClient = null;
try {
esClient = this.getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
//存在模板就返回,不存在就创建
if(templateExist(indexTemplateName)){return true;}
// 存在模板就返回,不存在就创建
if(this.templateExist(indexTemplateName)) {
return true;
}
// 创建新的模板
ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName )
.setTemplateConfig( config ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS );
ESIndicesPutTemplateResponse response = esClient
.admin()
.indices()
.preparePutTemplate( indexTemplateName )
.setTemplateConfig(config)
.execute()
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
return response.getAcknowledged();
} catch (Exception e) {
LOGGER.warn( "method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!",
LOGGER.error(
"method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!",
indexTemplateName, config, e
);
} finally {
if (esClient != null) {
this.returnESClientToPool(esClient);
}
}
return false;
@@ -434,54 +383,47 @@ public class ESOpClient {
/**
* 根据索引模板获取所有的索引
* @param indexName
* @return
*/
public List<String> listIndexByName(String indexName){
ESClient esClient = null;
public List<String> listIndexByName(String indexName) {
try {
esClient = this.getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
ESIndicesCatIndicesResponse response = esClient.admin().indices().prepareCatIndices(indexName + "*").execute()
ESIndicesCatIndicesResponse response = esClient
.admin()
.indices()
.prepareCatIndices(indexName + "*")
.execute()
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
if(null != response){
if(null != response) {
return response.getCatIndexResults().stream().map(CatIndexResult::getIndex).collect(Collectors.toList());
}
} catch (Exception e) {
LOGGER.warn( "method=listIndexByTemplate||indexName={}||msg=exception!",
indexName, e);
} finally {
if (esClient != null) {
this.returnESClientToPool(esClient);
}
LOGGER.error( "method=listIndexByName||indexName={}||msg=exception!", indexName, e);
}
return new ArrayList<>();
return Collections.emptyList();
}
/**
* 删除索引
* @param indexRealName
* @return
*/
public boolean delIndexByName(String indexRealName){
ESClient esClient = null;
try {
esClient = this.getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
ESIndicesDeleteIndexResponse response = esClient.admin().indices().prepareDeleteIndex(indexRealName).execute()
ESIndicesDeleteIndexResponse response = esClient
.admin()
.indices()
.prepareDeleteIndex(indexRealName)
.execute()
.actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS);
return response.getAcknowledged();
} catch (ESIndexNotFoundException nfe) {
// 索引不存在时debug环境时再进行打印
LOGGER.debug( "method=delIndexByName||indexRealName={}||errMsg=exception!", indexRealName, nfe);
} catch (Exception e) {
LOGGER.warn( "method=delIndexByName||indexRealName={}||msg=exception!",
indexRealName, e);
} finally {
if (esClient != null) {
this.returnESClientToPool(esClient);
}
LOGGER.error( "method=delIndexByName||indexRealName={}||errMsg=exception!", indexRealName, e);
}
return false;
@@ -491,61 +433,55 @@ public class ESOpClient {
/**
* 执行查询
* @param request
* @return
*/
@Nullable
private ESQueryResponse doQuery(ESQueryRequest request) {
ESClient esClient = null;
try {
esClient = getESClientFromPool();
ESClient esClient = this.getESClientFromPool();
ESQueryResponse response = esClient.query(request).actionGet(120, TimeUnit.SECONDS);
if(!EnvUtil.isOnline()){
LOGGER.info("method=doQuery||indexName={}||queryDsl={}||ret={}",
request.indices(), bytesReferenceConvertDsl(request.source()), JSON.toJSONString(response));
}
LOGGER.debug(
"method=doQuery||indexName={}||queryDsl={}||ret={}",
request.indices(), bytesReferenceConvertDsl(request.source()), JSON.toJSONString(response)
);
return response;
} catch (Exception e) {
LOGGER.error( "method=doQuery||indexName={}||queryDsl={}||errMsg=query error. ",
request.indices(), bytesReferenceConvertDsl(request.source()), e);
return null;
}finally {
if (esClient != null) {
returnESClientToPool(esClient);
}
}
}
private boolean handleErrorResponse(String indexName, List<? extends BaseESPO> pos, ESBatchResponse response) {
if (response.getErrors().booleanValue()) {
int errorItemIndex = 0;
if (CollectionUtils.isNotEmpty(response.getItems())) {
for (IndexResultItemNode item : response.getItems()) {
recordErrorResponseItem(indexName, pos, errorItemIndex++, item);
}
}
return true;
if (response.getErrors()) {
return false;
}
return false;
int errorItemIndex = 0;
if (CollectionUtils.isNotEmpty(response.getItems())) {
for (IndexResultItemNode item : response.getItems()) {
recordErrorResponseItem(indexName, pos, errorItemIndex++, item);
}
}
return true;
}
private void recordErrorResponseItem(String indexName, List<? extends BaseESPO> pos, int errorItemIndex, IndexResultItemNode item) {
if (item.getIndex() != null && item.getIndex().getShards() != null
if (item.getIndex() != null
&& item.getIndex().getShards() != null
&& CollectionUtils.isNotEmpty(item.getIndex().getShards().getFailures())) {
LOGGER.warn(
"class=ESOpClient||method=batchInsert||indexName={}||errMsg=Failures: {}, content: {}",
"method=batchInsert||indexName={}||errMsg=Failures: {}, content: {}",
indexName, item.getIndex().getShards().getFailures().toString(),
JSON.toJSONString(pos.get(errorItemIndex)));
}
if (item.getIndex() != null && item.getIndex().getError() != null) {
LOGGER.warn(
"class=ESOpClient||method=batchInsert||indexName={}||errMsg=Error: {}, content: {}",
"method=batchInsert||indexName={}||errMsg=Error: {}, content: {}",
indexName, item.getIndex().getError().getReason(),
JSON.toJSONString(pos.get(errorItemIndex)));
}
@@ -553,21 +489,18 @@ public class ESOpClient {
/**
* 转换dsl语句
*
* @param bytes
* @return
*/
private String bytesReferenceConvertDsl(BytesReference bytes) {
try {
return XContentHelper.convertToJson(bytes, false);
} catch (IOException e) {
LOGGER.warn("class=ESOpClient||method=bytesReferenceConvertDsl||errMsg=fail to covert", e);
LOGGER.warn("method=bytesReferenceConvertDsl||errMsg=fail to covert", e);
}
return "";
}
private ESClient buildEsClient(String address,String password,String clusterName, String version) {
private ESClient buildEsClient(String address, String password,String clusterName, String version) {
if (StringUtils.isBlank(address)) {
return null;
}
@@ -602,7 +535,7 @@ public class ESOpClient {
// ignore
}
LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, e);
LOGGER.error("method=buildEsClient||address={}||errMsg=exception", address, e);
return null;
}
}

View File

@@ -207,17 +207,27 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
/**
* 获取每个 metric 的 topN 个 topic 的指标,如果获取不到 topN 的topics, 则默认返回 defaultTopics 的指标
*/
public Table<String/*metric*/, String/*topics*/, List<MetricPointVO>> listTopicMetricsByTopN(Long clusterPhyId, List<String> defaultTopics,
List<String> metrics, String aggType, int topN,
Long startTime, Long endTime){
public Table<String/*metric*/, String/*topics*/, List<MetricPointVO>> listTopicMetricsByTopN(Long clusterPhyId,
List<String> defaultTopics,
List<String> metrics,
String aggType,
int topN,
Long startTime,
Long endTime){
//1、获取topN要查询的topic每一个指标的topN的topic可能不一样
Map<String, List<String>> metricTopics = getTopNTopics(clusterPhyId, metrics, aggType, topN, startTime, endTime);
Map<String, List<String>> metricTopics = this.getTopNTopics(clusterPhyId, metrics, aggType, topN, startTime, endTime);
Table<String, String, List<MetricPointVO>> table = HashBasedTable.create();
for(String metric : metrics){
table.putAll(listTopicMetricsByTopics(clusterPhyId, Arrays.asList(metric),
aggType, metricTopics.getOrDefault(metric, defaultTopics), startTime, endTime));
for(String metric : metrics) {
table.putAll(this.listTopicMetricsByTopics(
clusterPhyId,
Arrays.asList(metric),
aggType,
metricTopics.getOrDefault(metric, defaultTopics),
startTime,
endTime)
);
}
return table;
@@ -226,9 +236,12 @@ public class TopicMetricESDAO extends BaseMetricESDAO {
/**
* 获取每个 metric 指定个 topic 的指标
*/
public Table<String/*metric*/, String/*topics*/, List<MetricPointVO>> listTopicMetricsByTopics(Long clusterPhyId, List<String> metrics,
String aggType, List<String> topics,
Long startTime, Long endTime){
public Table<String/*metric*/, String/*topics*/, List<MetricPointVO>> listTopicMetricsByTopics(Long clusterPhyId,
List<String> metrics,
String aggType,
List<String> topics,
Long startTime,
Long endTime){
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);