diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java index e7a67ac7..50c3596d 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java @@ -14,6 +14,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO; +import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; @@ -71,6 +72,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { Topic groupTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME); Topic transactionTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME); + //获取controller信息 + KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); + // 格式转换 return PaginationResult.buildSuc( this.convert2ClusterBrokersOverviewVOList( @@ -78,7 +82,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { brokerList, metricsResult.getData(), groupTopic, - transactionTopic + transactionTopic, + kafkaController ), paginationResult ); @@ -159,7 +164,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { List brokerList, List metricsList, Topic groupTopic, - Topic transactionTopic) { + Topic transactionTopic, + KafkaController kafkaController) { Map metricsMap = metricsList == null? new HashMap<>(): metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity())); Map brokerMap = brokerList == null? new HashMap<>(): brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity())); @@ -169,12 +175,12 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { Broker broker = brokerMap.get(brokerId); BrokerMetrics brokerMetrics = metricsMap.get(brokerId); - voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic)); + voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController)); } return voList; } - private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic) { + private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController) { ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO(); clusterBrokersOverviewVO.setBrokerId(brokerId); if (broker != null) { @@ -192,6 +198,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { if (transactionTopic != null && transactionTopic.getBrokerIdSet().contains(brokerId)) { clusterBrokersOverviewVO.getKafkaRoleList().add(transactionTopic.getTopicName()); } + if (kafkaController != null && kafkaController.getBrokerId().equals(brokerId)) { + clusterBrokersOverviewVO.getKafkaRoleList().add(KafkaConstant.CONTROLLER_ROLE); + } clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics); return clusterBrokersOverviewVO; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java index 3b768e01..16fd7921 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java @@ -41,6 +41,8 @@ public class KafkaConstant { public static final Long POLL_ONCE_TIMEOUT_UNIT_MS = 2000L; + public static final String CONTROLLER_ROLE = "controller"; + public static final Map KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>(); static {