From 5127b600ecb5a96481c6f3a7b5346a80999e0f95 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 28 Nov 2022 14:32:40 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=E4=BC=98=E5=8C=96ESClient=E7=9A=84?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E8=AE=BF=E9=97=AE=E6=8E=A7=E5=88=B6(#787)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/persistence/es/ESOpClient.java | 35 ++++--------------- km-rest/src/main/resources/application.yml | 2 +- 2 files changed, 7 insertions(+), 30 deletions(-) diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index 1efa01d6..5e91f7b2 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -37,7 +37,6 @@ import javax.annotation.PostConstruct; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -61,7 +60,7 @@ public class ESOpClient { /** * 客户端个数 */ - @Value("${es.client.client-cnt:10}") + @Value("${es.client.client-cnt:2}") private Integer clientCnt; /** @@ -79,13 +78,13 @@ public class ESOpClient { /** * 更新es数据的客户端连接队列 */ - private LinkedBlockingQueue esClientPool; + private List esClientPool; - private static final Integer ES_OPERATE_TIMEOUT = 30; + private static final Integer ES_OPERATE_TIMEOUT = 30; @PostConstruct public void init(){ - esClientPool = new LinkedBlockingQueue<>( clientCnt ); + esClientPool = new ArrayList<>(clientCnt); for (int i = 0; i < clientCnt; ++i) { ESClient esClient = this.buildEsClient(esAddress, esPass, "", ""); @@ -102,37 +101,15 @@ public class ESOpClient { * @return */ public ESClient getESClientFromPool() { - ESClient esClient = null; - int retryCount = 0; - - // 如果esClient为空或者重试次数小于5次,循环获取 - while (esClient == null && retryCount < 5) { - try { - ++retryCount; - esClient = esClientPool.poll(3, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - if (esClient == null) { - LOGGER.error( "class=ESOpClient||method=getESClientFromPool||errMsg=fail to get es client from pool"); - } - - return esClient; + return esClientPool.get((int)(System.currentTimeMillis() % clientCnt)); } /** * 归还到es http 客户端连接池 - * * @param esClient */ public void returnESClientToPool(ESClient esClient) { - try { - this.esClientPool.put(esClient); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + // 已不需要进行归还,后续再删除该代码 } /** diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index c9cd3c5e..3b01022e 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -85,7 +85,7 @@ es: client: address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061 pass: # ES账号密码,如果有账号密码,按照 username:password 的格式填写,没有则不需要填写 - client-cnt: 10 # 创建的ES客户端数 + client-cnt: 2 # 创建的ES客户端数 io-thread-cnt: 2 max-retry-cnt: 5 index: