ES客户端数可配置化

This commit is contained in:
zengqiao
2022-08-29 20:32:01 +08:00
parent 220f1c6fc3
commit 9b732fbbad
2 changed files with 35 additions and 19 deletions

View File

@@ -16,7 +16,6 @@ import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
import com.xiaojukeji.know.streaming.km.common.constant.ESConstant;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@@ -37,7 +36,6 @@ import java.util.function.Function;
@Component @Component
public class ESOpClient { public class ESOpClient {
private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER"); private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER");
/** /**
@@ -45,6 +43,7 @@ public class ESOpClient {
*/ */
@Value("${es.client.address}") @Value("${es.client.address}")
private String esAddress; private String esAddress;
/** /**
* es 访问密码 * es 访问密码
*/ */
@@ -54,22 +53,32 @@ public class ESOpClient {
/** /**
* 客户端个数 * 客户端个数
*/ */
private static final int ES_CLIENT_COUNT = 30; @Value("${es.client.client-cnt:10}")
private Integer clientCnt;
private static final int MAX_RETRY_CNT = 5; /**
* 最大重试次数
private static final int ES_IO_THREAD_COUNT = 4; */
@Value("${es.client.max-retry-cnt:5}")
private Integer maxRetryCnt;
/**
* IO线程数
*/
@Value("${es.client.io-thread-cnt:2}")
private Integer ioThreadCnt;
/** /**
* 更新es数据的客户端连接队列 * 更新es数据的客户端连接队列
*/ */
private LinkedBlockingQueue<ESClient> esClientPool = new LinkedBlockingQueue<>( ES_CLIENT_COUNT ); private LinkedBlockingQueue<ESClient> esClientPool;
@PostConstruct @PostConstruct
public void init(){ public void init(){
for (int i = 0; i < ES_CLIENT_COUNT; ++i) { esClientPool = new LinkedBlockingQueue<>( clientCnt );
ESClient esClient = buildEsClient(esAddress, esPass, "", "");
for (int i = 0; i < clientCnt; ++i) {
ESClient esClient = this.buildEsClient(esAddress, esPass, "", "");
if (esClient != null) { if (esClient != null) {
this.esClientPool.add(esClient); this.esClientPool.add(esClient);
LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress); LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress);
@@ -245,7 +254,7 @@ public class ESOpClient {
esIndexRequest.source(source); esIndexRequest.source(source);
esIndexRequest.id(id); esIndexRequest.id(id);
for (int i = 0; i < MAX_RETRY_CNT; ++i) { for (int i = 0; i < this.maxRetryCnt; ++i) {
response = esClient.index(esIndexRequest).actionGet(10, TimeUnit.SECONDS); response = esClient.index(esIndexRequest).actionGet(10, TimeUnit.SECONDS);
if (response == null) { if (response == null) {
continue; continue;
@@ -307,7 +316,7 @@ public class ESOpClient {
batchRequest.addNode(BatchType.INDEX, indexName, null, po.getKey(), JSON.toJSONString(po)); batchRequest.addNode(BatchType.INDEX, indexName, null, po.getKey(), JSON.toJSONString(po));
} }
for (int i = 0; i < MAX_RETRY_CNT; ++i) { for (int i = 0; i < this.maxRetryCnt; ++i) {
response = esClient.batch(batchRequest).actionGet(2, TimeUnit.MINUTES); response = esClient.batch(batchRequest).actionGet(2, TimeUnit.MINUTES);
if (response == null) {continue;} if (response == null) {continue;}
@@ -428,8 +437,8 @@ public class ESOpClient {
if(StringUtils.isNotBlank(password)){ if(StringUtils.isNotBlank(password)){
esClient.setPassword(password); esClient.setPassword(password);
} }
if(ES_IO_THREAD_COUNT > 0) { if(this.ioThreadCnt > 0) {
esClient.setIoThreadCount( ES_IO_THREAD_COUNT ); esClient.setIoThreadCount( this.ioThreadCnt );
} }
// 配置http超时 // 配置http超时
@@ -439,11 +448,13 @@ public class ESOpClient {
return esClient; return esClient;
} catch (Exception e) { } catch (Exception e) {
esClient.close(); try {
esClient.close();
LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, } catch (Exception innerE) {
e); // ignore
}
LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, e);
return null; return null;
} }
} }

View File

@@ -73,8 +73,13 @@ client-pool:
borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒 borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒
# es客户端服务地址 # ES客户端配置
es.client.address: 127.0.0.1:8060 es:
client:
address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061
client-cnt: 10
io-thread-cnt: 2
max-retry-cnt: 5
# 普罗米修斯指标导出相关配置 # 普罗米修斯指标导出相关配置
management: management: