健康分调整为健康状态

This commit is contained in:
zengqiao
2022-10-29 13:43:33 +08:00
committed by EricZeng
parent e98ec562a2
commit b101cec6fa
30 changed files with 1091 additions and 171 deletions

View File

@@ -28,7 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
@@ -82,7 +82,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
private ReplicaMetricService replicaMetricService;
@Autowired
private HealthScoreService healthScoreService;
private HealthStateService healthStateService;
@Autowired
private KafkaJMXClient kafkaJMXClient;
@@ -108,7 +108,6 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
registerVCHandler( BROKER_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore);
registerVCHandler( BROKER_METHOD_GET_PARTITIONS_SKEW, this::getPartitionsSkew);
registerVCHandler( BROKER_METHOD_GET_LEADERS_SKEW, this::getLeadersSkew);
// registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize);
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_0_10_0_0, V_1_0_0, "getLogSizeFromJmx", this::getLogSizeFromJmx);
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_1_0_0, V_MAX, "getLogSizeFromClient", this::getLogSizeFromClient);
@@ -318,7 +317,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
Long clusterId = param.getClusterId();
Integer brokerId = param.getBrokerId();
BrokerMetrics brokerMetrics = healthScoreService.calBrokerHealthScore(clusterId, brokerId);
BrokerMetrics brokerMetrics = healthStateService.calBrokerHealthMetrics(clusterId, brokerId);
return Result.buildSuc(brokerMetrics);
}

View File

@@ -134,7 +134,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
newBrokerPO.setId(inDBBrokerPO.getId());
newBrokerPO.setStatus(Constant.ALIVE);
newBrokerPO.setCreateTime(inDBBrokerPO.getCreateTime());
newBrokerPO.setUpdateTime(inDBBrokerPO.getUpdateTime());
newBrokerPO.setUpdateTime(new Date());
if (newBrokerPO.getStartTimestamp() == null) {
// 如果当前broker获取不到启动时间
// 如果DB中的broker状态为down则使用当前时间否则使用db中已有broker的时间
@@ -363,11 +363,11 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
try {
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig);
return Broker.buildFrom(clusterPhyId, newNode, startTime);
return Broker.buildFrom(clusterPhyId, newNode, startTime, jmxConfig);
} catch (Exception e) {
log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e);
}
return Broker.buildFrom(clusterPhyId, newNode, null);
return Broker.buildFrom(clusterPhyId, newNode, null, jmxConfig);
}
}

View File

@@ -39,7 +39,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
@@ -85,7 +85,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
public static final String CLUSTER_METHOD_GET_TOTAL_LOG_SIZE = "getTotalLogSize";
public static final String CLUSTER_METHOD_GET_PARTITION_SIZE = "getPartitionSize";
public static final String CLUSTER_METHOD_GET_PARTITION_NO_LEADER_SIZE = "getPartitionNoLeaderSize";
public static final String CLUSTER_METHOD_GET_HEALTH_SCORE = "getClusterHealthScore";
public static final String CLUSTER_METHOD_GET_HEALTH_METRICS = "getClusterHealthMetrics";
public static final String CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_TOTAL_BROKERS_JMX = "getMetricFromKafkaByTotalBrokersJMX";
public static final String CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_CONTROLLER_JMX = "getMetricFromKafkaByControllerJMX";
public static final String CLUSTER_METHOD_GET_ZK_COUNT = "getZKCount";
@@ -114,7 +114,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
public static final String CLUSTER_METHOD_GET_JOBS_FAILED = "getJobsFailed";
@Autowired
private HealthScoreService healthScoreService;
private HealthStateService healthStateService;
@Autowired
private BrokerService brokerService;
@@ -188,7 +188,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
registerVCHandler( CLUSTER_METHOD_GET_PARTITION_SIZE, this::getPartitionSize);
registerVCHandler( CLUSTER_METHOD_GET_PARTITION_NO_LEADER_SIZE, this::getPartitionNoLeaderSize);
registerVCHandler( CLUSTER_METHOD_GET_HEALTH_SCORE, this::getClusterHealthScore);
registerVCHandler( CLUSTER_METHOD_GET_HEALTH_METRICS, this::getClusterHealthMetrics);
registerVCHandler( CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_TOTAL_BROKERS_JMX, this::getMetricFromKafkaByTotalBrokersJMX);
registerVCHandler( CLUSTER_METHOD_GET_METRIC_FROM_KAFKA_BY_CONTROLLER_JMX, this::getMetricFromKafkaByControllerJMX);
@@ -361,15 +361,13 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
}
/**
* 获取集群的健康分
*/
private Result<ClusterMetrics> getClusterHealthScore(VersionItemParam metricParam){
private Result<ClusterMetrics> getClusterHealthMetrics(VersionItemParam metricParam){
ClusterMetricParam param = (ClusterMetricParam)metricParam;
ClusterMetrics clusterMetrics = healthScoreService.calClusterHealthScore(param.getClusterId());
ClusterMetrics clusterMetrics = healthStateService.calClusterHealthMetrics(param.getClusterId());
return Result.buildSuc(clusterMetrics);
}
/**
* 获取集群的 totalLogSize
* @param metricParam

View File

@@ -20,7 +20,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
@@ -64,7 +64,7 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe
private GroupService groupService;
@Autowired
private HealthScoreService healthScoreService;
private HealthStateService healthStateService;
@Autowired
private PartitionService partitionService;
@@ -265,8 +265,8 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe
private Result<List<GroupMetrics>> getMetricHealthScore(VersionItemParam param) {
GroupMetricParam groupMetricParam = (GroupMetricParam)param;
return Result.buildSuc(Arrays.asList(healthScoreService.calGroupHealthScore(
groupMetricParam.getClusterPhyId(), groupMetricParam.getGroupName()))
return Result.buildSuc(Arrays.asList(
healthStateService.calGroupHealthMetrics(groupMetricParam.getClusterPhyId(), groupMetricParam.getGroupName()))
);
}
}

View File

@@ -0,0 +1,281 @@
package com.xiaojukeji.know.streaming.km.core.service.health.checker.zookeeper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthAmountRatioConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.zookeeper.ZookeeperParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.ZookeeperUtils;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Service
public class HealthCheckZookeeperService extends AbstractHealthCheckService {
private static final ILog log = LogFactory.getLog(HealthCheckZookeeperService.class);
@Autowired
private ClusterPhyService clusterPhyService;
@Autowired
private ZookeeperService zookeeperService;
@Autowired
private ZookeeperMetricService zookeeperMetricService;
@PostConstruct
private void init() {
functionMap.putIfAbsent(HealthCheckNameEnum.ZK_BRAIN_SPLIT.getConfigName(), this::checkBrainSplit);
functionMap.putIfAbsent(HealthCheckNameEnum.ZK_OUTSTANDING_REQUESTS.getConfigName(), this::checkOutstandingRequests);
functionMap.putIfAbsent(HealthCheckNameEnum.ZK_WATCH_COUNT.getConfigName(), this::checkWatchCount);
functionMap.putIfAbsent(HealthCheckNameEnum.ZK_ALIVE_CONNECTIONS.getConfigName(), this::checkAliveConnections);
functionMap.putIfAbsent(HealthCheckNameEnum.ZK_APPROXIMATE_DATA_SIZE.getConfigName(), this::checkApproximateDataSize);
functionMap.putIfAbsent(HealthCheckNameEnum.ZK_SENT_RATE.getConfigName(), this::checkSentRate);
}
@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return new ArrayList<>();
}
try {
return Arrays.asList(new ZookeeperParam(
clusterPhyId,
ZookeeperUtils.connectStringParser(clusterPhy.getZookeeper()),
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class)
));
} catch (Exception e) {
log.error("class=HealthCheckZookeeperService||method=getResList||clusterPhyId={}||errMsg=exception!", clusterPhyId, e);
}
return new ArrayList<>();
}
@Override
public HealthCheckDimensionEnum getHealthCheckDimensionEnum() {
return HealthCheckDimensionEnum.ZOOKEEPER;
}
private HealthCheckResult checkBrainSplit(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2();
List<ZookeeperInfo> infoList = zookeeperService.listFromDBByCluster(param.getClusterPhyId());
HealthCheckResult checkResult = new HealthCheckResult(
HealthCheckDimensionEnum.ZOOKEEPER.getDimension(),
HealthCheckNameEnum.ZK_BRAIN_SPLIT.getConfigName(),
param.getClusterPhyId(),
""
);
long value = infoList.stream().filter(elem -> ZKRoleEnum.LEADER.getRole().equals(elem.getRole())).count();
checkResult.setPassed(value == valueConfig.getValue().longValue() ? Constant.YES : Constant.NO);
return checkResult;
}
private HealthCheckResult checkOutstandingRequests(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(
new ZookeeperMetricParam(
param.getClusterPhyId(),
param.getZkAddressList(),
param.getZkConfig(),
ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS
)
);
if (metricsResult.failed() || !metricsResult.hasData()) {
log.error(
"class=HealthCheckZookeeperService||method=checkOutstandingRequests||param={}||config={}||result={}||errMsg=get metrics failed",
param, valueConfig, metricsResult
);
return null;
}
HealthCheckResult checkResult = new HealthCheckResult(
HealthCheckDimensionEnum.ZOOKEEPER.getDimension(),
HealthCheckNameEnum.ZK_OUTSTANDING_REQUESTS.getConfigName(),
param.getClusterPhyId(),
""
);
Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS);
checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO);
return checkResult;
}
private HealthCheckResult checkWatchCount(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(
new ZookeeperMetricParam(
param.getClusterPhyId(),
param.getZkAddressList(),
param.getZkConfig(),
ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT
)
);
if (metricsResult.failed() || !metricsResult.hasData()) {
log.error(
"class=HealthCheckZookeeperService||method=checkWatchCount||param={}||config={}||result={}||errMsg=get metrics failed",
param, valueConfig, metricsResult
);
return null;
}
HealthCheckResult checkResult = new HealthCheckResult(
HealthCheckDimensionEnum.ZOOKEEPER.getDimension(),
HealthCheckNameEnum.ZK_WATCH_COUNT.getConfigName(),
param.getClusterPhyId(),
""
);
Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT);
checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO);
return checkResult;
}
private HealthCheckResult checkAliveConnections(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(
new ZookeeperMetricParam(
param.getClusterPhyId(),
param.getZkAddressList(),
param.getZkConfig(),
ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS
)
);
if (metricsResult.failed() || !metricsResult.hasData()) {
log.error(
"class=HealthCheckZookeeperService||method=checkAliveConnections||param={}||config={}||result={}||errMsg=get metrics failed",
param, valueConfig, metricsResult
);
return null;
}
HealthCheckResult checkResult = new HealthCheckResult(
HealthCheckDimensionEnum.ZOOKEEPER.getDimension(),
HealthCheckNameEnum.ZK_ALIVE_CONNECTIONS.getConfigName(),
param.getClusterPhyId(),
""
);
Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS);
checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO);
return checkResult;
}
private HealthCheckResult checkApproximateDataSize(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(
new ZookeeperMetricParam(
param.getClusterPhyId(),
param.getZkAddressList(),
param.getZkConfig(),
ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE
)
);
if (metricsResult.failed() || !metricsResult.hasData()) {
log.error(
"class=HealthCheckZookeeperService||method=checkApproximateDataSize||param={}||config={}||result={}||errMsg=get metrics failed",
param, valueConfig, metricsResult
);
return null;
}
HealthCheckResult checkResult = new HealthCheckResult(
HealthCheckDimensionEnum.ZOOKEEPER.getDimension(),
HealthCheckNameEnum.ZK_APPROXIMATE_DATA_SIZE.getConfigName(),
param.getClusterPhyId(),
""
);
Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE);
checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO);
return checkResult;
}
private HealthCheckResult checkSentRate(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(
new ZookeeperMetricParam(
param.getClusterPhyId(),
param.getZkAddressList(),
param.getZkConfig(),
ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_PACKETS_SENT
)
);
if (metricsResult.failed() || !metricsResult.hasData()) {
log.error(
"class=HealthCheckZookeeperService||method=checkSentRate||param={}||config={}||result={}||errMsg=get metrics failed",
param, valueConfig, metricsResult
);
return null;
}
HealthCheckResult checkResult = new HealthCheckResult(
HealthCheckDimensionEnum.ZOOKEEPER.getDimension(),
HealthCheckNameEnum.ZK_SENT_RATE.getConfigName(),
param.getClusterPhyId(),
""
);
Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_PACKETS_SENT);
checkResult.setPassed(value.intValue() <= valueConfig.getAmount().doubleValue() * valueConfig.getRatio().doubleValue() ? Constant.YES : Constant.NO);
return checkResult;
}
}

View File

@@ -0,0 +1,50 @@
package com.xiaojukeji.know.streaming.km.core.service.health.state;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
import java.util.List;
public interface HealthStateService {
/**
* 集群健康指标
*/
ClusterMetrics calClusterHealthMetrics(Long clusterPhyId);
/**
* 获取Broker健康指标
*/
BrokerMetrics calBrokerHealthMetrics(Long clusterPhyId, Integer brokerId);
/**
* 获取Topic健康指标
*/
TopicMetrics calTopicHealthMetrics(Long clusterPhyId, String topicName);
/**
* 获取Group健康指标
*/
GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName);
/**
* 获取Zookeeper健康指标
*/
ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId);
/**
* 获取集群健康检查结果
*/
List<HealthScoreResult> getClusterHealthResult(Long clusterPhyId);
/**
* 获取集群某个维度健康检查结果
*/
List<HealthScoreResult> getDimensionHealthResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum);
/**
* 获取集群某个资源的健康检查结果
*/
List<HealthScoreResult> getResHealthResult(Long clusterPhyId, Integer dimension, String resNme);
}

View File

@@ -0,0 +1,408 @@
package com.xiaojukeji.know.streaming.km.core.service.health.state.impl;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*;
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.BrokerMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.BrokerMetricVersionItems.BROKER_METRIC_HEALTH_STATE;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ClusterMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.GroupMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.GroupMetricVersionItems.GROUP_METRIC_HEALTH_CHECK_TOTAL;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems.TOPIC_METRIC_HEALTH_CHECK_TOTAL;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems.*;
@Service
public class HealthStateServiceImpl implements HealthStateService {
@Autowired
private HealthCheckResultService healthCheckResultService;
@Autowired
private ZookeeperService zookeeperService;
@Autowired
private BrokerService brokerService;
@Override
public ClusterMetrics calClusterHealthMetrics(Long clusterPhyId) {
ClusterMetrics metrics = new ClusterMetrics(clusterPhyId);
// 集群维度指标
List<HealthCheckAggResult> resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.CLUSTER);
if (ValidateUtils.isEmptyList(resultList)) {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER, 0.0f);
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER, 0.0f);
} else {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER, this.getHealthCheckPassed(resultList));
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER, (float)resultList.size());
}
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_CLUSTER, (float)this.calHealthState(resultList).getDimension());
// 获取指标
metrics.putMetric(this.calClusterBrokersHealthMetrics(clusterPhyId).getMetrics());
metrics.putMetric(this.calClusterTopicsHealthMetrics(clusterPhyId).getMetrics());
metrics.putMetric(this.calClusterGroupsHealthMetrics(clusterPhyId).getMetrics());
metrics.putMetric(this.calZookeeperHealthMetrics(clusterPhyId).getMetrics());
// 统计最终结果
Float passed = 0.0f;
passed += metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED);
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS);
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS);
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS);
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER);
Float total = 0.0f;
total += metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL);
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS);
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS);
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS);
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER);
// 状态
Float state = 0.0f;
state = Math.max(state, metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_STATE));
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_TOPICS));
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_BROKERS));
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_GROUPS));
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CLUSTER));
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED, passed);
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL, total);
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE, state);
return metrics;
}
@Override
public BrokerMetrics calBrokerHealthMetrics(Long clusterPhyId, Integer brokerId) {
List<HealthScoreResult> healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.BROKER.getDimension(), String.valueOf(brokerId));
BrokerMetrics metrics = new BrokerMetrics(clusterPhyId, brokerId);
if (ValidateUtils.isEmptyList(healthScoreResultList)) {
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension());
metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_PASSED, 0.0f);
metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_TOTAL, 0.0f);
} else {
metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_PASSED, getHealthCheckResultPassed(healthScoreResultList));
metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size()));
// 计算健康状态
Broker broker = brokerService.getBrokerFromCacheFirst(clusterPhyId, brokerId);
if (broker == null) {
// DB中不存在则默认是存活的
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension());
} else if (!broker.alive()) {
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.DEAD.getDimension());
} else {
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension());
}
}
return metrics;
}
@Override
public TopicMetrics calTopicHealthMetrics(Long clusterPhyId, String topicName) {
List<HealthScoreResult> healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC.getDimension(), topicName);
TopicMetrics metrics = new TopicMetrics(topicName, clusterPhyId,true);
if (ValidateUtils.isEmptyList(healthScoreResultList)) {
metrics.getMetrics().put(TOPIC_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension());
metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_PASSED, 0.0f);
metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_TOTAL, 0.0f);
} else {
metrics.getMetrics().put(TOPIC_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension());
metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckResultPassed(healthScoreResultList));
metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size()));
}
return metrics;
}
@Override
public GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName) {
List<HealthScoreResult> healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.GROUP.getDimension(), groupName);
GroupMetrics metrics = new GroupMetrics(clusterPhyId, groupName, true);
if (ValidateUtils.isEmptyList(healthScoreResultList)) {
metrics.getMetrics().put(GROUP_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension());
metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_PASSED, 0.0f);
metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_TOTAL, 0.0f);
} else {
metrics.getMetrics().put(GROUP_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension());
metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_PASSED, getHealthCheckResultPassed(healthScoreResultList));
metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size()));
}
return metrics;
}
@Override
public ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId) {
List<HealthCheckAggResult> resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.ZOOKEEPER);
ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId);
if (ValidateUtils.isEmptyList(resultList)) {
metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, 0.0f);
metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL, 0.0f);
} else {
metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(resultList));
metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL, (float)resultList.size());
}
if (zookeeperService.allServerDown(clusterPhyId)) {
// 所有服务挂掉
metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)HealthStateEnum.DEAD.getDimension());
return metrics;
}
if (zookeeperService.existServerDown(clusterPhyId)) {
// 存在服务挂掉
metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)HealthStateEnum.POOR.getDimension());
return metrics;
}
// 服务未挂时,依据检查结果计算状态
metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)this.calHealthState(resultList).getDimension());
return metrics;
}
@Override
public List<HealthScoreResult> getClusterHealthResult(Long clusterPhyId) {
List<HealthCheckResultPO> poList = healthCheckResultService.getClusterHealthCheckResult(clusterPhyId);
// <检查项,<检查结果>>
Map<String, List<HealthCheckResultPO>> checkResultMap = new HashMap<>();
for (HealthCheckResultPO po: poList) {
checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>());
checkResultMap.get(po.getConfigName()).add(po);
}
Map<String, BaseClusterHealthConfig> configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId);
List<HealthScoreResult> healthScoreResultList = new ArrayList<>();
for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.values()) {
BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName());
if (baseConfig == null) {
continue;
}
healthScoreResultList.add(new HealthScoreResult(
nameEnum,
baseConfig,
checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))
);
}
return healthScoreResultList;
}
@Override
public List<HealthScoreResult> getDimensionHealthResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) {
List<HealthCheckResultPO> poList = healthCheckResultService.getClusterResourcesHealthCheckResult(clusterPhyId, dimensionEnum.getDimension());
// <检查项,<通过的数量,不通过的数量>>
Map<String, List<HealthCheckResultPO>> checkResultMap = new HashMap<>();
for (HealthCheckResultPO po: poList) {
checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>());
checkResultMap.get(po.getConfigName()).add(po);
}
Map<String, BaseClusterHealthConfig> configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId);
List<HealthScoreResult> healthScoreResultList = new ArrayList<>();
for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimension(dimensionEnum)) {
BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName());
if (baseConfig == null) {
continue;
}
healthScoreResultList.add(new HealthScoreResult(nameEnum, baseConfig, checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>())));
}
return healthScoreResultList;
}
@Override
public List<HealthScoreResult> getResHealthResult(Long clusterPhyId, Integer dimension, String resNme) {
List<HealthCheckResultPO> poList = healthCheckResultService.getResHealthCheckResult(clusterPhyId, dimension, resNme);
Map<String, List<HealthCheckResultPO>> checkResultMap = new HashMap<>();
for (HealthCheckResultPO po: poList) {
checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>());
checkResultMap.get(po.getConfigName()).add(po);
}
Map<String, BaseClusterHealthConfig> configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId);
List<HealthScoreResult> healthScoreResultList = new ArrayList<>();
for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimensionCode(dimension)) {
BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName());
if (baseConfig == null) {
continue;
}
healthScoreResultList.add(new HealthScoreResult(nameEnum, baseConfig, checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>())));
}
return healthScoreResultList;
}
/**************************************************** private method ****************************************************/
private ClusterMetrics calClusterTopicsHealthMetrics(Long clusterPhyId) {
List<HealthCheckAggResult> resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC);
ClusterMetrics metrics = new ClusterMetrics(clusterPhyId);
if (ValidateUtils.isEmptyList(resultList)) {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS, 0.0f);
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS, 0.0f);
} else {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS, this.getHealthCheckPassed(resultList));
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS, (float)resultList.size());
}
// 服务未挂时,依据检查结果计算状态
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_TOPICS, (float)this.calHealthState(resultList).getDimension());
return metrics;
}
private ClusterMetrics calClusterGroupsHealthMetrics(Long clusterPhyId) {
List<HealthCheckAggResult> resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.GROUP);
ClusterMetrics metrics = new ClusterMetrics(clusterPhyId);
if (ValidateUtils.isEmptyList(resultList)) {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS, 0.0f);
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS, 0.0f);
} else {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS, this.getHealthCheckPassed(resultList));
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS, (float)resultList.size());
}
// 服务未挂时,依据检查结果计算状态
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_GROUPS, (float)this.calHealthState(resultList).getDimension());
return metrics;
}
private ClusterMetrics calClusterBrokersHealthMetrics(Long clusterPhyId) {
List<HealthCheckAggResult> resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.BROKER);
ClusterMetrics metrics = new ClusterMetrics(clusterPhyId);
if (ValidateUtils.isEmptyList(resultList)) {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS, 0.0f);
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS, 0.0f);
} else {
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS, this.getHealthCheckPassed(resultList));
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS, (float)resultList.size());
}
if (brokerService.allServerDown(clusterPhyId)) {
// 所有服务挂掉
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_BROKERS, (float)HealthStateEnum.DEAD.getDimension());
return metrics;
}
if (brokerService.existServerDown(clusterPhyId)) {
// 存在服务挂掉
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_BROKERS, (float)HealthStateEnum.POOR.getDimension());
return metrics;
}
// 服务未挂时,依据检查结果计算状态
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE_BROKERS, (float)this.calHealthState(resultList).getDimension());
return metrics;
}
private List<HealthCheckAggResult> getDimensionHealthCheckAggResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) {
List<HealthCheckResultPO> poList = healthCheckResultService.getClusterResourcesHealthCheckResult(clusterPhyId, dimensionEnum.getDimension());
Map<String /*检查名*/, List<HealthCheckResultPO> /*检查结果列表*/> groupByCheckNamePOMap = new HashMap<>();
for (HealthCheckResultPO po: poList) {
groupByCheckNamePOMap.putIfAbsent(po.getConfigName(), new ArrayList<>());
groupByCheckNamePOMap.get(po.getConfigName()).add(po);
}
List<HealthCheckAggResult> stateList = new ArrayList<>();
for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimension(dimensionEnum)) {
stateList.add(new HealthCheckAggResult(nameEnum, groupByCheckNamePOMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>())));
}
return stateList;
}
private float getHealthCheckPassed(List<HealthCheckAggResult> resultList){
if(ValidateUtils.isEmptyList(resultList)) {
return 0f;
}
return Float.valueOf(resultList.stream().filter(elem -> elem.getPassed()).count());
}
private HealthStateEnum calHealthState(List<HealthCheckAggResult> resultList) {
if(ValidateUtils.isEmptyList(resultList)) {
return HealthStateEnum.GOOD;
}
boolean existNotPassed = false;
for (HealthCheckAggResult aggResult: resultList) {
if (aggResult.getCheckNameEnum().isAvailableChecker() && !aggResult.getPassed()) {
return HealthStateEnum.POOR;
}
if (!aggResult.getPassed()) {
existNotPassed = true;
}
}
return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD;
}
private float getHealthCheckResultPassed(List<HealthScoreResult> healthScoreResultList){
if(CollectionUtils.isEmpty(healthScoreResultList)){return 0f;}
return Float.valueOf(healthScoreResultList.stream().filter(elem -> elem.getPassed()).count());
}
private HealthStateEnum calHealthScoreResultState(List<HealthScoreResult> resultList) {
if(ValidateUtils.isEmptyList(resultList)) {
return HealthStateEnum.GOOD;
}
boolean existNotPassed = false;
for (HealthScoreResult aggResult: resultList) {
if (aggResult.getCheckNameEnum().isAvailableChecker() && !aggResult.getPassed()) {
return HealthStateEnum.POOR;
}
if (!aggResult.getPassed()) {
existNotPassed = true;
}
}
return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD;
}
}

View File

@@ -31,7 +31,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
@@ -69,7 +69,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
public static final String TOPIC_METHOD_GET_REPLICAS_COUNT = "getReplicasCount";
@Autowired
private HealthScoreService healthScoreService;
private HealthStateService healthStateService;
@Autowired
private KafkaJMXClient kafkaJMXClient;
@@ -394,7 +394,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
String topic = topicMetricParam.getTopic();
Long clusterId = topicMetricParam.getClusterId();
TopicMetrics topicMetric = healthScoreService.calTopicHealthScore(clusterId, topic);
TopicMetrics topicMetric = healthStateService.calTopicHealthMetrics(clusterId, topic);
return Result.buildSuc(Arrays.asList(topicMetric));
}

View File

@@ -16,7 +16,7 @@ import static com.xiaojukeji.know.streaming.km.core.service.broker.impl.BrokerMe
@Component
public class BrokerMetricVersionItems extends BaseMetricVersionMetric {
public static final String BROKER_METRIC_HEALTH_SCORE = "HealthScore";
public static final String BROKER_METRIC_HEALTH_STATE = "HealthState";
public static final String BROKER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
public static final String BROKER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal";
public static final String BROKER_METRIC_TOTAL_REQ_QUEUE = "TotalRequestQueueSize";
@@ -57,8 +57,8 @@ public class BrokerMetricVersionItems extends BaseMetricVersionMetric {
// HealthScore 指标
items.add(buildAllVersionsItem()
.name(BROKER_METRIC_HEALTH_SCORE).unit("").desc("健康").category(CATEGORY_HEALTH)
.extendMethod(BROKER_METHOD_GET_HEALTH_SCORE));
.name(BROKER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH)
.extendMethod( BROKER_METHOD_GET_HEALTH_SCORE ));
items.add(buildAllVersionsItem()
.name(BROKER_METRIC_HEALTH_CHECK_PASSED ).unit("").desc("健康检查通过数").category(CATEGORY_HEALTH)

View File

@@ -19,36 +19,41 @@ import static com.xiaojukeji.know.streaming.km.core.service.cluster.impl.Cluster
*/
@Component
public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
/**
* 健康分
*/
public static final String CLUSTER_METRIC_HEALTH_SCORE = "HealthScore";
public static final String CLUSTER_METRIC_HEALTH_SCORE_TOPICS = "HealthScore_Topics";
public static final String CLUSTER_METRIC_HEALTH_SCORE_BROKERS = "HealthScore_Brokers";
public static final String CLUSTER_METRIC_HEALTH_SCORE_GROUPS = "HealthScore_Groups";
public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster";
/**
* 健康巡检
* 整体的健康指标
*/
public static final String CLUSTER_METRIC_HEALTH_STATE = "HealthState";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal";
/**
* Topics健康指标
*/
public static final String CLUSTER_METRIC_HEALTH_STATE_TOPICS = "HealthState_Topics";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS = "HealthCheckPassed_Topics";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS = "HealthCheckTotal_Topics";
/**
* Brokers健康指标
*/
public static final String CLUSTER_METRIC_HEALTH_STATE_BROKERS = "HealthState_Brokers";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS = "HealthCheckPassed_Brokers";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS = "HealthCheckTotal_Brokers";
/**
* Groups健康指标
*/
public static final String CLUSTER_METRIC_HEALTH_STATE_GROUPS = "HealthState_Groups";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS = "HealthCheckPassed_Groups";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS = "HealthCheckTotal_Groups";
/**
* Cluster健康指标
*/
public static final String CLUSTER_METRIC_HEALTH_STATE_CLUSTER = "HealthState_Cluster";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster";
public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";
@@ -113,64 +118,64 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
// HealthScore 指标
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_SCORE).unit("").desc("集群总体的健康分").category(CATEGORY_HEALTH)
.extendMethod(CLUSTER_METHOD_GET_HEALTH_SCORE));
.name(CLUSTER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕").desc("集群健康状态(0:好 1:中 2:差 3:宕)").category(CATEGORY_HEALTH)
.extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_PASSED).unit("").desc("集群总体健康检查通过数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL).unit("").desc("集群总体健康检查总数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_SCORE_TOPICS).unit("").desc("集群Topics健康").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.name(CLUSTER_METRIC_HEALTH_STATE_TOPICS).unit("0:好 1:中 2:差 3:宕机").desc("集群Topics健康状态").category(CATEGORY_HEALTH)
.extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_TOPICS).unit("").desc("集群Topics健康检查通过数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_TOPICS).unit("").desc("集群Topics健康检查总数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_SCORE_BROKERS).unit("").desc("集群Brokers健康").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.name(CLUSTER_METRIC_HEALTH_STATE_BROKERS).unit("0:好 1:中 2:差 3:宕机").desc("集群Brokers健康状态").category(CATEGORY_HEALTH)
.extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_BROKERS).unit("").desc("集群Brokers健康检查通过数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_BROKERS).unit("").desc("集群Brokers健康检查总数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_SCORE_GROUPS).unit("").desc("集群Groups健康").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.name(CLUSTER_METRIC_HEALTH_STATE_GROUPS).unit("0:好 1:中 2:差 3:宕机").desc("集群Groups健康状态").category(CATEGORY_HEALTH)
.extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS).unit("").desc("集群Groups健康检查通过数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS).unit("").desc("集群Groups健康检查总数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_SCORE_CLUSTER).unit("").desc("集群自身健康").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.name(CLUSTER_METRIC_HEALTH_STATE_CLUSTER).unit("0:好 1:中 2:差 3:宕机").desc("集群自身健康状态").category(CATEGORY_HEALTH)
.extendMethod(CLUSTER_METHOD_GET_HEALTH_METRICS));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER).unit("").desc("集群自身健康检查通过数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER).unit("").desc("集群自身健康检查总数").category(CATEGORY_HEALTH)
.extendMethod( CLUSTER_METHOD_GET_HEALTH_SCORE ));
.extendMethod( CLUSTER_METHOD_GET_HEALTH_METRICS ));
// TotalRequestQueueSize 指标
itemList.add(buildAllVersionsItem()

View File

@@ -7,13 +7,14 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.CATEGORY_HEALTH;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_GROUP;
import static com.xiaojukeji.know.streaming.km.core.service.group.impl.GroupMetricServiceImpl.*;
@Component
public class GroupMetricVersionItems extends BaseMetricVersionMetric {
public static final String GROUP_METRIC_HEALTH_SCORE = "HealthScore";
public static final String GROUP_METRIC_HEALTH_STATE = "HealthState";
public static final String GROUP_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
public static final String GROUP_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal";
public static final String GROUP_METRIC_OFFSET_CONSUMED = "OffsetConsumed";
@@ -33,15 +34,15 @@ public class GroupMetricVersionItems extends BaseMetricVersionMetric {
// HealthScore 指标
itemList.add(buildAllVersionsItem()
.name(GROUP_METRIC_HEALTH_SCORE).unit("").desc("健康")
.name(GROUP_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH)
.extendMethod( GROUP_METHOD_GET_HEALTH_SCORE ));
itemList.add(buildAllVersionsItem()
.name(GROUP_METRIC_HEALTH_CHECK_PASSED ).unit("").desc("健康检查通过数")
.name(GROUP_METRIC_HEALTH_CHECK_PASSED ).unit("").desc("健康检查通过数").category(CATEGORY_HEALTH)
.extendMethod( GROUP_METHOD_GET_HEALTH_SCORE ));
itemList.add(buildAllVersionsItem()
.name(GROUP_METRIC_HEALTH_CHECK_TOTAL ).unit("").desc("健康检查总数")
.name(GROUP_METRIC_HEALTH_CHECK_TOTAL ).unit("").desc("健康检查总数").category(CATEGORY_HEALTH)
.extendMethod( GROUP_METHOD_GET_HEALTH_SCORE ));
// OffsetConsumed 指标

View File

@@ -16,9 +16,10 @@ import static com.xiaojukeji.know.streaming.km.core.service.topic.impl.TopicMetr
@Component
public class TopicMetricVersionItems extends BaseMetricVersionMetric {
public static final String TOPIC_METRIC_HEALTH_SCORE = "HealthScore";
public static final String TOPIC_METRIC_HEALTH_STATE = "HealthState";
public static final String TOPIC_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
public static final String TOPIC_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal";
public static final String TOPIC_METRIC_TOTAL_PRODUCE_REQUESTS = "TotalProduceRequests";
public static final String TOPIC_METRIC_BYTES_REJECTED = "BytesRejected";
public static final String TOPIC_METRIC_FAILED_FETCH_REQ = "FailedFetchRequests";
@@ -47,7 +48,7 @@ public class TopicMetricVersionItems extends BaseMetricVersionMetric {
// HealthScore 指标
itemList.add(buildAllVersionsItem()
.name(TOPIC_METRIC_HEALTH_SCORE).unit("").desc("健康").category(CATEGORY_HEALTH)
.name(TOPIC_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH)
.extendMethod( TOPIC_METHOD_GET_HEALTH_SCORE ));
itemList.add(buildAllVersionsItem()

View File

@@ -15,6 +15,12 @@ import static com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl.Zooke
@Component
public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric {
/**
* 健康状态
*/
public static final String ZOOKEEPER_METRIC_HEALTH_STATE = "HealthState";
public static final String ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
public static final String ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal";
/**
* 性能
@@ -23,7 +29,7 @@ public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric {
public static final String ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY = "MinRequestLatency";
public static final String ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY = "MaxRequestLatency";
public static final String ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS = "OutstandingRequests";
public static final String ZOOKEEPER_METRIC_NODE_COUNT = "NodeCount";
public static final String ZOOKEEPER_METRIC_NODE_COUNT = "ZnodeCount";
public static final String ZOOKEEPER_METRIC_WATCH_COUNT = "WatchCount";
public static final String ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS = "NumAliveConnections";
public static final String ZOOKEEPER_METRIC_PACKETS_RECEIVED = "PacketsReceived";
@@ -52,6 +58,20 @@ public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric {
public List<VersionMetricControlItem> init(){
List<VersionMetricControlItem> items = new ArrayList<>();
// 健康状态
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED).unit("").desc("健康巡检通过数").category(CATEGORY_HEALTH)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL).unit("").desc("健康巡检总数").category(CATEGORY_HEALTH)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE));
// 性能指标
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY).unit("ms").desc("平均响应延迟").category(CATEGORY_PERFORMANCE)

View File

@@ -29,6 +29,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import com.xiaojukeji.know.streaming.km.core.cache.ZookeeperLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
@@ -68,6 +69,9 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
@Autowired
private KafkaJMXClient kafkaJMXClient;
@Autowired
private HealthStateService healthStateService;
@Override
protected VersionItemTypeEnum getVersionItemType() {
return VersionItemTypeEnum.METRIC_ZOOKEEPER;
@@ -84,6 +88,7 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD, this::getMetricFromMonitorCmd);
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD, this::getMetricFromServerCmd);
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX, this::getMetricFromKafkaByJMX);
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE, this::getMetricFromHealthService);
}
@Override
@@ -302,4 +307,13 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
return Result.buildFailure(VC_JMX_CONNECT_ERROR);
}
}
private Result<ZookeeperMetrics> getMetricFromHealthService(VersionItemParam metricParam) {
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
String metricName = param.getMetricName();
Long clusterPhyId = param.getClusterPhyId();
return Result.buildSuc(healthStateService.calZookeeperHealthMetrics(clusterPhyId));
}
}