From 0123ce4a5a4bd0b25d719b937c928edeaa204e22 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 6 Dec 2022 16:47:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Broker=E5=88=97=E8=A1=A8JMX?= =?UTF-8?q?=E7=AB=AF=E5=8F=A3=E7=9A=84=E8=BF=94=E5=9B=9E=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/ClusterBrokersManagerImpl.java | 20 +++++++++++++++++-- .../km/common/bean/entity/broker/Broker.java | 4 ++-- .../broker/impl/BrokerServiceImpl.java | 19 +++++++++--------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java index 6b180126..7f98e86f 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java @@ -6,6 +6,8 @@ import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterBrokersManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterBrokersOverviewDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; @@ -16,6 +18,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBroker import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; +import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; @@ -24,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -83,9 +88,13 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { Map jmxConnectedMap = new HashMap<>(); brokerList.forEach(elem -> jmxConnectedMap.put(elem.getBrokerId(), kafkaJMXClient.getClientWithCheck(clusterPhyId, elem.getBrokerId()) != null)); + + ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); + // 格式转换 return PaginationResult.buildSuc( this.convert2ClusterBrokersOverviewVOList( + clusterPhy, paginationResult.getData().getBizData(), brokerList, metricsResult.getData(), @@ -169,7 +178,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { ); } - private List convert2ClusterBrokersOverviewVOList(List pagedBrokerIdList, + private List convert2ClusterBrokersOverviewVOList(ClusterPhy clusterPhy, + List pagedBrokerIdList, List brokerList, List metricsList, Topic groupTopic, @@ -185,9 +195,15 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { Broker broker = brokerMap.get(brokerId); BrokerMetrics brokerMetrics = metricsMap.get(brokerId); Boolean jmxConnected = jmxConnectedMap.get(brokerId); - voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController, jmxConnected)); } + + //补充非zk模式的JMXPort信息 + if (!clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) { + JmxConfig jmxConfig = ConvertUtil.str2ObjByJson(clusterPhy.getJmxProperties(), JmxConfig.class); + voList.forEach(elem -> elem.setJmxPort(jmxConfig.getJmxPort() == null ? -1 : jmxConfig.getJmxPort())); + } + return voList; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java index 513e926e..a1e39f34 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java @@ -66,13 +66,13 @@ public class Broker implements Serializable { */ private Map endpointMap; - public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp, JmxConfig jmxConfig) { + public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) { Broker metadata = new Broker(); metadata.setClusterPhyId(clusterPhyId); metadata.setBrokerId(node.id()); metadata.setHost(node.host()); metadata.setPort(node.port()); - metadata.setJmxPort(jmxConfig != null ? jmxConfig.getJmxPort() : -1); + metadata.setJmxPort(-1); metadata.setStartTimestamp(startTimestamp); metadata.setRack(node.rack()); metadata.setStatus(1); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index 47f3bdbe..0bd7f364 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -168,7 +168,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok allBrokerList = this.listAllBrokersAndUpdateCache(clusterPhyId); } - return allBrokerList.stream().filter( elem -> elem.alive()).collect(Collectors.toList()); + return allBrokerList.stream().filter(elem -> elem.alive()).collect(Collectors.toList()); } @Override @@ -234,11 +234,10 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok @Override public String getBrokerVersionFromKafka(Long clusterId, Integer brokerId) { - JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClient(clusterId, brokerId); - if (ValidateUtils.isNull(jmxConnectorWrap) || !jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) { + JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterId, brokerId); + if (jmxConnectorWrap == null) { return ""; } - try { return (String) jmxConnectorWrap.getAttribute(new ObjectName(JMX_SERVER_APP_INFO + ",id=" + brokerId), VERSION); } catch (Exception e) { @@ -331,7 +330,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok return Result.buildSuc(brokerList); } catch (Exception e) { - log.error("class=BrokerServiceImpl||method=getBrokersFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); + log.error("method=getBrokersFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); } @@ -353,7 +352,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok return Result.buildSuc(newBrokerList); } catch (Exception e) { - log.error("class=BrokerServiceImpl||method=getBrokersFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); + log.error("method=getBrokersFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); } @@ -361,13 +360,13 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) { try { - Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig); + Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), jmxConfig.getJmxPort(), jmxConfig); - return Broker.buildFrom(clusterPhyId, newNode, startTime, jmxConfig); + return Broker.buildFrom(clusterPhyId, newNode, startTime); } catch (Exception e) { - log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e); + log.error("method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e); } - return Broker.buildFrom(clusterPhyId, newNode, null, jmxConfig); + return Broker.buildFrom(clusterPhyId, newNode, null); } }