diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java index ea343539..bffabec8 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java @@ -27,11 +27,13 @@ import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; import kafka.zk.TopicsZNode; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -84,6 +86,13 @@ public class TopicServiceImpl implements TopicService { } return partitionMap; + } catch (ExecutionException e) { + log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e); + if (e.getCause() instanceof UnknownTopicOrPartitionException) { + throw new AdminOperateException(String.format("Kafka does not host Topic:[%s]", topicName), e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED); + } + + throw new AdminOperateException("get topic info from kafka failed", e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED); } catch (Exception e) { log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e); throw new AdminOperateException("get topic info from kafka failed", e, ResultStatus.KAFKA_OPERATE_FAILED);