From 63d291cb470a54d9b74c291711b2973964697849 Mon Sep 17 00:00:00 2001 From: 17hao Date: Wed, 3 Feb 2021 15:50:33 +0800 Subject: [PATCH 1/6] Remove __consumer_offsets from topic list --- .../manager/service/service/impl/TopicManagerServiceImpl.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index 0b42d068..9e381587 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -50,6 +50,8 @@ import java.util.stream.Collectors; public class TopicManagerServiceImpl implements TopicManagerService { private static final Logger LOGGER = LoggerFactory.getLogger(TopicManagerServiceImpl.class); + private static final String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets"; + @Autowired private TopicDao topicDao; @@ -275,6 +277,8 @@ public class TopicManagerServiceImpl implements TopicManagerService { } Map> topicMap = new HashMap<>(appList.size()); for (TopicDO topicDO: topicList) { + if (topicDO.getTopicName().equals(CONSUMER_OFFSETS_TOPIC)) + continue; Map subTopicMap = topicMap.getOrDefault(topicDO.getClusterId(), new HashMap<>()); subTopicMap.put(topicDO.getTopicName(), topicDO); topicMap.put(topicDO.getClusterId(), subTopicMap); From 712851a8a56725a097bf89ab479af332c0957e7f Mon Sep 17 00:00:00 2001 From: 17hao Date: Wed, 3 Feb 2021 16:06:16 +0800 Subject: [PATCH 2/6] Add braces --- .../manager/service/service/impl/TopicManagerServiceImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index 9e381587..96913e50 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -277,8 +277,9 @@ public class TopicManagerServiceImpl implements TopicManagerService { } Map> topicMap = new HashMap<>(appList.size()); for (TopicDO topicDO: topicList) { - if (topicDO.getTopicName().equals(CONSUMER_OFFSETS_TOPIC)) + if (topicDO.getTopicName().equals(CONSUMER_OFFSETS_TOPIC)) { continue; + } Map subTopicMap = topicMap.getOrDefault(topicDO.getClusterId(), new HashMap<>()); subTopicMap.put(topicDO.getTopicName(), topicDO); topicMap.put(topicDO.getClusterId(), subTopicMap); From dfb625377b85afae67f9e17d6e7a263a11be9aea Mon Sep 17 00:00:00 2001 From: 17hao Date: Wed, 3 Feb 2021 22:30:38 +0800 Subject: [PATCH 3/6] Using existing topic name constant --- .../manager/service/service/impl/TopicManagerServiceImpl.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index 96913e50..fc5f678d 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -50,8 +50,6 @@ import java.util.stream.Collectors; public class TopicManagerServiceImpl implements TopicManagerService { private static final Logger LOGGER = LoggerFactory.getLogger(TopicManagerServiceImpl.class); - private static final String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets"; - @Autowired private TopicDao topicDao; @@ -277,7 +275,7 @@ public class TopicManagerServiceImpl implements TopicManagerService { } Map> topicMap = new HashMap<>(appList.size()); for (TopicDO topicDO: topicList) { - if (topicDO.getTopicName().equals(CONSUMER_OFFSETS_TOPIC)) { + if (topicDO.getTopicName().equals(KafkaConstant.COORDINATOR_TOPIC_NAME)) { continue; } Map subTopicMap = topicMap.getOrDefault(topicDO.getClusterId(), new HashMap<>()); From e8de40328690b729a9d6572a0bfa1f808e6842e7 Mon Sep 17 00:00:00 2001 From: 17hao Date: Thu, 4 Feb 2021 12:14:44 +0800 Subject: [PATCH 4/6] Hide __transaction_state in topic list && fix logic error --- .../service/impl/TopicManagerServiceImpl.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index fc5f678d..e903b2e7 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum; +import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; import com.xiaojukeji.kafka.manager.common.entity.Result; @@ -275,11 +276,13 @@ public class TopicManagerServiceImpl implements TopicManagerService { } Map> topicMap = new HashMap<>(appList.size()); for (TopicDO topicDO: topicList) { - if (topicDO.getTopicName().equals(KafkaConstant.COORDINATOR_TOPIC_NAME)) { + String topicName = topicDO.getTopicName(); + if (topicName.equals(KafkaConstant.COORDINATOR_TOPIC_NAME) || + topicName.equals(KafkaConstant.TRANSACTION_TOPIC_NAME)) { continue; } Map subTopicMap = topicMap.getOrDefault(topicDO.getClusterId(), new HashMap<>()); - subTopicMap.put(topicDO.getTopicName(), topicDO); + subTopicMap.put(topicName, topicDO); topicMap.put(topicDO.getClusterId(), subTopicMap); } @@ -304,6 +307,11 @@ public class TopicManagerServiceImpl implements TopicManagerService { continue; } + TopicDO topicDO = topicMap.get(topicName); + if (ValidateUtils.isNull(topicDO)) { + continue; + } + TopicDTO dto = new TopicDTO(); dtoList.add(dto); @@ -311,11 +319,6 @@ public class TopicManagerServiceImpl implements TopicManagerService { dto.setLogicalClusterName(logicalClusterDO.getName()); dto.setTopicName(topicName); dto.setNeedAuth(Boolean.TRUE); - - TopicDO topicDO = topicMap.get(topicName); - if (ValidateUtils.isNull(topicDO)) { - continue; - } dto.setDescription(topicDO.getDescription()); dto.setAppId(topicDO.getAppId()); From 9491418f3b5d7c38d32442b63b46b9e09c7faf8f Mon Sep 17 00:00:00 2001 From: 17hao Date: Thu, 4 Feb 2021 12:32:28 +0800 Subject: [PATCH 5/6] Update if statements --- .../service/service/impl/TopicManagerServiceImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index e903b2e7..f86d022a 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -303,12 +303,10 @@ public class TopicManagerServiceImpl implements TopicManagerService { clusterDO.getId(), topicName ); - if (ValidateUtils.isNull(logicalClusterDO)) { - continue; - } TopicDO topicDO = topicMap.get(topicName); - if (ValidateUtils.isNull(topicDO)) { + + if (ValidateUtils.isNull(topicDO) || ValidateUtils.isNull(logicalClusterDO)) { continue; } From c7919210a2cf3d90f58b3578539b96e6bfcd9578 Mon Sep 17 00:00:00 2001 From: 17hao Date: Thu, 4 Feb 2021 16:30:23 +0800 Subject: [PATCH 6/6] Fix topic list filter condition --- .../service/impl/TopicManagerServiceImpl.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index f86d022a..a2d5aa92 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -276,13 +276,8 @@ public class TopicManagerServiceImpl implements TopicManagerService { } Map> topicMap = new HashMap<>(appList.size()); for (TopicDO topicDO: topicList) { - String topicName = topicDO.getTopicName(); - if (topicName.equals(KafkaConstant.COORDINATOR_TOPIC_NAME) || - topicName.equals(KafkaConstant.TRANSACTION_TOPIC_NAME)) { - continue; - } Map subTopicMap = topicMap.getOrDefault(topicDO.getClusterId(), new HashMap<>()); - subTopicMap.put(topicName, topicDO); + subTopicMap.put(topicDO.getTopicName(), topicDO); topicMap.put(topicDO.getClusterId(), subTopicMap); } @@ -299,14 +294,15 @@ public class TopicManagerServiceImpl implements TopicManagerService { Map topicMap) { List dtoList = new ArrayList<>(); for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) { + if (topicName.equals(KafkaConstant.COORDINATOR_TOPIC_NAME) || topicName.equals(KafkaConstant.TRANSACTION_TOPIC_NAME)) { + continue; + } + LogicalClusterDO logicalClusterDO = logicalClusterMetadataManager.getTopicLogicalCluster( clusterDO.getId(), topicName ); - - TopicDO topicDO = topicMap.get(topicName); - - if (ValidateUtils.isNull(topicDO) || ValidateUtils.isNull(logicalClusterDO)) { + if (ValidateUtils.isNull(logicalClusterDO)) { continue; } @@ -317,6 +313,11 @@ public class TopicManagerServiceImpl implements TopicManagerService { dto.setLogicalClusterName(logicalClusterDO.getName()); dto.setTopicName(topicName); dto.setNeedAuth(Boolean.TRUE); + + TopicDO topicDO = topicMap.get(topicName); + if (ValidateUtils.isNull(topicDO)) { + continue; + } dto.setDescription(topicDO.getDescription()); dto.setAppId(topicDO.getAppId());