From 5bd93aa47827039c0af104952a10bc741278f5e6 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 16:43:02 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8D=E6=AD=A3=E5=B8=B8?= =?UTF-8?q?=E6=83=85=E5=86=B5=E4=B8=8B=EF=BC=8C=E9=9B=86=E7=BE=A4=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E7=BB=9F=E8=AE=A1=E9=94=99=E8=AF=AF=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98(#865)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../biz/cluster/MultiClusterPhyManager.java | 6 ++ .../impl/MultiClusterPhyManagerImpl.java | 57 ++++++------------- .../impl/HealthCheckResultServiceImpl.java | 4 ++ .../state/impl/HealthStateServiceImpl.java | 24 ++++++++ .../v3/cluster/MultiClusterPhyController.java | 10 +++- 5 files changed, 61 insertions(+), 40 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java index 0bd2f6e4..2d57d719 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java @@ -4,8 +4,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysHe import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysState; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO; +import java.util.List; + /** * 多集群总体状态 */ @@ -24,4 +28,6 @@ public interface MultiClusterPhyManager { * @return */ PaginationResult getClusterPhysDashboard(MultiClusterDashboardDTO dto); + + Result> getClusterPhysBasic(); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java index e5fa31e1..7e379c99 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java @@ -9,13 +9,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysHe import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysState; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO; -import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.converter.ClusterVOConverter; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; @@ -24,7 +23,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; -import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -42,37 +40,26 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager { @Autowired private ClusterMetricService clusterMetricService; - @Autowired - private KafkaControllerService kafkaControllerService; - @Override public ClusterPhysState getClusterPhysState() { List clusterPhyList = clusterPhyService.listAllClusters(); + ClusterPhysState physState = new ClusterPhysState(0, 0, 0, clusterPhyList.size()); - Map controllerMap = kafkaControllerService.getKafkaControllersFromDB( - clusterPhyList.stream().map(elem -> elem.getId()).collect(Collectors.toList()), - false - ); - - ClusterPhysState physState = new ClusterPhysState(0, 0, clusterPhyList.size()); - for (ClusterPhy clusterPhy: clusterPhyList) { - KafkaController kafkaController = controllerMap.get(clusterPhy.getId()); - - if (kafkaController != null && !kafkaController.alive()) { - // 存在明确的信息表示controller挂了 - physState.setDownCount(physState.getDownCount() + 1); - } else if ((System.currentTimeMillis() - clusterPhy.getCreateTime().getTime() >= 5 * 60 * 1000) && kafkaController == null) { - // 集群接入时间是在近5分钟内,同时kafkaController信息不存在,则设置为down + for (ClusterPhy clusterPhy : clusterPhyList) { + ClusterMetrics metrics = clusterMetricService.getLatestMetricsFromCache(clusterPhy.getId()); + Float state = metrics.getMetric(ClusterMetricVersionItems.CLUSTER_METRIC_HEALTH_STATE); + if (state == null) { + physState.setUnknownCount(physState.getUnknownCount() + 1); + } else if (state.intValue() == HealthStateEnum.DEAD.getDimension()) { physState.setDownCount(physState.getDownCount() + 1); } else { - // 其他情况都设置为alive physState.setLiveCount(physState.getLiveCount() + 1); } } - return physState; } + @Override public ClusterPhysHealthState getClusterPhysHealthState() { List clusterPhyList = clusterPhyService.listAllClusters(); @@ -107,23 +94,6 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager { // 转为vo格式,方便后续进行分页筛选等 List voList = ConvertUtil.list2List(clusterPhyList, ClusterPhyDashboardVO.class); - // 获取集群controller信息并补充到vo中, - Map controllerMap = kafkaControllerService.getKafkaControllersFromDB(clusterPhyList.stream().map(elem -> elem.getId()).collect(Collectors.toList()), false); - for (ClusterPhyDashboardVO vo: voList) { - KafkaController kafkaController = controllerMap.get(vo.getId()); - - if (kafkaController != null && !kafkaController.alive()) { - // 存在明确的信息表示controller挂了 - vo.setAlive(Constant.DOWN); - } else if ((System.currentTimeMillis() - vo.getCreateTime().getTime() >= 5 * 60L * 1000L) && kafkaController == null) { - // 集群接入时间是在近5分钟内,同时kafkaController信息不存在,则设置为down - vo.setAlive(Constant.DOWN); - } else { - // 其他情况都设置为alive - vo.setAlive(Constant.ALIVE); - } - } - // 本地分页过滤 voList = this.getAndPagingDataInLocal(voList, dto); @@ -148,6 +118,15 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager { ); } + @Override + public Result> getClusterPhysBasic() { + // 获取集群 + List clusterPhyList = clusterPhyService.listAllClusters(); + + // 转为vo格式,方便后续进行分页筛选等 + return Result.buildSuc(ConvertUtil.list2List(clusterPhyList, ClusterPhyBaseVO.class)); + } + /**************************************************** private method ****************************************************/ diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java index 4c1471b6..322f1926 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java @@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.config.PlatformClusterConfigService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectClusterDAO; +import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectorDAO; import com.xiaojukeji.know.streaming.km.persistence.mysql.health.HealthCheckResultDAO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; @@ -39,6 +40,9 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService { @Autowired private ConnectClusterDAO connectClusterDAO; + @Autowired + private ConnectorDAO connectorDAO; + @Autowired private PlatformClusterConfigService platformClusterConfigService; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java index 5247564b..b2d34f2a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java @@ -1,10 +1,12 @@ package com.xiaojukeji.know.streaming.km.core.service.health.state.impl; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; @@ -19,7 +21,9 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClus import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; +import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -29,6 +33,7 @@ import java.util.stream.Collectors; import java.util.List; import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.*; +import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum.DEAD; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems.*; @@ -52,6 +57,9 @@ public class HealthStateServiceImpl implements HealthStateService { @Autowired private ConnectClusterService connectClusterService; + @Autowired + private KafkaControllerService kafkaControllerService; + @Override public ClusterMetrics calClusterHealthMetrics(Long clusterPhyId) { ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); @@ -105,6 +113,10 @@ public class HealthStateServiceImpl implements HealthStateService { state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CONNECTOR)); state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER)); + if (isKafkaClusterDown(clusterPhyId)) { + state = Float.valueOf(HealthStateEnum.DEAD.getDimension()); + } + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED, passed); metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL, total); metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE, state); @@ -531,4 +543,16 @@ public class HealthStateServiceImpl implements HealthStateService { return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD; } + + private boolean isKafkaClusterDown(Long clusterPhyId) { + ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); + KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); + if (kafkaController != null && !kafkaController.alive()) { + return true; + } else if ((System.currentTimeMillis() - clusterPhy.getCreateTime().getTime() >= 5 * 60 * 1000) && kafkaController == null) { + // 集群接入时间是在近5分钟内,同时kafkaController信息不存在,则设置为down + return true; + } + return false; + } } diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java index aa9bf781..c93ec94f 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java @@ -4,6 +4,7 @@ import com.xiaojukeji.know.streaming.km.biz.cluster.MultiClusterPhyManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhysHealthStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhysStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO; @@ -37,10 +38,17 @@ public class MultiClusterPhyController { @ApiOperation(value = "多物理集群-大盘", notes = "") @PostMapping(value = "physical-clusters/dashboard") @ResponseBody - public PaginationResult getClusterPhyBasic(@RequestBody @Validated MultiClusterDashboardDTO dto) { + public PaginationResult getClusterPhyDashboard(@RequestBody @Validated MultiClusterDashboardDTO dto) { return multiClusterPhyManager.getClusterPhysDashboard(dto); } + @ApiOperation(value = "多物理集群-基本信息", notes = "") + @GetMapping(value = "physical-clusters/basic") + @ResponseBody + public Result> getClusterPhyBasic() { + return multiClusterPhyManager.getClusterPhysBasic(); + } + @ApiOperation(value = "多物理集群-状态", notes = "") @GetMapping(value = "physical-clusters/state") @ResponseBody