diff --git a/docs/assets/KnowStreamingLogo.png b/docs/assets/KnowStreamingLogo.png deleted file mode 100644 index f38dd42a..00000000 Binary files a/docs/assets/KnowStreamingLogo.png and /dev/null differ diff --git a/docs/assets/readme/KnowStreamingPageDemo.jpg b/docs/assets/readme/KnowStreamingPageDemo.jpg deleted file mode 100644 index a8d97df1..00000000 Binary files a/docs/assets/readme/KnowStreamingPageDemo.jpg and /dev/null differ diff --git a/docs/assets/readme/WeChat.png b/docs/assets/readme/WeChat.png deleted file mode 100644 index 262d0aae..00000000 Binary files a/docs/assets/readme/WeChat.png and /dev/null differ diff --git a/docs/assets/readme/ZSXQ.jpeg b/docs/assets/readme/ZSXQ.jpeg deleted file mode 100644 index 121bf9b1..00000000 Binary files a/docs/assets/readme/ZSXQ.jpeg and /dev/null differ diff --git a/docs/dev_guide/assets/multi_version_compatible/registerHandler.png b/docs/dev_guide/assets/multi_version_compatible/registerHandler.png deleted file mode 100644 index f7b040dc..00000000 Binary files a/docs/dev_guide/assets/multi_version_compatible/registerHandler.png and /dev/null differ diff --git a/docs/dev_guide/assets/startup_using_source_code/IDEA配置.jpg b/docs/dev_guide/assets/startup_using_source_code/IDEA配置.jpg deleted file mode 100644 index 237aaa42..00000000 Binary files a/docs/dev_guide/assets/startup_using_source_code/IDEA配置.jpg and /dev/null differ diff --git a/docs/dev_guide/多版本兼容方案.md b/docs/dev_guide/多版本兼容方案.md index 389d0650..f41c01d4 100644 --- a/docs/dev_guide/多版本兼容方案.md +++ b/docs/dev_guide/多版本兼容方案.md @@ -36,7 +36,7 @@ KS-KM 根据其需要纳管的 kafka 版本,按照上述三个维度构建了   KS-KM 的每个版本针对需要纳管的 kafka 版本列表,事先分析各个版本的差异性和产品需求,同时 KS-KM 构建了一套专门处理兼容性的服务,来进行兼容性的注册、字典构建、处理器分发等操作,其中版本兼容性处理器是来具体处理不同 kafka 版本差异性的地方。 -​ ![registerHandler](./assets/multi_version_compatible/registerHandler.png) +​ ![registerHandler](http://img-ys011.didistatic.com/static/dc2img/do1_WxVTzndYE59ah5DFrMfn)   如上图所示,KS-KM 的 topic 服务在面对不同 kafka 版本时,其 topic 的创建、删除、扩容由于 kafka 版本自身的差异,导致 KnowStreaming 的处理也不一样,所以需要根据不同的 kafka 版本来实现不同的兼容性处理器,同时向 KnowStreaming 的兼容服务进行兼容性的注册,构建兼容性字典,后续在 KnowStreaming 的运行过程中,针对不同的 kafka 版本即可分发到不同的处理器中执行。 diff --git a/docs/dev_guide/本地源码启动手册.md b/docs/dev_guide/本地源码启动手册.md index ed21c3b8..a46e06cf 100644 --- a/docs/dev_guide/本地源码启动手册.md +++ b/docs/dev_guide/本地源码启动手册.md @@ -73,7 +73,7 @@ km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/KnowStreaming.java IDEA 更多具体的配置如下图所示:

- +

**第四步:启动项目** diff --git a/docs/dev_guide/解决连接JMX失败.md b/docs/dev_guide/解决连接JMX失败.md index f66a5ab0..a82069ac 100644 --- a/docs/dev_guide/解决连接JMX失败.md +++ b/docs/dev_guide/解决连接JMX失败.md @@ -1,5 +1,5 @@ -![Logo](../assets/KnowStreamingLogo.png) +![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png) ## JMX-连接失败问题解决 diff --git a/docs/install_guide/源码编译打包手册.md b/docs/install_guide/源码编译打包手册.md index b0b20101..708396b6 100644 --- a/docs/install_guide/源码编译打包手册.md +++ b/docs/install_guide/源码编译打包手册.md @@ -1,5 +1,5 @@ -![Logo](../assets/KnowStreamingLogo.png) +![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png) # `Know Streaming` 源码编译打包手册 diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 81ed009f..a0418bb2 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -129,7 +129,12 @@ public class TopicStateManagerImpl implements TopicStateManager { return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); } - // 获取分区offset + // 获取分区beginOffset + Result> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null); + if (beginOffsetsMapResult.failed()) { + return Result.buildFromIgnoreData(beginOffsetsMapResult); + } + // 获取分区endOffset Result> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null); if (endOffsetsMapResult.failed()) { return Result.buildFromIgnoreData(endOffsetsMapResult); @@ -142,13 +147,25 @@ public class TopicStateManagerImpl implements TopicStateManager { // 创建kafka-consumer kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords())); - kafkaConsumer.assign(endOffsetsMapResult.getData().keySet()); - for (Map.Entry entry: endOffsetsMapResult.getData().entrySet()) { - kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords())); + List partitionList = new ArrayList<>(); + long maxMessage = 0; + for (Map.Entry entry : endOffsetsMapResult.getData().entrySet()) { + long begin = beginOffsetsMapResult.getData().get(entry.getKey()); + long end = entry.getValue(); + if (begin == end){ + continue; + } + maxMessage += end - begin; + partitionList.add(entry.getKey()); + } + maxMessage = Math.min(maxMessage, dto.getMaxRecords()); + kafkaConsumer.assign(partitionList); + for (TopicPartition partition : partitionList) { + kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); } // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间 - while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) { + while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) { ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS)); for (ConsumerRecord consumerRecord : consumerRecords) { if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) { diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index d0ca75e9..c611c538 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -16,7 +16,6 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.google.common.collect.Lists; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; -import com.xiaojukeji.know.streaming.km.common.constant.ESConstant; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -37,7 +36,6 @@ import java.util.function.Function; @Component public class ESOpClient { - private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER"); /** @@ -45,6 +43,7 @@ public class ESOpClient { */ @Value("${es.client.address}") private String esAddress; + /** * es 访问密码 */ @@ -54,22 +53,32 @@ public class ESOpClient { /** * 客户端个数 */ - private static final int ES_CLIENT_COUNT = 30; + @Value("${es.client.client-cnt:10}") + private Integer clientCnt; - private static final int MAX_RETRY_CNT = 5; - - private static final int ES_IO_THREAD_COUNT = 4; + /** + * 最大重试次数 + */ + @Value("${es.client.max-retry-cnt:5}") + private Integer maxRetryCnt; + /** + * IO线程数 + */ + @Value("${es.client.io-thread-cnt:2}") + private Integer ioThreadCnt; /** * 更新es数据的客户端连接队列 */ - private LinkedBlockingQueue esClientPool = new LinkedBlockingQueue<>( ES_CLIENT_COUNT ); + private LinkedBlockingQueue esClientPool; @PostConstruct public void init(){ - for (int i = 0; i < ES_CLIENT_COUNT; ++i) { - ESClient esClient = buildEsClient(esAddress, esPass, "", ""); + esClientPool = new LinkedBlockingQueue<>( clientCnt ); + + for (int i = 0; i < clientCnt; ++i) { + ESClient esClient = this.buildEsClient(esAddress, esPass, "", ""); if (esClient != null) { this.esClientPool.add(esClient); LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress); @@ -245,7 +254,7 @@ public class ESOpClient { esIndexRequest.source(source); esIndexRequest.id(id); - for (int i = 0; i < MAX_RETRY_CNT; ++i) { + for (int i = 0; i < this.maxRetryCnt; ++i) { response = esClient.index(esIndexRequest).actionGet(10, TimeUnit.SECONDS); if (response == null) { continue; @@ -307,7 +316,7 @@ public class ESOpClient { batchRequest.addNode(BatchType.INDEX, indexName, null, po.getKey(), JSON.toJSONString(po)); } - for (int i = 0; i < MAX_RETRY_CNT; ++i) { + for (int i = 0; i < this.maxRetryCnt; ++i) { response = esClient.batch(batchRequest).actionGet(2, TimeUnit.MINUTES); if (response == null) {continue;} @@ -428,8 +437,8 @@ public class ESOpClient { if(StringUtils.isNotBlank(password)){ esClient.setPassword(password); } - if(ES_IO_THREAD_COUNT > 0) { - esClient.setIoThreadCount( ES_IO_THREAD_COUNT ); + if(this.ioThreadCnt > 0) { + esClient.setIoThreadCount( this.ioThreadCnt ); } // 配置http超时 @@ -439,11 +448,13 @@ public class ESOpClient { return esClient; } catch (Exception e) { - esClient.close(); - - LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, - e); + try { + esClient.close(); + } catch (Exception innerE) { + // ignore + } + LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, e); return null; } } diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index a6417157..4b0831c7 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -73,8 +73,13 @@ client-pool: borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒 -# es客户端服务地址 -es.client.address: 127.0.0.1:8060 +# ES客户端配置 +es: + client: + address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061 + client-cnt: 10 + io-thread-cnt: 2 + max-retry-cnt: 5 # 普罗米修斯指标导出相关配置 management: diff --git a/pom.xml b/pom.xml index d1dd7544..5d0052d8 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ - 3.0.0-beta + 3.0.0-beta.1 8 8