From 2925a20e8e8047b46bdc60a5d779f24e4e843bf9 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 9 Jan 2023 11:18:11 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8D=E6=9F=A5=E7=9C=8B?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=97=B6=EF=BC=8C=E9=80=89=E6=8B=A9=E5=88=86?= =?UTF-8?q?=E5=8C=BA=E4=B8=8D=E7=94=9F=E6=95=88=E9=97=AE=E9=A2=98(#858)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/core/service/partition/impl/PartitionServiceImpl.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java index 83222090..f4688729 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java @@ -266,9 +266,14 @@ public class PartitionServiceImpl extends BaseKafkaVersionControlService impleme List tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream() .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)) + .filter(partition -> partition.getPartitionId().equals(partitionId)) .map(elem -> new TopicPartition(topicName, elem.getPartitionId())) .collect(Collectors.toList()); + if (ValidateUtils.isEmptyList(tpList)) { + return Result.buildSuc(new HashMap<>(0)); + } + try { Result>>> listResult = (Result>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, offsetSpec, tpList));