From 220f1c6fc37a00e8e9f306a3b73388808ee27f8a Mon Sep 17 00:00:00 2001
From: zengqiao
Date: Mon, 29 Aug 2022 20:31:34 +0800
Subject: [PATCH] Sampling optimization
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../biz/topic/impl/TopicStateManagerImpl.java | 27 ++++++++++++++++++-----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
index 81ed009f..a0418bb2 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
@@ -129,7 +129,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
             return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
         }
-        // Get the partition offsets
+        // Get the partition begin offsets
+        Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null);
+        if (beginOffsetsMapResult.failed()) {
+            return Result.buildFromIgnoreData(beginOffsetsMapResult);
+        }
+        // Get the partition end offsets
         Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
         if (endOffsetsMapResult.failed()) {
             return Result.buildFromIgnoreData(endOffsetsMapResult);
         }
@@ -142,13 +147,25 @@
 
             // Create the kafka-consumer
             kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
-            kafkaConsumer.assign(endOffsetsMapResult.getData().keySet());
-            for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
-                kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
+            List<TopicPartition> partitionList = new ArrayList<>();
+            long maxMessage = 0;
+            for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
+                long begin = beginOffsetsMapResult.getData().get(entry.getKey());
+                long end = entry.getValue();
+                if (begin == end) {
+                    continue;
+                }
+                maxMessage += end - begin;
+                partitionList.add(entry.getKey());
+            }
+            maxMessage = Math.min(maxMessage, dto.getMaxRecords());
+            kafkaConsumer.assign(partitionList);
+            for (TopicPartition partition : partitionList) {
+                kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
             }
 
             // KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS needs to be subtracted here because a single poll takes time; if it is not subtracted, the elapsed time after the poll may exceed the requested limit
-            while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) {
+            while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
                 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
                 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                     if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
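
Note: the sketch below is not part of the patch. It restates the sampling strategy the diff implements as a self-contained kafka-clients program, so the behavior can be tried outside of KnowStreaming. The topic name, bootstrap address, and the three limit values are hypothetical stand-ins for what the real code reads from dto and KafkaConstant, and offsets are fetched with the consumer's own beginningOffsets/endOffsets rather than through partitionService.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TailSampler {
    public static void main(String[] args) {
        String topic = "test-topic";    // hypothetical; the patch takes the topic from the request
        int maxRecords = 100;           // stands in for dto.getMaxRecords()
        long pullTimeoutMs = 3000L;     // stands in for dto.getPullTimeoutUnitMs()
        long pollOnceTimeoutMs = 1000L; // stands in for KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Collect all partitions of the topic.
            List<TopicPartition> allPartitions = new ArrayList<>();
            consumer.partitionsFor(topic).forEach(p -> allPartitions.add(new TopicPartition(topic, p.partition())));

            // Begin/end offsets per partition; the patch obtains these via partitionService instead.
            Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(allPartitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(allPartitions);

            // Keep only non-empty partitions and cap the sample size by the
            // number of messages actually available across them.
            List<TopicPartition> partitionList = new ArrayList<>();
            long maxMessage = 0;
            for (TopicPartition tp : allPartitions) {
                long begin = beginOffsets.get(tp);
                long end = endOffsets.get(tp);
                if (begin == end) {
                    continue; // empty partition: assigning and polling it would only waste time
                }
                maxMessage += end - begin;
                partitionList.add(tp);
            }
            maxMessage = Math.min(maxMessage, maxRecords);

            // Seek each partition to at most maxRecords before its end offset,
            // but never before its begin offset.
            consumer.assign(partitionList);
            for (TopicPartition tp : partitionList) {
                consumer.seek(tp, Math.max(beginOffsets.get(tp), endOffsets.get(tp) - maxRecords));
            }

            // Poll until the deadline passes or enough messages have been sampled.
            List<ConsumerRecord<String, String>> sampled = new ArrayList<>();
            long startTime = System.currentTimeMillis();
            while (System.currentTimeMillis() - startTime <= pullTimeoutMs && sampled.size() < maxMessage) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollOnceTimeoutMs));
                for (ConsumerRecord<String, String> record : records) {
                    sampled.add(record);
                }
            }
            System.out.println("sampled " + sampled.size() + " records");
        }
    }
}

Two behavioral changes in the diff are worth calling out. Seeking to Math.max(begin, end - maxRecords) instead of Math.max(0, end - maxRecords) keeps the seek position valid when the log start offset has advanced past 0 (e.g. after retention cleanup). And bounding the poll loop by maxMessage, the number of messages actually available, lets it exit early on sparse topics instead of spinning until the timeout when fewer than dto.getMaxRecords() messages exist.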