mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-03 19:38:20 +08:00
Merge pull request #521 from didi/dev_v3.0.0
1、bump version;2、ES客户端数可配置化;3、采样优化;4、文档修改;
This commit is contained in:
Binary file not shown.
|
Before Width: | Height: | Size: 9.5 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 183 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 50 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 59 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 600 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 228 KiB |
@@ -36,7 +36,7 @@ KS-KM 根据其需要纳管的 kafka 版本,按照上述三个维度构建了
|
|||||||
|
|
||||||
  KS-KM 的每个版本针对需要纳管的 kafka 版本列表,事先分析各个版本的差异性和产品需求,同时 KS-KM 构建了一套专门处理兼容性的服务,来进行兼容性的注册、字典构建、处理器分发等操作,其中版本兼容性处理器是来具体处理不同 kafka 版本差异性的地方。
|
  KS-KM 的每个版本针对需要纳管的 kafka 版本列表,事先分析各个版本的差异性和产品需求,同时 KS-KM 构建了一套专门处理兼容性的服务,来进行兼容性的注册、字典构建、处理器分发等操作,其中版本兼容性处理器是来具体处理不同 kafka 版本差异性的地方。
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
  如上图所示,KS-KM 的 topic 服务在面对不同 kafka 版本时,其 topic 的创建、删除、扩容由于 kafka 版本自身的差异,导致 KnowStreaming 的处理也不一样,所以需要根据不同的 kafka 版本来实现不同的兼容性处理器,同时向 KnowStreaming 的兼容服务进行兼容性的注册,构建兼容性字典,后续在 KnowStreaming 的运行过程中,针对不同的 kafka 版本即可分发到不同的处理器中执行。
|
  如上图所示,KS-KM 的 topic 服务在面对不同 kafka 版本时,其 topic 的创建、删除、扩容由于 kafka 版本自身的差异,导致 KnowStreaming 的处理也不一样,所以需要根据不同的 kafka 版本来实现不同的兼容性处理器,同时向 KnowStreaming 的兼容服务进行兼容性的注册,构建兼容性字典,后续在 KnowStreaming 的运行过程中,针对不同的 kafka 版本即可分发到不同的处理器中执行。
|
||||||
|
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/KnowStreaming.java
|
|||||||
IDEA 更多具体的配置如下图所示:
|
IDEA 更多具体的配置如下图所示:
|
||||||
|
|
||||||
<p align="center">
|
<p align="center">
|
||||||
<img src="./assets/startup_using_source_code/IDEA配置.jpg" width = "512" height = "318" div align=center />
|
<img src="http://img-ys011.didistatic.com/static/dc2img/do1_BW1RzgEMh4n6L4dL4ncl" width = "512" height = "318" div align=center />
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
**第四步:启动项目**
|
**第四步:启动项目**
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
|
|
||||||

|

|
||||||
|
|
||||||
|
|
||||||
## JMX-连接失败问题解决
|
## JMX-连接失败问题解决
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
|
|
||||||

|

|
||||||
|
|
||||||
|
|
||||||
# `Know Streaming` 源码编译打包手册
|
# `Know Streaming` 源码编译打包手册
|
||||||
|
|||||||
@@ -129,7 +129,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
|||||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取分区offset
|
// 获取分区beginOffset
|
||||||
|
Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null);
|
||||||
|
if (beginOffsetsMapResult.failed()) {
|
||||||
|
return Result.buildFromIgnoreData(beginOffsetsMapResult);
|
||||||
|
}
|
||||||
|
// 获取分区endOffset
|
||||||
Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
|
Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
|
||||||
if (endOffsetsMapResult.failed()) {
|
if (endOffsetsMapResult.failed()) {
|
||||||
return Result.buildFromIgnoreData(endOffsetsMapResult);
|
return Result.buildFromIgnoreData(endOffsetsMapResult);
|
||||||
@@ -142,13 +147,25 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
|||||||
// 创建kafka-consumer
|
// 创建kafka-consumer
|
||||||
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
|
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
|
||||||
|
|
||||||
kafkaConsumer.assign(endOffsetsMapResult.getData().keySet());
|
List<TopicPartition> partitionList = new ArrayList<>();
|
||||||
for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
|
long maxMessage = 0;
|
||||||
kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
|
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
|
||||||
|
long begin = beginOffsetsMapResult.getData().get(entry.getKey());
|
||||||
|
long end = entry.getValue();
|
||||||
|
if (begin == end){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
maxMessage += end - begin;
|
||||||
|
partitionList.add(entry.getKey());
|
||||||
|
}
|
||||||
|
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
|
||||||
|
kafkaConsumer.assign(partitionList);
|
||||||
|
for (TopicPartition partition : partitionList) {
|
||||||
|
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
||||||
while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) {
|
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
|
||||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
||||||
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
|
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
|
||||||
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
|
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ import com.didiglobal.logi.log.ILog;
|
|||||||
import com.didiglobal.logi.log.LogFactory;
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.ESConstant;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
@@ -37,7 +36,6 @@ import java.util.function.Function;
|
|||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class ESOpClient {
|
public class ESOpClient {
|
||||||
|
|
||||||
private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER");
|
private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -45,6 +43,7 @@ public class ESOpClient {
|
|||||||
*/
|
*/
|
||||||
@Value("${es.client.address}")
|
@Value("${es.client.address}")
|
||||||
private String esAddress;
|
private String esAddress;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* es 访问密码
|
* es 访问密码
|
||||||
*/
|
*/
|
||||||
@@ -54,22 +53,32 @@ public class ESOpClient {
|
|||||||
/**
|
/**
|
||||||
* 客户端个数
|
* 客户端个数
|
||||||
*/
|
*/
|
||||||
private static final int ES_CLIENT_COUNT = 30;
|
@Value("${es.client.client-cnt:10}")
|
||||||
|
private Integer clientCnt;
|
||||||
|
|
||||||
private static final int MAX_RETRY_CNT = 5;
|
/**
|
||||||
|
* 最大重试次数
|
||||||
private static final int ES_IO_THREAD_COUNT = 4;
|
*/
|
||||||
|
@Value("${es.client.max-retry-cnt:5}")
|
||||||
|
private Integer maxRetryCnt;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* IO线程数
|
||||||
|
*/
|
||||||
|
@Value("${es.client.io-thread-cnt:2}")
|
||||||
|
private Integer ioThreadCnt;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 更新es数据的客户端连接队列
|
* 更新es数据的客户端连接队列
|
||||||
*/
|
*/
|
||||||
private LinkedBlockingQueue<ESClient> esClientPool = new LinkedBlockingQueue<>( ES_CLIENT_COUNT );
|
private LinkedBlockingQueue<ESClient> esClientPool;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init(){
|
public void init(){
|
||||||
for (int i = 0; i < ES_CLIENT_COUNT; ++i) {
|
esClientPool = new LinkedBlockingQueue<>( clientCnt );
|
||||||
ESClient esClient = buildEsClient(esAddress, esPass, "", "");
|
|
||||||
|
for (int i = 0; i < clientCnt; ++i) {
|
||||||
|
ESClient esClient = this.buildEsClient(esAddress, esPass, "", "");
|
||||||
if (esClient != null) {
|
if (esClient != null) {
|
||||||
this.esClientPool.add(esClient);
|
this.esClientPool.add(esClient);
|
||||||
LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress);
|
LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress);
|
||||||
@@ -245,7 +254,7 @@ public class ESOpClient {
|
|||||||
esIndexRequest.source(source);
|
esIndexRequest.source(source);
|
||||||
esIndexRequest.id(id);
|
esIndexRequest.id(id);
|
||||||
|
|
||||||
for (int i = 0; i < MAX_RETRY_CNT; ++i) {
|
for (int i = 0; i < this.maxRetryCnt; ++i) {
|
||||||
response = esClient.index(esIndexRequest).actionGet(10, TimeUnit.SECONDS);
|
response = esClient.index(esIndexRequest).actionGet(10, TimeUnit.SECONDS);
|
||||||
if (response == null) {
|
if (response == null) {
|
||||||
continue;
|
continue;
|
||||||
@@ -307,7 +316,7 @@ public class ESOpClient {
|
|||||||
batchRequest.addNode(BatchType.INDEX, indexName, null, po.getKey(), JSON.toJSONString(po));
|
batchRequest.addNode(BatchType.INDEX, indexName, null, po.getKey(), JSON.toJSONString(po));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < MAX_RETRY_CNT; ++i) {
|
for (int i = 0; i < this.maxRetryCnt; ++i) {
|
||||||
response = esClient.batch(batchRequest).actionGet(2, TimeUnit.MINUTES);
|
response = esClient.batch(batchRequest).actionGet(2, TimeUnit.MINUTES);
|
||||||
if (response == null) {continue;}
|
if (response == null) {continue;}
|
||||||
|
|
||||||
@@ -428,8 +437,8 @@ public class ESOpClient {
|
|||||||
if(StringUtils.isNotBlank(password)){
|
if(StringUtils.isNotBlank(password)){
|
||||||
esClient.setPassword(password);
|
esClient.setPassword(password);
|
||||||
}
|
}
|
||||||
if(ES_IO_THREAD_COUNT > 0) {
|
if(this.ioThreadCnt > 0) {
|
||||||
esClient.setIoThreadCount( ES_IO_THREAD_COUNT );
|
esClient.setIoThreadCount( this.ioThreadCnt );
|
||||||
}
|
}
|
||||||
|
|
||||||
// 配置http超时
|
// 配置http超时
|
||||||
@@ -439,11 +448,13 @@ public class ESOpClient {
|
|||||||
|
|
||||||
return esClient;
|
return esClient;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
esClient.close();
|
try {
|
||||||
|
esClient.close();
|
||||||
LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address,
|
} catch (Exception innerE) {
|
||||||
e);
|
// ignore
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, e);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -73,8 +73,13 @@ client-pool:
|
|||||||
borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒
|
borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒
|
||||||
|
|
||||||
|
|
||||||
# es客户端服务地址
|
# ES客户端配置
|
||||||
es.client.address: 127.0.0.1:8060
|
es:
|
||||||
|
client:
|
||||||
|
address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061
|
||||||
|
client-cnt: 10
|
||||||
|
io-thread-cnt: 2
|
||||||
|
max-retry-cnt: 5
|
||||||
|
|
||||||
# 普罗米修斯指标导出相关配置
|
# 普罗米修斯指标导出相关配置
|
||||||
management:
|
management:
|
||||||
|
|||||||
2
pom.xml
2
pom.xml
@@ -15,7 +15,7 @@
|
|||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<km.revision>3.0.0-beta</km.revision>
|
<km.revision>3.0.0-beta.1</km.revision>
|
||||||
|
|
||||||
<maven.compiler.source>8</maven.compiler.source>
|
<maven.compiler.source>8</maven.compiler.source>
|
||||||
<maven.compiler.target>8</maven.compiler.target>
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
|
|||||||
Reference in New Issue
Block a user