From b101cec6faecf75e1fb86f5afe2f4185596133ff Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sat, 29 Oct 2022 13:43:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=81=A5=E5=BA=B7=E5=88=86=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E4=B8=BA=E5=81=A5=E5=BA=B7=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/ClusterZookeepersManagerImpl.java | 32 +- .../impl/VersionControlManagerImpl.java | 8 +- .../km/common/bean/entity/broker/Broker.java | 5 +- .../healthcheck/BaseClusterHealthConfig.java | 5 - .../healthcheck/HealthAmountRatioConfig.java | 19 + .../entity/health/HealthCheckAggResult.java | 83 ++++ .../bean/entity/health/HealthScoreResult.java | 36 +- .../param/zookeeper/ZookeeperParam.java | 26 ++ .../bean/vo/health/HealthCheckConfigVO.java | 3 - .../vo/health/HealthScoreBaseResultVO.java | 6 +- .../km/common/constant/Constant.java | 5 - .../converter/HealthScoreVOConverter.java | 16 +- .../health/HealthCheckDimensionEnum.java | 10 +- .../enums/health/HealthCheckNameEnum.java | 86 +++- .../common/enums/health/HealthStateEnum.java | 2 +- .../broker/impl/BrokerMetricServiceImpl.java | 7 +- .../broker/impl/BrokerServiceImpl.java | 6 +- .../impl/ClusterMetricServiceImpl.java | 16 +- .../group/impl/GroupMetricServiceImpl.java | 8 +- .../HealthCheckZookeeperService.java | 281 ++++++++++++ .../health/state/HealthStateService.java | 50 +++ .../state/impl/HealthStateServiceImpl.java | 408 ++++++++++++++++++ .../topic/impl/TopicMetricServiceImpl.java | 6 +- .../metrics/BrokerMetricVersionItems.java | 6 +- .../metrics/ClusterMetricVersionItems.java | 71 +-- .../metrics/GroupMetricVersionItems.java | 9 +- .../metrics/TopicMetricVersionItems.java | 5 +- .../metrics/ZookeeperMetricVersionItems.java | 22 +- .../impl/ZookeeperMetricServiceImpl.java | 14 + .../api/v3/health/KafkaHealthController.java | 11 +- 30 files changed, 1091 insertions(+), 171 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/HealthAmountRatioConfig.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/zookeeper/ZookeeperParam.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java index b285cac9..7783b40b 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java @@ -5,9 +5,7 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; @@ -20,7 +18,6 @@ import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; -import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService; @@ -30,7 +27,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; @Service @@ -56,11 +52,6 @@ public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager { return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); } -// // TODO -// private Integer healthState; -// private Integer healthCheckPassed; -// private Integer healthCheckTotal; - List infoList = zookeeperService.listFromDBByCluster(clusterPhyId); ClusterZookeepersStateVO vo = new ClusterZookeepersStateVO(); @@ -90,12 +81,17 @@ public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager { } } - Result metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(new ZookeeperMetricParam( + // 指标获取 + Result metricsResult = zookeeperMetricService.batchCollectMetricsFromZookeeper( clusterPhyId, - infoList.stream().filter(elem -> elem.alive()).map(item -> new Tuple(item.getHost(), item.getPort())).collect(Collectors.toList()), - ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class), - ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT - )); + Arrays.asList( + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT, + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_HEALTH_STATE, + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL + ) + + ); if (metricsResult.failed()) { LOGGER.error( "class=ClusterZookeepersManagerImpl||method=getClusterPhyZookeepersState||clusterPhyId={}||errMsg={}", @@ -103,8 +99,12 @@ public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager { ); return Result.buildSuc(vo); } - Float watchCount = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT); - vo.setWatchCount(watchCount != null? watchCount.intValue(): null); + + ZookeeperMetrics metrics = metricsResult.getData(); + vo.setWatchCount(ConvertUtil.float2Integer(metrics.getMetrics().get(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT))); + vo.setHealthState(ConvertUtil.float2Integer(metrics.getMetrics().get(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_HEALTH_STATE))); + vo.setHealthCheckPassed(ConvertUtil.float2Integer(metrics.getMetrics().get(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED))); + vo.setHealthCheckTotal(ConvertUtil.float2Integer(metrics.getMetrics().get(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL))); return Result.buildSuc(vo); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java index 70f4814b..501e4822 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java @@ -47,7 +47,7 @@ public class VersionControlManagerImpl implements VersionControlManager { @PostConstruct public void init(){ - defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_HEALTH_SCORE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_HEALTH_STATE, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_FAILED_FETCH_REQ, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_FAILED_PRODUCE_REQ, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_UNDER_REPLICA_PARTITIONS, true)); @@ -57,7 +57,7 @@ public class VersionControlManagerImpl implements VersionControlManager { defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_BYTES_REJECTED, true)); defaultMetrics.add(new UserMetricConfig(METRIC_TOPIC.getCode(), TOPIC_METRIC_MESSAGE_IN, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_HEALTH_SCORE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_HEALTH_STATE, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_ACTIVE_CONTROLLER_COUNT, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_BYTES_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_CLUSTER.getCode(), CLUSTER_METRIC_BYTES_OUT, true)); @@ -75,9 +75,9 @@ public class VersionControlManagerImpl implements VersionControlManager { defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_OFFSET_CONSUMED, true)); defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_LAG, true)); defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_STATE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_HEALTH_SCORE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_GROUP.getCode(), GROUP_METRIC_HEALTH_STATE, true)); - defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_HEALTH_SCORE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_HEALTH_STATE, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_CONNECTION_COUNT, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_MESSAGE_IN, true)); defaultMetrics.add(new UserMetricConfig(METRIC_BROKER.getCode(), BROKER_METRIC_NETWORK_RPO_AVG_IDLE, true)); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java index 752aade0..513e926e 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.broker; import com.alibaba.fastjson.TypeReference; import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig; import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import lombok.AllArgsConstructor; @@ -65,13 +66,13 @@ public class Broker implements Serializable { */ private Map endpointMap; - public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) { + public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp, JmxConfig jmxConfig) { Broker metadata = new Broker(); metadata.setClusterPhyId(clusterPhyId); metadata.setBrokerId(node.id()); metadata.setHost(node.host()); metadata.setPort(node.port()); - metadata.setJmxPort(-1); + metadata.setJmxPort(jmxConfig != null ? jmxConfig.getJmxPort() : -1); metadata.setStartTimestamp(startTimestamp); metadata.setRack(node.rack()); metadata.setStatus(1); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/BaseClusterHealthConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/BaseClusterHealthConfig.java index 0e33275a..a22c53fb 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/BaseClusterHealthConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/BaseClusterHealthConfig.java @@ -13,9 +13,4 @@ public class BaseClusterHealthConfig extends BaseClusterConfigValue { * 健康检查名称 */ protected HealthCheckNameEnum checkNameEnum; - - /** - * 权重 - */ - protected Float weight; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/HealthAmountRatioConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/HealthAmountRatioConfig.java new file mode 100644 index 00000000..0c4fb60e --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/healthcheck/HealthAmountRatioConfig.java @@ -0,0 +1,19 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck; + +import lombok.Data; + +/** + * @author wyb + * @date 2022/10/26 + */ +@Data +public class HealthAmountRatioConfig extends BaseClusterHealthConfig { + /** + * 总数 + */ + private Integer amount; + /** + * 比例 + */ + private Double ratio; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java new file mode 100644 index 00000000..69d65a20 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java @@ -0,0 +1,83 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.health; + +import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +@Data +@NoArgsConstructor +public class HealthCheckAggResult { + private HealthCheckNameEnum checkNameEnum; + + private List poList; + + private Boolean passed; + + public HealthCheckAggResult(HealthCheckNameEnum checkNameEnum, List poList) { + this.checkNameEnum = checkNameEnum; + this.poList = poList; + if (!ValidateUtils.isEmptyList(poList) && poList.stream().filter(elem -> elem.getPassed() <= 0).count() <= 0) { + passed = true; + } else { + passed = false; + } + } + + public Integer getTotalCount() { + if (poList == null) { + return 0; + } + + return poList.size(); + } + + public Integer getPassedCount() { + if (poList == null) { + return 0; + } + return (int) (poList.stream().filter(elem -> elem.getPassed() > 0).count()); + } + + /** + * 计算当前检查的健康分 + * 比如:计算集群Broker健康检查中的某一项的健康分 + */ + public Integer calRawHealthScore() { + if (poList == null || poList.isEmpty()) { + return 100; + } + + return 100 * this.getPassedCount() / this.getTotalCount(); + } + + public List getNotPassedResNameList() { + if (poList == null) { + return new ArrayList<>(); + } + + return poList.stream().filter(elem -> elem.getPassed() <= 0).map(elem -> elem.getResName()).collect(Collectors.toList()); + } + + public Date getCreateTime() { + if (ValidateUtils.isEmptyList(poList)) { + return null; + } + + return poList.get(0).getCreateTime(); + } + + public Date getUpdateTime() { + if (ValidateUtils.isEmptyList(poList)) { + return null; + } + + return poList.get(0).getUpdateTime(); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java index f1af1765..c503c129 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java @@ -17,10 +17,6 @@ import java.util.stream.Collectors; public class HealthScoreResult { private HealthCheckNameEnum checkNameEnum; - private Float presentDimensionTotalWeight; - - private Float allDimensionTotalWeight; - private BaseClusterHealthConfig baseConfig; private List poList; @@ -28,15 +24,11 @@ public class HealthScoreResult { private Boolean passed; public HealthScoreResult(HealthCheckNameEnum checkNameEnum, - Float presentDimensionTotalWeight, - Float allDimensionTotalWeight, BaseClusterHealthConfig baseConfig, List poList) { this.checkNameEnum = checkNameEnum; this.baseConfig = baseConfig; this.poList = poList; - this.presentDimensionTotalWeight = presentDimensionTotalWeight; - this.allDimensionTotalWeight = allDimensionTotalWeight; if (!ValidateUtils.isEmptyList(poList) && poList.stream().filter(elem -> elem.getPassed() <= 0).count() <= 0) { passed = true; } else { @@ -59,32 +51,6 @@ public class HealthScoreResult { return (int) (poList.stream().filter(elem -> elem.getPassed() > 0).count()); } - /** - * 计算所有检查结果的健康分 - * 比如:计算集群健康分 - */ - public Float calAllWeightHealthScore() { - Float healthScore = 100 * baseConfig.getWeight() / allDimensionTotalWeight; - if (poList == null || poList.isEmpty()) { - return 0.0f; - } - - return healthScore * this.getPassedCount() / this.getTotalCount(); - } - - /** - * 计算当前维度的健康分 - * 比如:计算集群Broker健康分 - */ - public Float calDimensionWeightHealthScore() { - Float healthScore = 100 * baseConfig.getWeight() / presentDimensionTotalWeight; - if (poList == null || poList.isEmpty()) { - return 0.0f; - } - - return healthScore * this.getPassedCount() / this.getTotalCount(); - } - /** * 计算当前检查的健康分 * 比如:计算集群Broker健康检查中的某一项的健康分 @@ -102,7 +68,7 @@ public class HealthScoreResult { return new ArrayList<>(); } - return poList.stream().filter(elem -> elem.getPassed() <= 0).map(elem -> elem.getResName()).collect(Collectors.toList()); + return poList.stream().filter(elem -> elem.getPassed() <= 0 && !ValidateUtils.isBlank(elem.getResName())).map(elem -> elem.getResName()).collect(Collectors.toList()); } public Date getCreateTime() { diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/zookeeper/ZookeeperParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/zookeeper/ZookeeperParam.java new file mode 100644 index 00000000..fc111356 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/zookeeper/ZookeeperParam.java @@ -0,0 +1,26 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author didi + */ +@Data +@NoArgsConstructor +public class ZookeeperParam extends ClusterPhyParam { + private List> zkAddressList; + + private ZKConfig zkConfig; + + public ZookeeperParam(Long clusterPhyId, List> zkAddressList, ZKConfig zkConfig) { + super(clusterPhyId); + this.zkAddressList = zkAddressList; + this.zkConfig = zkConfig; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthCheckConfigVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthCheckConfigVO.java index 8980619f..c9857c56 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthCheckConfigVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthCheckConfigVO.java @@ -32,9 +32,6 @@ public class HealthCheckConfigVO { @ApiModelProperty(value="检查说明", example = "Group延迟") private String configDesc; - @ApiModelProperty(value="权重", example = "10") - private Float weight; - @ApiModelProperty(value="检查配置", example = "100") private String value; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java index 203977e7..c9abc528 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java @@ -18,6 +18,9 @@ public class HealthScoreBaseResultVO extends BaseTimeVO { @ApiModelProperty(value="检查维度", example = "1") private Integer dimension; + @ApiModelProperty(value="检查维度名称", example = "cluster") + private String dimensionName; + @ApiModelProperty(value="检查名称", example = "Group延迟") private String configName; @@ -27,9 +30,6 @@ public class HealthScoreBaseResultVO extends BaseTimeVO { @ApiModelProperty(value="检查说明", example = "Group延迟") private String configDesc; - @ApiModelProperty(value="权重百分比[0-100]", example = "10") - private Integer weightPercent; - @ApiModelProperty(value="得分", example = "100") private Integer score; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index 639ad0f3..59a52c1c 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -35,14 +35,9 @@ public class Constant { public static final Integer DEFAULT_SESSION_TIMEOUT_UNIT_MS = 15000; public static final Integer DEFAULT_REQUEST_TIMEOUT_UNIT_MS = 5000; - public static final Float MIN_HEALTH_SCORE = 10f; - - /** * 指标相关 */ - public static final Integer DEFAULT_CLUSTER_HEALTH_SCORE = 90; - public static final Integer PER_BATCH_MAX_VALUE = 100; public static final String DEFAULT_USER_NAME = "know-streaming-app"; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java index de7ad08f..150306dc 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java @@ -15,24 +15,15 @@ public class HealthScoreVOConverter { private HealthScoreVOConverter() { } - public static List convert2HealthScoreResultDetailVOList(List healthScoreResultList, boolean useGlobalWeight) { - Float globalWeightSum = 1f; - if (!healthScoreResultList.isEmpty()) { - globalWeightSum = healthScoreResultList.get(0).getAllDimensionTotalWeight(); - } - + public static List convert2HealthScoreResultDetailVOList(List healthScoreResultList) { List voList = new ArrayList<>(); for (HealthScoreResult healthScoreResult: healthScoreResultList) { HealthScoreResultDetailVO vo = new HealthScoreResultDetailVO(); vo.setDimension(healthScoreResult.getCheckNameEnum().getDimensionEnum().getDimension()); + vo.setDimensionName(healthScoreResult.getCheckNameEnum().getDimensionEnum().getMessage()); vo.setConfigName(healthScoreResult.getCheckNameEnum().getConfigName()); vo.setConfigItem(healthScoreResult.getCheckNameEnum().getConfigItem()); vo.setConfigDesc(healthScoreResult.getCheckNameEnum().getConfigDesc()); - if (useGlobalWeight) { - vo.setWeightPercent(healthScoreResult.getBaseConfig().getWeight().intValue() * 100 / globalWeightSum.intValue()); - } else { - vo.setWeightPercent(healthScoreResult.getBaseConfig().getWeight().intValue() * 100 / healthScoreResult.getPresentDimensionTotalWeight().intValue()); - } vo.setScore(healthScoreResult.calRawHealthScore()); if (healthScoreResult.getTotalCount() <= 0) { @@ -57,9 +48,9 @@ public class HealthScoreVOConverter { for (HealthScoreResult healthScoreResult: healthScoreResultList) { HealthScoreBaseResultVO vo = new HealthScoreBaseResultVO(); vo.setDimension(healthScoreResult.getCheckNameEnum().getDimensionEnum().getDimension()); + vo.setDimensionName(healthScoreResult.getCheckNameEnum().getDimensionEnum().getMessage()); vo.setConfigName(healthScoreResult.getCheckNameEnum().getConfigName()); vo.setConfigDesc(healthScoreResult.getCheckNameEnum().getConfigDesc()); - vo.setWeightPercent(healthScoreResult.getBaseConfig().getWeight().intValue() * 100 / healthScoreResult.getPresentDimensionTotalWeight().intValue()); vo.setScore(healthScoreResult.calRawHealthScore()); vo.setPassed(healthScoreResult.getPassedCount().equals(healthScoreResult.getTotalCount())); vo.setCheckConfig(convert2HealthCheckConfigVO(ConfigGroupEnum.HEALTH.name(), healthScoreResult.getBaseConfig())); @@ -86,7 +77,6 @@ public class HealthScoreVOConverter { vo.setConfigName(config.getCheckNameEnum().getConfigName()); vo.setConfigItem(config.getCheckNameEnum().getConfigItem()); vo.setConfigDesc(config.getCheckNameEnum().getConfigDesc()); - vo.setWeight(config.getWeight()); vo.setValue(ConvertUtil.obj2Json(config)); return vo; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java index bde25a1f..daa4e641 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java @@ -10,13 +10,15 @@ import lombok.Getter; public enum HealthCheckDimensionEnum { UNKNOWN(-1, "未知"), - CLUSTER(0, "Cluster维度"), + CLUSTER(0, "Cluster"), - BROKER(1, "Broker维度"), + BROKER(1, "Broker"), - TOPIC(2, "Topic维度"), + TOPIC(2, "Topic"), - GROUP(3, "消费组维度"), + GROUP(3, "Group"), + + ZOOKEEPER(4, "Zookeeper"), ; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java index 724f5ed3..5b294e67 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java @@ -1,6 +1,7 @@ package com.xiaojukeji.know.streaming.km.common.enums.health; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthAmountRatioConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig; import com.xiaojukeji.know.streaming.km.common.constant.Constant; @@ -19,7 +20,8 @@ public enum HealthCheckNameEnum { "未知", Constant.HC_CONFIG_NAME_PREFIX + "UNKNOWN", "未知", - BaseClusterHealthConfig.class + BaseClusterHealthConfig.class, + false ), CLUSTER_NO_CONTROLLER( @@ -27,7 +29,8 @@ public enum HealthCheckNameEnum { "Controller", Constant.HC_CONFIG_NAME_PREFIX + "CLUSTER_NO_CONTROLLER", "集群Controller数正常", - HealthCompareValueConfig.class + HealthCompareValueConfig.class, + true ), BROKER_REQUEST_QUEUE_FULL( @@ -35,7 +38,8 @@ public enum HealthCheckNameEnum { "RequestQueueSize", Constant.HC_CONFIG_NAME_PREFIX + "BROKER_REQUEST_QUEUE_FULL", "Broker-RequestQueueSize指标", - HealthCompareValueConfig.class + HealthCompareValueConfig.class, + false ), BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW( @@ -43,7 +47,8 @@ public enum HealthCheckNameEnum { "NetworkProcessorAvgIdlePercent", Constant.HC_CONFIG_NAME_PREFIX + "BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW", "Broker-NetworkProcessorAvgIdlePercent指标", - HealthCompareValueConfig.class + HealthCompareValueConfig.class, + false ), GROUP_RE_BALANCE_TOO_FREQUENTLY( @@ -51,7 +56,8 @@ public enum HealthCheckNameEnum { "Group Re-Balance", Constant.HC_CONFIG_NAME_PREFIX + "GROUP_RE_BALANCE_TOO_FREQUENTLY", "Group re-balance频率", - HealthDetectedInLatestMinutesConfig.class + HealthDetectedInLatestMinutesConfig.class, + false ), TOPIC_NO_LEADER( @@ -59,7 +65,8 @@ public enum HealthCheckNameEnum { "NoLeader", Constant.HC_CONFIG_NAME_PREFIX + "TOPIC_NO_LEADER", "Topic 无Leader数", - HealthCompareValueConfig.class + HealthCompareValueConfig.class, + false ), TOPIC_UNDER_REPLICA_TOO_LONG( @@ -67,9 +74,66 @@ public enum HealthCheckNameEnum { "UnderReplicaTooLong", Constant.HC_CONFIG_NAME_PREFIX + "TOPIC_UNDER_REPLICA_TOO_LONG", "Topic 未同步持续时间", - HealthDetectedInLatestMinutesConfig.class + HealthDetectedInLatestMinutesConfig.class, + false ), + ZK_BRAIN_SPLIT( + HealthCheckDimensionEnum.ZOOKEEPER, + "BrainSplit", + Constant.HC_CONFIG_NAME_PREFIX + "ZK_BRAIN_SPLIT", + "ZK 脑裂", + HealthCompareValueConfig.class, + true + ), + + ZK_OUTSTANDING_REQUESTS( + HealthCheckDimensionEnum.ZOOKEEPER, + "OutstandingRequests", + Constant.HC_CONFIG_NAME_PREFIX + "ZK_OUTSTANDING_REQUESTS", + "ZK Outstanding 请求堆积数", + HealthAmountRatioConfig.class, + false + ), + + ZK_WATCH_COUNT( + HealthCheckDimensionEnum.ZOOKEEPER, + "WatchCount", + Constant.HC_CONFIG_NAME_PREFIX + "ZK_WATCH_COUNT", + "ZK WatchCount 数", + HealthAmountRatioConfig.class, + false + ), + + ZK_ALIVE_CONNECTIONS( + HealthCheckDimensionEnum.ZOOKEEPER, + "AliveConnections", + Constant.HC_CONFIG_NAME_PREFIX + "ZK_ALIVE_CONNECTIONS", + "ZK 连接数", + HealthAmountRatioConfig.class, + false + ), + + ZK_APPROXIMATE_DATA_SIZE( + HealthCheckDimensionEnum.ZOOKEEPER, + "ApproximateDataSize", + Constant.HC_CONFIG_NAME_PREFIX + "ZK_APPROXIMATE_DATA_SIZE", + "ZK 数据大小(Byte)", + HealthAmountRatioConfig.class, + false + ), + + ZK_SENT_RATE( + HealthCheckDimensionEnum.ZOOKEEPER, + "SentRate", + Constant.HC_CONFIG_NAME_PREFIX + "ZK_SENT_RATE", + "ZK 发包数", + HealthAmountRatioConfig.class, + false + ), + + + ; /** @@ -97,12 +161,18 @@ public enum HealthCheckNameEnum { */ private final Class configClazz; - HealthCheckNameEnum(HealthCheckDimensionEnum dimensionEnum, String configItem, String configName, String configDesc, Class configClazz) { + /** + * 是可用性检查? + */ + private final boolean availableChecker; + + HealthCheckNameEnum(HealthCheckDimensionEnum dimensionEnum, String configItem, String configName, String configDesc, Class configClazz, boolean availableChecker) { this.dimensionEnum = dimensionEnum; this.configItem = configItem; this.configName = configName; this.configDesc = configDesc; this.configClazz = configClazz; + this.availableChecker = availableChecker; } public static HealthCheckNameEnum getByName(String configName) { diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthStateEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthStateEnum.java index a9490fb6..8bb533f9 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthStateEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthStateEnum.java @@ -16,7 +16,7 @@ public enum HealthStateEnum { POOR(2, "差"), - DEAD(3, "宕机"), + DEAD(3, "Down"), ; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java index e82882e1..1a08c85c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java @@ -28,7 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; -import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; @@ -82,7 +82,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker private ReplicaMetricService replicaMetricService; @Autowired - private HealthScoreService healthScoreService; + private HealthStateService healthStateService; @Autowired private KafkaJMXClient kafkaJMXClient; @@ -108,7 +108,6 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker registerVCHandler( BROKER_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore); registerVCHandler( BROKER_METHOD_GET_PARTITIONS_SKEW, this::getPartitionsSkew); registerVCHandler( BROKER_METHOD_GET_LEADERS_SKEW, this::getLeadersSkew); -// registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize); registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_0_10_0_0, V_1_0_0, "getLogSizeFromJmx", this::getLogSizeFromJmx); registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_1_0_0, V_MAX, "getLogSizeFromClient", this::getLogSizeFromClient); @@ -318,7 +317,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker Long clusterId = param.getClusterId(); Integer brokerId = param.getBrokerId(); - BrokerMetrics brokerMetrics = healthScoreService.calBrokerHealthScore(clusterId, brokerId); + BrokerMetrics brokerMetrics = healthStateService.calBrokerHealthMetrics(clusterId, brokerId); return Result.buildSuc(brokerMetrics); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index 3fd74ee5..47f3bdbe 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -134,7 +134,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok newBrokerPO.setId(inDBBrokerPO.getId()); newBrokerPO.setStatus(Constant.ALIVE); newBrokerPO.setCreateTime(inDBBrokerPO.getCreateTime()); - newBrokerPO.setUpdateTime(inDBBrokerPO.getUpdateTime()); + newBrokerPO.setUpdateTime(new Date()); if (newBrokerPO.getStartTimestamp() == null) { // 如果当前broker获取不到启动时间 // 如果DB中的broker状态为down,则使用当前时间,否则使用db中已有broker的时间 @@ -363,11 +363,11 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok try { Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig); - return Broker.buildFrom(clusterPhyId, newNode, startTime); + return Broker.buildFrom(clusterPhyId, newNode, startTime, jmxConfig); } catch (Exception e) { log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e); } - return Broker.buildFrom(clusterPhyId, newNode, null); + return Broker.buildFrom(clusterPhyId, newNode, null, jmxConfig); } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java index 3d004f78..bdd652aa 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java @@ -39,7 +39,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; -import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.job.JobService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; @@ -85,7 +85,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust public static final String CLUSTER_METHOD_GET_TOTAL_LOG_SIZE = "getTotalLogSize"; public static final String CLUSTER_METHOD_GET_PARTITION_SIZE = "getPartitionSize"; public static final String CLUSTER_METHOD_GET_PARTITION_NO_LEADER_SIZE = "getPartitionNoLeaderSize"; - public static final String CLUSTER_METHOD_GET_HEALTH_SCORE = "getClusterHealthScore"; + public static final String CLUSTER_METHOD_GET_HEALTH_METRICS = "getClusterHealthMetrics"; public static final String CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_TOTAL_BROKERS_JMX = "getMetricFromKafkaByTotalBrokersJMX"; public static final String CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_CONTROLLER_JMX = "getMetricFromKafkaByControllerJMX"; public static final String CLUSTER_METHOD_GET_ZK_COUNT = "getZKCount"; @@ -114,7 +114,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust public static final String CLUSTER_METHOD_GET_JOBS_FAILED = "getJobsFailed"; @Autowired - private HealthScoreService healthScoreService; + private HealthStateService healthStateService; @Autowired private BrokerService brokerService; @@ -188,7 +188,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust registerVCHandler( CLUSTER_METHOD_GET_PARTITION_SIZE, this::getPartitionSize); registerVCHandler( CLUSTER_METHOD_GET_PARTITION_NO_LEADER_SIZE, this::getPartitionNoLeaderSize); - registerVCHandler( CLUSTER_METHOD_GET_HEALTH_SCORE, this::getClusterHealthScore); + registerVCHandler( CLUSTER_METHOD_GET_HEALTH_METRICS, this::getClusterHealthMetrics); registerVCHandler( CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_TOTAL_BROKERS_JMX, this::getMetricFromKafkaByTotalBrokersJMX); registerVCHandler( CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_CONTROLLER_JMX, this::getMetricFromKafkaByControllerJMX); @@ -361,15 +361,13 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust } - /** - * 获取集群的健康分 - */ - private Result getClusterHealthScore(VersionItemParam metricParam){ + private Result getClusterHealthMetrics(VersionItemParam metricParam){ ClusterMetricParam param = (ClusterMetricParam)metricParam; - ClusterMetrics clusterMetrics = healthScoreService.calClusterHealthScore(param.getClusterId()); + ClusterMetrics clusterMetrics = healthStateService.calClusterHealthMetrics(param.getClusterId()); return Result.buildSuc(clusterMetrics); } + /** * 获取集群的 totalLogSize * @param metricParam diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java index 936897a3..6d38f2a3 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java @@ -20,7 +20,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; -import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO; @@ -64,7 +64,7 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe private GroupService groupService; @Autowired - private HealthScoreService healthScoreService; + private HealthStateService healthStateService; @Autowired private PartitionService partitionService; @@ -265,8 +265,8 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe private Result> getMetricHealthScore(VersionItemParam param) { GroupMetricParam groupMetricParam = (GroupMetricParam)param; - return Result.buildSuc(Arrays.asList(healthScoreService.calGroupHealthScore( - groupMetricParam.getClusterPhyId(), groupMetricParam.getGroupName())) + return Result.buildSuc(Arrays.asList( + healthStateService.calGroupHealthMetrics(groupMetricParam.getClusterPhyId(), groupMetricParam.getGroupName())) ); } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java new file mode 100644 index 00000000..5d3e658d --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java @@ -0,0 +1,281 @@ +package com.xiaojukeji.know.streaming.km.core.service.health.checker.zookeeper; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthAmountRatioConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.zookeeper.ZookeeperParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; +import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.ZookeeperUtils; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; +import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@Service +public class HealthCheckZookeeperService extends AbstractHealthCheckService { + private static final ILog log = LogFactory.getLog(HealthCheckZookeeperService.class); + + @Autowired + private ClusterPhyService clusterPhyService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private ZookeeperMetricService zookeeperMetricService; + + @PostConstruct + private void init() { + functionMap.putIfAbsent(HealthCheckNameEnum.ZK_BRAIN_SPLIT.getConfigName(), this::checkBrainSplit); + functionMap.putIfAbsent(HealthCheckNameEnum.ZK_OUTSTANDING_REQUESTS.getConfigName(), this::checkOutstandingRequests); + functionMap.putIfAbsent(HealthCheckNameEnum.ZK_WATCH_COUNT.getConfigName(), this::checkWatchCount); + functionMap.putIfAbsent(HealthCheckNameEnum.ZK_ALIVE_CONNECTIONS.getConfigName(), this::checkAliveConnections); + functionMap.putIfAbsent(HealthCheckNameEnum.ZK_APPROXIMATE_DATA_SIZE.getConfigName(), this::checkApproximateDataSize); + functionMap.putIfAbsent(HealthCheckNameEnum.ZK_SENT_RATE.getConfigName(), this::checkSentRate); + } + + @Override + public List getResList(Long clusterPhyId) { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + if (clusterPhy == null) { + return new ArrayList<>(); + } + + try { + return Arrays.asList(new ZookeeperParam( + clusterPhyId, + ZookeeperUtils.connectStringParser(clusterPhy.getZookeeper()), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class) + )); + } catch (Exception e) { + log.error("class=HealthCheckZookeeperService||method=getResList||clusterPhyId={}||errMsg=exception!", clusterPhyId, e); + } + + return new ArrayList<>(); + } + + @Override + public HealthCheckDimensionEnum getHealthCheckDimensionEnum() { + return HealthCheckDimensionEnum.ZOOKEEPER; + } + + private HealthCheckResult checkBrainSplit(Tuple singleConfigSimpleTuple) { + ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); + HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2(); + + List infoList = zookeeperService.listFromDBByCluster(param.getClusterPhyId()); + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.ZOOKEEPER.getDimension(), + HealthCheckNameEnum.ZK_BRAIN_SPLIT.getConfigName(), + param.getClusterPhyId(), + "" + ); + + long value = infoList.stream().filter(elem -> ZKRoleEnum.LEADER.getRole().equals(elem.getRole())).count(); + + checkResult.setPassed(value == valueConfig.getValue().longValue() ? Constant.YES : Constant.NO); + return checkResult; + } + + private HealthCheckResult checkOutstandingRequests(Tuple singleConfigSimpleTuple) { + ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); + HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); + + Result metricsResult = zookeeperMetricService.collectMetricsFromZookeeper( + new ZookeeperMetricParam( + param.getClusterPhyId(), + param.getZkAddressList(), + param.getZkConfig(), + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS + ) + ); + if (metricsResult.failed() || !metricsResult.hasData()) { + log.error( + "class=HealthCheckZookeeperService||method=checkOutstandingRequests||param={}||config={}||result={}||errMsg=get metrics failed", + param, valueConfig, metricsResult + ); + return null; + } + + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.ZOOKEEPER.getDimension(), + HealthCheckNameEnum.ZK_OUTSTANDING_REQUESTS.getConfigName(), + param.getClusterPhyId(), + "" + ); + + Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS); + + + checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO); + + return checkResult; + } + + private HealthCheckResult checkWatchCount(Tuple singleConfigSimpleTuple) { + ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); + HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); + + Result metricsResult = zookeeperMetricService.collectMetricsFromZookeeper( + new ZookeeperMetricParam( + param.getClusterPhyId(), + param.getZkAddressList(), + param.getZkConfig(), + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT + ) + ); + + if (metricsResult.failed() || !metricsResult.hasData()) { + log.error( + "class=HealthCheckZookeeperService||method=checkWatchCount||param={}||config={}||result={}||errMsg=get metrics failed", + param, valueConfig, metricsResult + ); + return null; + } + + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.ZOOKEEPER.getDimension(), + HealthCheckNameEnum.ZK_WATCH_COUNT.getConfigName(), + param.getClusterPhyId(), + "" + ); + + Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT); + + + checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO); + + return checkResult; + } + + private HealthCheckResult checkAliveConnections(Tuple singleConfigSimpleTuple) { + ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); + HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); + + Result metricsResult = zookeeperMetricService.collectMetricsFromZookeeper( + new ZookeeperMetricParam( + param.getClusterPhyId(), + param.getZkAddressList(), + param.getZkConfig(), + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS + ) + ); + + if (metricsResult.failed() || !metricsResult.hasData()) { + log.error( + "class=HealthCheckZookeeperService||method=checkAliveConnections||param={}||config={}||result={}||errMsg=get metrics failed", + param, valueConfig, metricsResult + ); + return null; + } + + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.ZOOKEEPER.getDimension(), + HealthCheckNameEnum.ZK_ALIVE_CONNECTIONS.getConfigName(), + param.getClusterPhyId(), + "" + ); + + Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS); + + + checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO); + + return checkResult; + } + + private HealthCheckResult checkApproximateDataSize(Tuple singleConfigSimpleTuple) { + ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); + HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); + + Result metricsResult = zookeeperMetricService.collectMetricsFromZookeeper( + new ZookeeperMetricParam( + param.getClusterPhyId(), + param.getZkAddressList(), + param.getZkConfig(), + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE + ) + ); + + if (metricsResult.failed() || !metricsResult.hasData()) { + log.error( + "class=HealthCheckZookeeperService||method=checkApproximateDataSize||param={}||config={}||result={}||errMsg=get metrics failed", + param, valueConfig, metricsResult + ); + return null; + } + + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.ZOOKEEPER.getDimension(), + HealthCheckNameEnum.ZK_APPROXIMATE_DATA_SIZE.getConfigName(), + param.getClusterPhyId(), + "" + ); + + Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE); + + + checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO); + + return checkResult; + } + + private HealthCheckResult checkSentRate(Tuple singleConfigSimpleTuple) { + ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); + HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); + + Result metricsResult = zookeeperMetricService.collectMetricsFromZookeeper( + new ZookeeperMetricParam( + param.getClusterPhyId(), + param.getZkAddressList(), + param.getZkConfig(), + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_PACKETS_SENT + ) + ); + + if (metricsResult.failed() || !metricsResult.hasData()) { + log.error( + "class=HealthCheckZookeeperService||method=checkSentRate||param={}||config={}||result={}||errMsg=get metrics failed", + param, valueConfig, metricsResult + ); + return null; + } + + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.ZOOKEEPER.getDimension(), + HealthCheckNameEnum.ZK_SENT_RATE.getConfigName(), + param.getClusterPhyId(), + "" + ); + + Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_PACKETS_SENT); + + + checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO); + + return checkResult; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java new file mode 100644 index 00000000..3ef0a82d --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java @@ -0,0 +1,50 @@ +package com.xiaojukeji.know.streaming.km.core.service.health.state; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; + +import java.util.List; + + +public interface HealthStateService { + /** + * 集群健康指标 + */ + ClusterMetrics calClusterHealthMetrics(Long clusterPhyId); + + /** + * 获取Broker健康指标 + */ + BrokerMetrics calBrokerHealthMetrics(Long clusterPhyId, Integer brokerId); + + /** + * 获取Topic健康指标 + */ + TopicMetrics calTopicHealthMetrics(Long clusterPhyId, String topicName); + + /** + * 获取Group健康指标 + */ + GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName); + + /** + * 获取Zookeeper健康指标 + */ + ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId); + + /** + * 获取集群健康检查结果 + */ + List getClusterHealthResult(Long clusterPhyId); + + /** + * 获取集群某个维度健康检查结果 + */ + List getDimensionHealthResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum); + + /** + * 获取集群某个资源的健康检查结果 + */ + List getResHealthResult(Long clusterPhyId, Integer dimension, String resNme); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java new file mode 100644 index 00000000..7f41b0d8 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java @@ -0,0 +1,408 @@ +package com.xiaojukeji.know.streaming.km.core.service.health.state.impl; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*; +import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; +import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.BrokerMetricVersionItems.*; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.BrokerMetricVersionItems.BROKER_METRIC_HEALTH_STATE; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ClusterMetricVersionItems.*; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.GroupMetricVersionItems.*; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.GroupMetricVersionItems.GROUP_METRIC_HEALTH_CHECK_TOTAL; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems.*; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems.TOPIC_METRIC_HEALTH_CHECK_TOTAL; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems.*; + + +@Service +public class HealthStateServiceImpl implements HealthStateService { + @Autowired + private HealthCheckResultService healthCheckResultService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private BrokerService brokerService; + + @Override + public ClusterMetrics calClusterHealthMetrics(Long clusterPhyId) { + ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); + + // 集群维度指标 + List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.CLUSTER); + if (ValidateUtils.isEmptyList(resultList)) { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER, 0.0f); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER, 0.0f); + } else { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER, this.getHealthCheckPassed(resultList)); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER, (float)resultList.size()); + } + + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_CLUSTER, (float)this.calHealthState(resultList).getDimension()); + + // 获取指标 + metrics.putMetric(this.calClusterBrokersHealthMetrics(clusterPhyId).getMetrics()); + metrics.putMetric(this.calClusterTopicsHealthMetrics(clusterPhyId).getMetrics()); + metrics.putMetric(this.calClusterGroupsHealthMetrics(clusterPhyId).getMetrics()); + metrics.putMetric(this.calZookeeperHealthMetrics(clusterPhyId).getMetrics()); + + // 统计最终结果 + Float passed = 0.0f; + passed += metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED); + passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS); + passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS); + passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS); + passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER); + + Float total = 0.0f; + total += metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL); + total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS); + total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS); + total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS); + total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER); + + // 状态 + Float state = 0.0f; + state = Math.max(state, metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_STATE)); + state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_TOPICS)); + state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_BROKERS)); + state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_GROUPS)); + state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CLUSTER)); + + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED, passed); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL, total); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE, state); + + return metrics; + } + + @Override + public BrokerMetrics calBrokerHealthMetrics(Long clusterPhyId, Integer brokerId) { + List healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.BROKER.getDimension(), String.valueOf(brokerId)); + + BrokerMetrics metrics = new BrokerMetrics(clusterPhyId, brokerId); + if (ValidateUtils.isEmptyList(healthScoreResultList)) { + metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension()); + metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_PASSED, 0.0f); + metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_TOTAL, 0.0f); + } else { + metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_PASSED, getHealthCheckResultPassed(healthScoreResultList)); + metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size())); + + // 计算健康状态 + Broker broker = brokerService.getBrokerFromCacheFirst(clusterPhyId, brokerId); + if (broker == null) { + // DB中不存在,则默认是存活的 + metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension()); + } else if (!broker.alive()) { + metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.DEAD.getDimension()); + } else { + metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension()); + } + } + + return metrics; + } + + @Override + public TopicMetrics calTopicHealthMetrics(Long clusterPhyId, String topicName) { + List healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC.getDimension(), topicName); + + TopicMetrics metrics = new TopicMetrics(topicName, clusterPhyId,true); + if (ValidateUtils.isEmptyList(healthScoreResultList)) { + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension()); + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_PASSED, 0.0f); + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_TOTAL, 0.0f); + } else { + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension()); + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckResultPassed(healthScoreResultList)); + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size())); + } + + return metrics; + } + + @Override + public GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName) { + List healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.GROUP.getDimension(), groupName); + + GroupMetrics metrics = new GroupMetrics(clusterPhyId, groupName, true); + if (ValidateUtils.isEmptyList(healthScoreResultList)) { + metrics.getMetrics().put(GROUP_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension()); + metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_PASSED, 0.0f); + metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_TOTAL, 0.0f); + } else { + metrics.getMetrics().put(GROUP_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension()); + metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_PASSED, getHealthCheckResultPassed(healthScoreResultList)); + metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size())); + } + + return metrics; + } + + @Override + public ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId) { + List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.ZOOKEEPER); + + ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId); + if (ValidateUtils.isEmptyList(resultList)) { + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, 0.0f); + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL, 0.0f); + } else { + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(resultList)); + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL, (float)resultList.size()); + } + + if (zookeeperService.allServerDown(clusterPhyId)) { + // 所有服务挂掉 + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)HealthStateEnum.DEAD.getDimension()); + return metrics; + } + + if (zookeeperService.existServerDown(clusterPhyId)) { + // 存在服务挂掉 + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)HealthStateEnum.POOR.getDimension()); + return metrics; + } + + // 服务未挂时,依据检查结果计算状态 + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)this.calHealthState(resultList).getDimension()); + return metrics; + } + + @Override + public List getClusterHealthResult(Long clusterPhyId) { + List poList = healthCheckResultService.getClusterHealthCheckResult(clusterPhyId); + + // <检查项,<检查结果>> + Map> checkResultMap = new HashMap<>(); + for (HealthCheckResultPO po: poList) { + checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); + checkResultMap.get(po.getConfigName()).add(po); + } + + Map configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId); + + List healthScoreResultList = new ArrayList<>(); + for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.values()) { + BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName()); + if (baseConfig == null) { + continue; + } + + healthScoreResultList.add(new HealthScoreResult( + nameEnum, + baseConfig, + checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>())) + ); + } + + return healthScoreResultList; + } + + @Override + public List getDimensionHealthResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) { + List poList = healthCheckResultService.getClusterResourcesHealthCheckResult(clusterPhyId, dimensionEnum.getDimension()); + + // <检查项,<通过的数量,不通过的数量>> + Map> checkResultMap = new HashMap<>(); + for (HealthCheckResultPO po: poList) { + checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); + checkResultMap.get(po.getConfigName()).add(po); + } + + Map configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId); + + List healthScoreResultList = new ArrayList<>(); + for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimension(dimensionEnum)) { + BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName()); + if (baseConfig == null) { + continue; + } + + healthScoreResultList.add(new HealthScoreResult(nameEnum, baseConfig, checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); + } + + return healthScoreResultList; + } + + @Override + public List getResHealthResult(Long clusterPhyId, Integer dimension, String resNme) { + List poList = healthCheckResultService.getResHealthCheckResult(clusterPhyId, dimension, resNme); + Map> checkResultMap = new HashMap<>(); + for (HealthCheckResultPO po: poList) { + checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); + checkResultMap.get(po.getConfigName()).add(po); + } + + Map configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId); + + List healthScoreResultList = new ArrayList<>(); + for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimensionCode(dimension)) { + BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName()); + if (baseConfig == null) { + continue; + } + + healthScoreResultList.add(new HealthScoreResult(nameEnum, baseConfig, checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); + } + + return healthScoreResultList; + } + + + /**************************************************** private method ****************************************************/ + + + private ClusterMetrics calClusterTopicsHealthMetrics(Long clusterPhyId) { + List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC); + + ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); + if (ValidateUtils.isEmptyList(resultList)) { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS, 0.0f); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS, 0.0f); + } else { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS, this.getHealthCheckPassed(resultList)); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS, (float)resultList.size()); + } + + // 服务未挂时,依据检查结果计算状态 + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_TOPICS, (float)this.calHealthState(resultList).getDimension()); + return metrics; + } + + private ClusterMetrics calClusterGroupsHealthMetrics(Long clusterPhyId) { + List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.GROUP); + + ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); + if (ValidateUtils.isEmptyList(resultList)) { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS, 0.0f); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS, 0.0f); + } else { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS, this.getHealthCheckPassed(resultList)); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS, (float)resultList.size()); + } + + // 服务未挂时,依据检查结果计算状态 + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_GROUPS, (float)this.calHealthState(resultList).getDimension()); + return metrics; + } + + private ClusterMetrics calClusterBrokersHealthMetrics(Long clusterPhyId) { + List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.BROKER); + + ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); + if (ValidateUtils.isEmptyList(resultList)) { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS, 0.0f); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS, 0.0f); + } else { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS, this.getHealthCheckPassed(resultList)); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS, (float)resultList.size()); + } + + if (brokerService.allServerDown(clusterPhyId)) { + // 所有服务挂掉 + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_BROKERS, (float)HealthStateEnum.DEAD.getDimension()); + return metrics; + } + + if (brokerService.existServerDown(clusterPhyId)) { + // 存在服务挂掉 + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_BROKERS, (float)HealthStateEnum.POOR.getDimension()); + return metrics; + } + + // 服务未挂时,依据检查结果计算状态 + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_BROKERS, (float)this.calHealthState(resultList).getDimension()); + return metrics; + } + + private List getDimensionHealthCheckAggResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) { + List poList = healthCheckResultService.getClusterResourcesHealthCheckResult(clusterPhyId, dimensionEnum.getDimension()); + + Map /*检查结果列表*/> groupByCheckNamePOMap = new HashMap<>(); + for (HealthCheckResultPO po: poList) { + groupByCheckNamePOMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); + groupByCheckNamePOMap.get(po.getConfigName()).add(po); + } + + List stateList = new ArrayList<>(); + for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimension(dimensionEnum)) { + stateList.add(new HealthCheckAggResult(nameEnum, groupByCheckNamePOMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); + } + + return stateList; + } + + private float getHealthCheckPassed(List resultList){ + if(ValidateUtils.isEmptyList(resultList)) { + return 0f; + } + + return Float.valueOf(resultList.stream().filter(elem -> elem.getPassed()).count()); + } + + private HealthStateEnum calHealthState(List resultList) { + if(ValidateUtils.isEmptyList(resultList)) { + return HealthStateEnum.GOOD; + } + + boolean existNotPassed = false; + for (HealthCheckAggResult aggResult: resultList) { + if (aggResult.getCheckNameEnum().isAvailableChecker() && !aggResult.getPassed()) { + return HealthStateEnum.POOR; + } + + if (!aggResult.getPassed()) { + existNotPassed = true; + } + } + + return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD; + } + + private float getHealthCheckResultPassed(List healthScoreResultList){ + if(CollectionUtils.isEmpty(healthScoreResultList)){return 0f;} + + return Float.valueOf(healthScoreResultList.stream().filter(elem -> elem.getPassed()).count()); + } + + private HealthStateEnum calHealthScoreResultState(List resultList) { + if(ValidateUtils.isEmptyList(resultList)) { + return HealthStateEnum.GOOD; + } + + boolean existNotPassed = false; + for (HealthScoreResult aggResult: resultList) { + if (aggResult.getCheckNameEnum().isAvailableChecker() && !aggResult.getPassed()) { + return HealthStateEnum.POOR; + } + + if (!aggResult.getPassed()) { + existNotPassed = true; + } + } + + return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java index 478c142b..703bf59d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java @@ -31,7 +31,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; -import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; @@ -69,7 +69,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe public static final String TOPIC_METHOD_GET_REPLICAS_COUNT = "getReplicasCount"; @Autowired - private HealthScoreService healthScoreService; + private HealthStateService healthStateService; @Autowired private KafkaJMXClient kafkaJMXClient; @@ -394,7 +394,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe String topic = topicMetricParam.getTopic(); Long clusterId = topicMetricParam.getClusterId(); - TopicMetrics topicMetric = healthScoreService.calTopicHealthScore(clusterId, topic); + TopicMetrics topicMetric = healthStateService.calTopicHealthMetrics(clusterId, topic); return Result.buildSuc(Arrays.asList(topicMetric)); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/BrokerMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/BrokerMetricVersionItems.java index e5502cc4..be68ac06 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/BrokerMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/BrokerMetricVersionItems.java @@ -16,7 +16,7 @@ import static com.xiaojukeji.know.streaming.km.core.service.broker.impl.BrokerMe @Component public class BrokerMetricVersionItems extends BaseMetricVersionMetric { - public static final String BROKER_METRIC_HEALTH_SCORE = "HealthScore"; + public static final String BROKER_METRIC_HEALTH_STATE = "HealthState"; public static final String BROKER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed"; public static final String BROKER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal"; public static final String BROKER_METRIC_TOTAL_REQ_QUEUE = "TotalRequestQueueSize"; @@ -57,8 +57,8 @@ public class BrokerMetricVersionItems extends BaseMetricVersionMetric { // HealthScore 指标 items.add(buildAllVersionsItem() - .name(BROKER_METRIC_HEALTH_SCORE).unit("分").desc("健康分").category(CATEGORY_HEALTH) - .extendMethod(BROKER_METHOD_GET_HEALTH_SCORE)); + .name(BROKER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH) + .extendMethod( BROKER_METHOD_GET_HEALTH_SCORE )); items.add(buildAllVersionsItem() .name(BROKER_METRIC_HEALTH_CHECK_PASSED ).unit("个").desc("健康检查通过数").category(CATEGORY_HEALTH) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java index 00a5e0cd..ea81da3a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java @@ -19,36 +19,41 @@ import static com.xiaojukeji.know.streaming.km.core.service.cluster.impl.Cluster */ @Component public class ClusterMetricVersionItems extends BaseMetricVersionMetric { - /** - * 健康分 - */ - public static final String CLUSTER_METRIC_HEALTH_SCORE = "HealthScore"; - public static final String CLUSTER_METRIC_HEALTH_SCORE_TOPICS = "HealthScore_Topics"; - public static final String CLUSTER_METRIC_HEALTH_SCORE_BROKERS = "HealthScore_Brokers"; - public static final String CLUSTER_METRIC_HEALTH_SCORE_GROUPS = "HealthScore_Groups"; - public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster"; - - /** - * 健康巡检 + * 整体的健康指标 */ + public static final String CLUSTER_METRIC_HEALTH_STATE = "HealthState"; public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal"; + /** + * Topics健康指标 + */ + public static final String CLUSTER_METRIC_HEALTH_STATE_TOPICS = "HealthState_Topics"; public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS = "HealthCheckPassed_Topics"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS = "HealthCheckTotal_Topics"; + /** + * Brokers健康指标 + */ + public static final String CLUSTER_METRIC_HEALTH_STATE_BROKERS = "HealthState_Brokers"; public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS = "HealthCheckPassed_Brokers"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS = "HealthCheckTotal_Brokers"; + /** + * Groups健康指标 + */ + public static final String CLUSTER_METRIC_HEALTH_STATE_GROUPS = "HealthState_Groups"; public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS = "HealthCheckPassed_Groups"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS = "HealthCheckTotal_Groups"; + /** + * Cluster健康指标 + */ + public static final String CLUSTER_METRIC_HEALTH_STATE_CLUSTER = "HealthState_Cluster"; public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster"; - - public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize"; public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize"; public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize"; @@ -113,64 +118,64 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric { // HealthScore 指标 itemList.add(buildAllVersionsItem() - .name(CLUSTER_METRIC_HEALTH_SCORE).unit("分").desc("集群总体的健康分").category(CATEGORY_HEALTH) - .extendMethod(CLUSTER_METHOD_GET_HEALTH_SCORE)); + .name(CLUSTER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕").desc("集群健康状态(0:好 1:中 2:差 3:宕)").category(CATEGORY_HEALTH) + .extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS)); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_PASSED).unit("个").desc("集群总体健康检查通过数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL).unit("个").desc("集群总体健康检查总数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() - .name(CLUSTER_METRIC_HEALTH_SCORE_TOPICS).unit("分").desc("集群Topics的健康分").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .name(CLUSTER_METRIC_HEALTH_STATE_TOPICS).unit("0:好 1:中 2:差 3:宕机").desc("集群Topics健康状态").category(CATEGORY_HEALTH) + .extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS)); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS).unit("个").desc("集群Topics健康检查通过数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS).unit("个").desc("集群Topics健康检查总数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() - .name(CLUSTER_METRIC_HEALTH_SCORE_BROKERS).unit("分").desc("集群Brokers的健康分").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .name(CLUSTER_METRIC_HEALTH_STATE_BROKERS).unit("0:好 1:中 2:差 3:宕机").desc("集群Brokers健康状态").category(CATEGORY_HEALTH) + .extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS)); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS).unit("个").desc("集群Brokers健康检查通过数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS).unit("个").desc("集群Brokers健康检查总数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() - .name(CLUSTER_METRIC_HEALTH_SCORE_GROUPS).unit("分").desc("集群Groups的健康分").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .name(CLUSTER_METRIC_HEALTH_STATE_GROUPS).unit("0:好 1:中 2:差 3:宕机").desc("集群Groups健康状态").category(CATEGORY_HEALTH) + .extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS)); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS).unit("个").desc("集群Groups健康检查通过数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS).unit("个").desc("集群Groups健康检查总数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() - .name(CLUSTER_METRIC_HEALTH_SCORE_CLUSTER).unit("分").desc("集群自身的健康分").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .name(CLUSTER_METRIC_HEALTH_STATE_CLUSTER).unit("0:好 1:中 2:差 3:宕机").desc("集群自身健康状态").category(CATEGORY_HEALTH) + .extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS)); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER).unit("个").desc("集群自身健康检查通过数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); itemList.add(buildAllVersionsItem() .name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER).unit("个").desc("集群自身健康检查总数").category(CATEGORY_HEALTH) - .extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE )); + .extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS )); // TotalRequestQueueSize 指标 itemList.add(buildAllVersionsItem() diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/GroupMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/GroupMetricVersionItems.java index efa1359a..3ca7b4c6 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/GroupMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/GroupMetricVersionItems.java @@ -7,13 +7,14 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.CATEGORY_HEALTH; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_GROUP; import static com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.*; @Component public class GroupMetricVersionItems extends BaseMetricVersionMetric { - public static final String GROUP_METRIC_HEALTH_SCORE = "HealthScore"; + public static final String GROUP_METRIC_HEALTH_STATE = "HealthState"; public static final String GROUP_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed"; public static final String GROUP_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal"; public static final String GROUP_METRIC_OFFSET_CONSUMED = "OffsetConsumed"; @@ -33,15 +34,15 @@ public class GroupMetricVersionItems extends BaseMetricVersionMetric { // HealthScore 指标 itemList.add(buildAllVersionsItem() - .name(GROUP_METRIC_HEALTH_SCORE).unit("分").desc("健康分") + .name(GROUP_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH) .extendMethod( GROUP_METHOD_GET_HEALTH_SCORE )); itemList.add(buildAllVersionsItem() - .name(GROUP_METRIC_HEALTH_CHECK_PASSED ).unit("个").desc("健康检查通过数") + .name(GROUP_METRIC_HEALTH_CHECK_PASSED ).unit("个").desc("健康检查通过数").category(CATEGORY_HEALTH) .extendMethod( GROUP_METHOD_GET_HEALTH_SCORE )); itemList.add(buildAllVersionsItem() - .name(GROUP_METRIC_HEALTH_CHECK_TOTAL ).unit("个").desc("健康检查总数") + .name(GROUP_METRIC_HEALTH_CHECK_TOTAL ).unit("个").desc("健康检查总数").category(CATEGORY_HEALTH) .extendMethod( GROUP_METHOD_GET_HEALTH_SCORE )); // OffsetConsumed 指标 diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/TopicMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/TopicMetricVersionItems.java index 3a52b3e8..017435bc 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/TopicMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/TopicMetricVersionItems.java @@ -16,9 +16,10 @@ import static com.xiaojukeji.know.streaming.km.core.service.topic.impl.TopicMetr @Component public class TopicMetricVersionItems extends BaseMetricVersionMetric { - public static final String TOPIC_METRIC_HEALTH_SCORE = "HealthScore"; + public static final String TOPIC_METRIC_HEALTH_STATE = "HealthState"; public static final String TOPIC_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed"; public static final String TOPIC_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal"; + public static final String TOPIC_METRIC_TOTAL_PRODUCE_REQUESTS = "TotalProduceRequests"; public static final String TOPIC_METRIC_BYTES_REJECTED = "BytesRejected"; public static final String TOPIC_METRIC_FAILED_FETCH_REQ = "FailedFetchRequests"; @@ -47,7 +48,7 @@ public class TopicMetricVersionItems extends BaseMetricVersionMetric { // HealthScore 指标 itemList.add(buildAllVersionsItem() - .name(TOPIC_METRIC_HEALTH_SCORE).unit("分").desc("健康分").category(CATEGORY_HEALTH) + .name(TOPIC_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH) .extendMethod( TOPIC_METHOD_GET_HEALTH_SCORE )); itemList.add(buildAllVersionsItem() diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java index 9b0d4d2b..a037053a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java @@ -15,6 +15,12 @@ import static com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl.Zooke @Component public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric { + /** + * 健康状态 + */ + public static final String ZOOKEEPER_METRIC_HEALTH_STATE = "HealthState"; + public static final String ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed"; + public static final String ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal"; /** * 性能 @@ -23,7 +29,7 @@ public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric { public static final String ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY = "MinRequestLatency"; public static final String ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY = "MaxRequestLatency"; public static final String ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS = "OutstandingRequests"; - public static final String ZOOKEEPER_METRIC_NODE_COUNT = "NodeCount"; + public static final String ZOOKEEPER_METRIC_NODE_COUNT = "ZnodeCount"; public static final String ZOOKEEPER_METRIC_WATCH_COUNT = "WatchCount"; public static final String ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS = "NumAliveConnections"; public static final String ZOOKEEPER_METRIC_PACKETS_RECEIVED = "PacketsReceived"; @@ -52,6 +58,20 @@ public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric { public List init(){ List items = new ArrayList<>(); + // 健康状态 + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED).unit("个").desc("健康巡检通过数").category(CATEGORY_HEALTH) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL).unit("个").desc("健康巡检总数").category(CATEGORY_HEALTH) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE)); + + // 性能指标 items.add(buildAllVersionsItem() .name(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY).unit("ms").desc("平均响应延迟").category(CATEGORY_PERFORMANCE) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java index 212513aa..ee26f3cf 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java @@ -29,6 +29,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; import com.xiaojukeji.know.streaming.km.core.cache.ZookeeperLocalCache; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; @@ -68,6 +69,9 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo @Autowired private KafkaJMXClient kafkaJMXClient; + @Autowired + private HealthStateService healthStateService; + @Override protected VersionItemTypeEnum getVersionItemType() { return VersionItemTypeEnum.METRIC_ZOOKEEPER; @@ -84,6 +88,7 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD, this::getMetricFromMonitorCmd); registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD, this::getMetricFromServerCmd); registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX, this::getMetricFromKafkaByJMX); + registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE, this::getMetricFromHealthService); } @Override @@ -302,4 +307,13 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo return Result.buildFailure(VC_JMX_CONNECT_ERROR); } } + + private Result getMetricFromHealthService(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + + String metricName = param.getMetricName(); + Long clusterPhyId = param.getClusterPhyId(); + + return Result.buildSuc(healthStateService.calZookeeperHealthMetrics(clusterPhyId)); + } } diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java index 24ede2ae..d4fc8094 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java @@ -10,7 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.converter.HealthScoreVOConverter; import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; @@ -28,7 +28,7 @@ import java.util.List; @RequestMapping(ApiPrefix.API_V3_PREFIX) public class KafkaHealthController { @Autowired - private HealthScoreService healthScoreService; + private HealthStateService healthStateService; @Autowired private HealthCheckResultService healthCheckResultService; @@ -40,12 +40,11 @@ public class KafkaHealthController { @RequestParam(required = false) Integer dimensionCode) { HealthCheckDimensionEnum dimensionEnum = HealthCheckDimensionEnum.getByCode(dimensionCode); if (!dimensionEnum.equals(HealthCheckDimensionEnum.UNKNOWN)) { - return Result.buildSuc(HealthScoreVOConverter.convert2HealthScoreResultDetailVOList(healthScoreService.getDimensionHealthScoreResult(clusterPhyId, dimensionEnum), false)); + return Result.buildSuc(HealthScoreVOConverter.convert2HealthScoreResultDetailVOList(healthStateService.getDimensionHealthResult(clusterPhyId, dimensionEnum))); } return Result.buildSuc(HealthScoreVOConverter.convert2HealthScoreResultDetailVOList( - healthScoreService.getClusterHealthScoreResult(clusterPhyId), - true + healthStateService.getClusterHealthResult(clusterPhyId) )); } @@ -56,7 +55,7 @@ public class KafkaHealthController { @PathVariable Integer dimensionCode, @PathVariable String resName) { return Result.buildSuc(HealthScoreVOConverter.convert2HealthScoreBaseResultVOList( - healthScoreService.getResHealthScoreResult(clusterPhyId, dimensionCode, resName) + healthStateService.getResHealthResult(clusterPhyId, dimensionCode, resName) )); }