From ceb8db09f417132cf730f92d4f37eb3129f2bbd0 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 6 Sep 2022 15:21:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9F=A5=E8=AF=A2Topic?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=EF=BC=8CTopic=E4=B8=8D=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=E6=97=B6=E7=9A=84=E9=94=99=E8=AF=AF=E6=8F=90=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/core/service/topic/impl/TopicServiceImpl.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java index ea343539..bffabec8 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java @@ -27,11 +27,13 @@ import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; import kafka.zk.TopicsZNode; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -84,6 +86,13 @@ public class TopicServiceImpl implements TopicService { } return partitionMap; + } catch (ExecutionException e) { + log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e); + if (e.getCause() instanceof UnknownTopicOrPartitionException) { + throw new AdminOperateException(String.format("Kafka does not host Topic:[%s]", topicName), e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED); + } + + throw new AdminOperateException("get topic info from kafka failed", e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED); } catch (Exception e) { log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e); throw new AdminOperateException("get topic info from kafka failed", e, ResultStatus.KAFKA_OPERATE_FAILED);