mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
优化查询Topic信息,Topic不存在时的错误提示
This commit is contained in:
@@ -27,11 +27,13 @@ import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
|
||||
import kafka.zk.TopicsZNode;
|
||||
import org.apache.kafka.clients.admin.*;
|
||||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -84,6 +86,13 @@ public class TopicServiceImpl implements TopicService {
|
||||
}
|
||||
|
||||
return partitionMap;
|
||||
} catch (ExecutionException e) {
|
||||
log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e);
|
||||
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
|
||||
throw new AdminOperateException(String.format("Kafka does not host Topic:[%s]", topicName), e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED);
|
||||
}
|
||||
|
||||
throw new AdminOperateException("get topic info from kafka failed", e.getCause(), ResultStatus.KAFKA_OPERATE_FAILED);
|
||||
} catch (Exception e) {
|
||||
log.error("method=getTopicPartitionMapFromKafka||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhyId, topicName, e);
|
||||
throw new AdminOperateException("get topic info from kafka failed", e, ResultStatus.KAFKA_OPERATE_FAILED);
|
||||
|
||||
Reference in New Issue
Block a user