From 9b732fbbad71d5384477574f0200d460a07948c2 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 29 Aug 2022 20:32:01 +0800 Subject: [PATCH] =?UTF-8?q?ES=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=95=B0=E5=8F=AF?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/persistence/es/ESOpClient.java | 45 ++++++++++++------- km-rest/src/main/resources/application.yml | 9 +++- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index d0ca75e9..c611c538 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -16,7 +16,6 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.google.common.collect.Lists; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; -import com.xiaojukeji.know.streaming.km.common.constant.ESConstant; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -37,7 +36,6 @@ import java.util.function.Function; @Component public class ESOpClient { - private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER"); /** @@ -45,6 +43,7 @@ public class ESOpClient { */ @Value("${es.client.address}") private String esAddress; + /** * es 访问密码 */ @@ -54,22 +53,32 @@ public class ESOpClient { /** * 客户端个数 */ - private static final int ES_CLIENT_COUNT = 30; + @Value("${es.client.client-cnt:10}") + private Integer clientCnt; - private static final int MAX_RETRY_CNT = 5; - - private static final int ES_IO_THREAD_COUNT = 4; + /** + * 最大重试次数 + */ + @Value("${es.client.max-retry-cnt:5}") + private Integer maxRetryCnt; + /** + * IO线程数 + */ + @Value("${es.client.io-thread-cnt:2}") + private Integer ioThreadCnt; /** * 更新es数据的客户端连接队列 */ - private LinkedBlockingQueue esClientPool = new LinkedBlockingQueue<>( ES_CLIENT_COUNT ); + private LinkedBlockingQueue esClientPool; @PostConstruct public void init(){ - for (int i = 0; i < ES_CLIENT_COUNT; ++i) { - ESClient esClient = buildEsClient(esAddress, esPass, "", ""); + esClientPool = new LinkedBlockingQueue<>( clientCnt ); + + for (int i = 0; i < clientCnt; ++i) { + ESClient esClient = this.buildEsClient(esAddress, esPass, "", ""); if (esClient != null) { this.esClientPool.add(esClient); LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress); @@ -245,7 +254,7 @@ public class ESOpClient { esIndexRequest.source(source); esIndexRequest.id(id); - for (int i = 0; i < MAX_RETRY_CNT; ++i) { + for (int i = 0; i < this.maxRetryCnt; ++i) { response = esClient.index(esIndexRequest).actionGet(10, TimeUnit.SECONDS); if (response == null) { continue; @@ -307,7 +316,7 @@ public class ESOpClient { batchRequest.addNode(BatchType.INDEX, indexName, null, po.getKey(), JSON.toJSONString(po)); } - for (int i = 0; i < MAX_RETRY_CNT; ++i) { + for (int i = 0; i < this.maxRetryCnt; ++i) { response = esClient.batch(batchRequest).actionGet(2, TimeUnit.MINUTES); if (response == null) {continue;} @@ -428,8 +437,8 @@ public class ESOpClient { if(StringUtils.isNotBlank(password)){ esClient.setPassword(password); } - if(ES_IO_THREAD_COUNT > 0) { - esClient.setIoThreadCount( ES_IO_THREAD_COUNT ); + if(this.ioThreadCnt > 0) { + esClient.setIoThreadCount( this.ioThreadCnt ); } // 配置http超时 @@ -439,11 +448,13 @@ public class ESOpClient { return esClient; } catch (Exception e) { - esClient.close(); - - LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, - e); + try { + esClient.close(); + } catch (Exception innerE) { + // ignore + } + LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, e); return null; } } diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index a6417157..4b0831c7 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -73,8 +73,13 @@ client-pool: borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒 -# es客户端服务地址 -es.client.address: 127.0.0.1:8060 +# ES客户端配置 +es: + client: + address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061 + client-cnt: 10 + io-thread-cnt: 2 + max-retry-cnt: 5 # 普罗米修斯指标导出相关配置 management: