From 3b72f732be68edb4750ffa33b675ebf8450198eb Mon Sep 17 00:00:00 2001 From: EricZeng Date: Wed, 27 Sep 2023 14:05:45 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=E4=BC=98=E5=8C=96=E9=9B=86=E7=BE=A4B?= =?UTF-8?q?rokers=E4=B8=AD,=20Controller=E6=98=BE=E7=A4=BA=E5=AD=98?= =?UTF-8?q?=E5=9C=A8=E5=BB=B6=E8=BF=9F=E7=9A=84=E9=97=AE=E9=A2=98=20(#1162?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 优化方式: 从DB获取调整为从Kafka中实时获取。 --- .../km/biz/cluster/ClusterBrokersManager.java | 3 ++- .../impl/ClusterBrokersManagerImpl.java | 26 ++++++++++++++----- .../v3/cluster/ClusterBrokersController.java | 2 +- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java index 8427c1ef..38a2ea4d 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterBrokersManager.java @@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.biz.cluster; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterBrokersOverviewDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO; @@ -22,5 +23,5 @@ public interface ClusterBrokersManager { * @param clusterPhyId 物理集群 id * @return 返回根据物理集群id获取到的集群对应broker状态信息 */ - ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId); + Result getClusterPhyBrokersState(Long clusterPhyId); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java index c77724dd..99477370 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java @@ -12,11 +12,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.Kafka import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; @@ -26,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; @@ -60,6 +63,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { @Autowired private KafkaJMXClient kafkaJMXClient; + @Autowired + private ClusterPhyService clusterPhyService; + @Override public PaginationResult getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto) { // 获取集群Broker列表 @@ -108,7 +114,12 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { } @Override - public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) { + public Result getClusterPhyBrokersState(Long clusterPhyId) { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); + } + ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO(); // 获取集群Broker列表 @@ -126,24 +137,25 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { ); // 获取controller信息 - KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); + Result controllerResult = kafkaControllerService.getControllerFromKafka(clusterPhy); // 设置kafka-controller信息 clusterBrokersStateVO.setKafkaControllerAlive(false); - if(null != kafkaController) { + if(null != controllerResult.getData()) { clusterBrokersStateVO.setKafkaController( this.convert2KafkaControllerVO( - kafkaController, - brokerService.getBroker(clusterPhyId, kafkaController.getBrokerId()) + controllerResult.getData(), + brokerService.getBroker(clusterPhyId, controllerResult.getData().getBrokerId()) ) ); clusterBrokersStateVO.setKafkaControllerAlive(true); } - clusterBrokersStateVO.setConfigSimilar(brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, KafkaConstant.CONFIG_SIMILAR_IGNORED_CONFIG_KEY_LIST) <= 0 + clusterBrokersStateVO.setConfigSimilar( + brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, KafkaConstant.CONFIG_SIMILAR_IGNORED_CONFIG_KEY_LIST) <= 0 ); - return clusterBrokersStateVO; + return Result.buildSuc(clusterBrokersStateVO); } /**************************************************** private method ****************************************************/ diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java index db9c933a..7d8d6ee0 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterBrokersController.java @@ -52,7 +52,7 @@ public class ClusterBrokersController { @GetMapping(value = "clusters/{clusterPhyId}/brokers-state") @ResponseBody public Result getClusterPhyBrokersState(@PathVariable Long clusterPhyId) { - return Result.buildSuc(clusterBrokersManager.getClusterPhyBrokersState(clusterPhyId)); + return clusterBrokersManager.getClusterPhyBrokersState(clusterPhyId); } @ApiOperation(value = "集群brokers信息列表")