From fa0ae5e474bb2be2f053adf5e31bae6bd7e0dd09 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Fri, 21 Oct 2022 11:58:57 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=20=E9=9B=86=E7=BE=A4Broker=E5=88=97?= =?UTF-8?q?=E8=A1=A8=E4=B8=AD=EF=BC=8C=E8=A1=A5=E5=85=85Jmx=E6=98=AF?= =?UTF-8?q?=E5=90=A6=E6=88=90=E5=8A=9F=E8=BF=9E=E6=8E=A5=E7=9A=84=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、当前页面无数据时,一部分的原因是JMX连接失败导致; 2、Broker列表中增加是否连接成功的信息,便于问题的排查; --- .../impl/ClusterBrokersManagerImpl.java | 24 ++++++++++++++----- .../cluster/res/ClusterBrokersOverviewVO.java | 3 +++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java index 50c3596d..6b180126 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java @@ -24,6 +24,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -51,6 +52,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { @Autowired private KafkaControllerService kafkaControllerService; + @Autowired + private KafkaJMXClient kafkaJMXClient; + @Override public PaginationResult getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto) { // 获取集群Broker列表 @@ -75,6 +79,10 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { //获取controller信息 KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); + //获取jmx状态信息 + Map jmxConnectedMap = new HashMap<>(); + brokerList.forEach(elem -> jmxConnectedMap.put(elem.getBrokerId(), kafkaJMXClient.getClientWithCheck(clusterPhyId, elem.getBrokerId()) != null)); + // 格式转换 return PaginationResult.buildSuc( this.convert2ClusterBrokersOverviewVOList( @@ -83,7 +91,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { metricsResult.getData(), groupTopic, transactionTopic, - kafkaController + kafkaController, + jmxConnectedMap ), paginationResult ); @@ -165,22 +174,24 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { List metricsList, Topic groupTopic, Topic transactionTopic, - KafkaController kafkaController) { - Map metricsMap = metricsList == null? new HashMap<>(): metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity())); + KafkaController kafkaController, + Map jmxConnectedMap) { + Map metricsMap = metricsList == null ? new HashMap<>() : metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity())); - Map brokerMap = brokerList == null? new HashMap<>(): brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity())); + Map brokerMap = brokerList == null ? new HashMap<>() : brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity())); List voList = new ArrayList<>(pagedBrokerIdList.size()); for (Integer brokerId : pagedBrokerIdList) { Broker broker = brokerMap.get(brokerId); BrokerMetrics brokerMetrics = metricsMap.get(brokerId); + Boolean jmxConnected = jmxConnectedMap.get(brokerId); - voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController)); + voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController, jmxConnected)); } return voList; } - private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController) { + private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController, Boolean jmxConnected) { ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO(); clusterBrokersOverviewVO.setBrokerId(brokerId); if (broker != null) { @@ -203,6 +214,7 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { } clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics); + clusterBrokersOverviewVO.setJmxConnected(jmxConnected); return clusterBrokersOverviewVO; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterBrokersOverviewVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterBrokersOverviewVO.java index be1b529d..b172403c 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterBrokersOverviewVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterBrokersOverviewVO.java @@ -31,6 +31,9 @@ public class ClusterBrokersOverviewVO extends BrokerMetadataVO { @ApiModelProperty(value = "jmx端口") private Integer jmxPort; + @ApiModelProperty(value = "jmx连接状态 true:连接成功 false:连接失败") + private Boolean jmxConnected; + @ApiModelProperty(value = "是否存活 true:存活 false:不存活") private Boolean alive; }