mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Optimize]优化ESClient的并发访问控制(#787)
This commit is contained in:
@@ -37,7 +37,6 @@ import javax.annotation.PostConstruct;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -61,7 +60,7 @@ public class ESOpClient {
|
|||||||
/**
|
/**
|
||||||
* 客户端个数
|
* 客户端个数
|
||||||
*/
|
*/
|
||||||
@Value("${es.client.client-cnt:10}")
|
@Value("${es.client.client-cnt:2}")
|
||||||
private Integer clientCnt;
|
private Integer clientCnt;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -79,13 +78,13 @@ public class ESOpClient {
|
|||||||
/**
|
/**
|
||||||
* 更新es数据的客户端连接队列
|
* 更新es数据的客户端连接队列
|
||||||
*/
|
*/
|
||||||
private LinkedBlockingQueue<ESClient> esClientPool;
|
private List<ESClient> esClientPool;
|
||||||
|
|
||||||
private static final Integer ES_OPERATE_TIMEOUT = 30;
|
private static final Integer ES_OPERATE_TIMEOUT = 30;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init(){
|
public void init(){
|
||||||
esClientPool = new LinkedBlockingQueue<>( clientCnt );
|
esClientPool = new ArrayList<>(clientCnt);
|
||||||
|
|
||||||
for (int i = 0; i < clientCnt; ++i) {
|
for (int i = 0; i < clientCnt; ++i) {
|
||||||
ESClient esClient = this.buildEsClient(esAddress, esPass, "", "");
|
ESClient esClient = this.buildEsClient(esAddress, esPass, "", "");
|
||||||
@@ -102,37 +101,15 @@ public class ESOpClient {
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public ESClient getESClientFromPool() {
|
public ESClient getESClientFromPool() {
|
||||||
ESClient esClient = null;
|
return esClientPool.get((int)(System.currentTimeMillis() % clientCnt));
|
||||||
int retryCount = 0;
|
|
||||||
|
|
||||||
// 如果esClient为空或者重试次数小于5次,循环获取
|
|
||||||
while (esClient == null && retryCount < 5) {
|
|
||||||
try {
|
|
||||||
++retryCount;
|
|
||||||
esClient = esClientPool.poll(3, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (esClient == null) {
|
|
||||||
LOGGER.error( "class=ESOpClient||method=getESClientFromPool||errMsg=fail to get es client from pool");
|
|
||||||
}
|
|
||||||
|
|
||||||
return esClient;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 归还到es http 客户端连接池
|
* 归还到es http 客户端连接池
|
||||||
*
|
|
||||||
* @param esClient
|
* @param esClient
|
||||||
*/
|
*/
|
||||||
public void returnESClientToPool(ESClient esClient) {
|
public void returnESClientToPool(ESClient esClient) {
|
||||||
try {
|
// 已不需要进行归还,后续再删除该代码
|
||||||
this.esClientPool.put(esClient);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ es:
|
|||||||
client:
|
client:
|
||||||
address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061
|
address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061
|
||||||
pass: # ES账号密码,如果有账号密码,按照 username:password 的格式填写,没有则不需要填写
|
pass: # ES账号密码,如果有账号密码,按照 username:password 的格式填写,没有则不需要填写
|
||||||
client-cnt: 10 # 创建的ES客户端数
|
client-cnt: 2 # 创建的ES客户端数
|
||||||
io-thread-cnt: 2
|
io-thread-cnt: 2
|
||||||
max-retry-cnt: 5
|
max-retry-cnt: 5
|
||||||
index:
|
index:
|
||||||
|
|||||||
Reference in New Issue
Block a user