From 5262ae8907a6152e11b73bdbad1a0e0ace1cb839 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 25 Aug 2022 20:30:15 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=87=E6=A0=B7=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/topic/impl/TopicStateManagerImpl.java | 10 +++++----- .../km/common/bean/entity/record/RecordHeaderKS.java | 2 +- .../km/common/converter/TopicVOConverter.java | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 4e892f94..81ed009f 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -142,12 +142,13 @@ public class TopicStateManagerImpl implements TopicStateManager { // 创建kafka-consumer kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords())); + kafkaConsumer.assign(endOffsetsMapResult.getData().keySet()); + for (Map.Entry entry: endOffsetsMapResult.getData().entrySet()) { + kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords())); + } + // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间 while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) { - for (Map.Entry entry: endOffsetsMapResult.getData().entrySet()) { - kafkaConsumer.assign(Arrays.asList(entry.getKey())); - kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords())); - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS)); for (ConsumerRecord consumerRecord : consumerRecords) { if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) { @@ -165,7 +166,6 @@ public class TopicStateManagerImpl implements TopicStateManager { || voList.size() > dto.getMaxRecords()) { break; } - } } return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/record/RecordHeaderKS.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/record/RecordHeaderKS.java index 8ad3cee1..a15385f6 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/record/RecordHeaderKS.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/record/RecordHeaderKS.java @@ -10,5 +10,5 @@ import lombok.NoArgsConstructor; public class RecordHeaderKS { private String key; - private byte[] value; + private String value; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java index 9f01ff63..8f5d5c28 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java @@ -18,6 +18,7 @@ import io.swagger.annotations.ApiModelProperty; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; +import java.nio.charset.StandardCharsets; import java.util.*; public class TopicVOConverter { @@ -51,7 +52,7 @@ public class TopicVOConverter { vo.setValue(consumerRecord.value()); vo.setHeaderList(new ArrayList<>()); for (Header header : consumerRecord.headers().toArray()) { - vo.getHeaderList().add(new RecordHeaderKS(header.key(), header.value())); + vo.getHeaderList().add(new RecordHeaderKS(header.key(), new String(header.value(), StandardCharsets.UTF_8))); } return vo; }