diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java index 83222090..f4688729 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java @@ -266,9 +266,14 @@ public class PartitionServiceImpl extends BaseKafkaVersionControlService impleme List tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream() .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)) + .filter(partition -> partition.getPartitionId().equals(partitionId)) .map(elem -> new TopicPartition(topicName, elem.getPartitionId())) .collect(Collectors.toList()); + if (ValidateUtils.isEmptyList(tpList)) { + return Result.buildSuc(new HashMap<>(0)); + } + try { Result>>> listResult = (Result>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, offsetSpec, tpList));