diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java index 69d65a20..afd42120 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthCheckAggResult.java @@ -14,16 +14,16 @@ import java.util.stream.Collectors; @Data @NoArgsConstructor public class HealthCheckAggResult { - private HealthCheckNameEnum checkNameEnum; + protected HealthCheckNameEnum checkNameEnum; - private List poList; + protected List poList; - private Boolean passed; + protected Boolean passed; public HealthCheckAggResult(HealthCheckNameEnum checkNameEnum, List poList) { this.checkNameEnum = checkNameEnum; this.poList = poList; - if (!ValidateUtils.isEmptyList(poList) && poList.stream().filter(elem -> elem.getPassed() <= 0).count() <= 0) { + if (ValidateUtils.isEmptyList(poList) || poList.stream().filter(elem -> elem.getPassed() <= 0).count() <= 0) { passed = true; } else { passed = false; @@ -45,24 +45,12 @@ public class HealthCheckAggResult { return (int) (poList.stream().filter(elem -> elem.getPassed() > 0).count()); } - /** - * 计算当前检查的健康分 - * 比如:计算集群Broker健康检查中的某一项的健康分 - */ - public Integer calRawHealthScore() { - if (poList == null || poList.isEmpty()) { - return 100; - } - - return 100 * this.getPassedCount() / this.getTotalCount(); - } - public List getNotPassedResNameList() { if (poList == null) { return new ArrayList<>(); } - return poList.stream().filter(elem -> elem.getPassed() <= 0).map(elem -> elem.getResName()).collect(Collectors.toList()); + return poList.stream().filter(elem -> elem.getPassed() <= 0 && !ValidateUtils.isBlank(elem.getResName())).map(elem -> elem.getResName()).collect(Collectors.toList()); } public Date getCreateTime() { diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java index c503c129..302feb5b 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/health/HealthScoreResult.java @@ -3,87 +3,20 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.health; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import lombok.Data; import lombok.NoArgsConstructor; -import java.util.ArrayList; -import java.util.Date; import java.util.List; -import java.util.stream.Collectors; @Data @NoArgsConstructor -public class HealthScoreResult { - private HealthCheckNameEnum checkNameEnum; - +public class HealthScoreResult extends HealthCheckAggResult { private BaseClusterHealthConfig baseConfig; - private List poList; - - private Boolean passed; - public HealthScoreResult(HealthCheckNameEnum checkNameEnum, BaseClusterHealthConfig baseConfig, List poList) { - this.checkNameEnum = checkNameEnum; + super(checkNameEnum, poList); this.baseConfig = baseConfig; - this.poList = poList; - if (!ValidateUtils.isEmptyList(poList) && poList.stream().filter(elem -> elem.getPassed() <= 0).count() <= 0) { - passed = true; - } else { - passed = false; - } - } - - public Integer getTotalCount() { - if (poList == null) { - return 0; - } - - return poList.size(); - } - - public Integer getPassedCount() { - if (poList == null) { - return 0; - } - return (int) (poList.stream().filter(elem -> elem.getPassed() > 0).count()); - } - - /** - * 计算当前检查的健康分 - * 比如:计算集群Broker健康检查中的某一项的健康分 - */ - public Integer calRawHealthScore() { - if (poList == null || poList.isEmpty()) { - return 100; - } - - return 100 * this.getPassedCount() / this.getTotalCount(); - } - - public List getNotPassedResNameList() { - if (poList == null) { - return new ArrayList<>(); - } - - return poList.stream().filter(elem -> elem.getPassed() <= 0 && !ValidateUtils.isBlank(elem.getResName())).map(elem -> elem.getResName()).collect(Collectors.toList()); - } - - public Date getCreateTime() { - if (ValidateUtils.isEmptyList(poList)) { - return null; - } - - return poList.get(0).getCreateTime(); - } - - public Date getUpdateTime() { - if (ValidateUtils.isEmptyList(poList)) { - return null; - } - - return poList.get(0).getUpdateTime(); } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java index c9abc528..113a74ab 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/health/HealthScoreBaseResultVO.java @@ -30,8 +30,9 @@ public class HealthScoreBaseResultVO extends BaseTimeVO { @ApiModelProperty(value="检查说明", example = "Group延迟") private String configDesc; + @Deprecated @ApiModelProperty(value="得分", example = "100") - private Integer score; + private Integer score = 100; @ApiModelProperty(value="结果", example = "true") private Boolean passed; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java index 150306dc..e82960b1 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/HealthScoreVOConverter.java @@ -24,15 +24,7 @@ public class HealthScoreVOConverter { vo.setConfigName(healthScoreResult.getCheckNameEnum().getConfigName()); vo.setConfigItem(healthScoreResult.getCheckNameEnum().getConfigItem()); vo.setConfigDesc(healthScoreResult.getCheckNameEnum().getConfigDesc()); - - vo.setScore(healthScoreResult.calRawHealthScore()); - if (healthScoreResult.getTotalCount() <= 0) { - // 未知 - vo.setPassed(null); - } else { - vo.setPassed(healthScoreResult.getPassedCount().equals(healthScoreResult.getTotalCount())); - } - + vo.setPassed(healthScoreResult.getPassed()); vo.setCheckConfig(convert2HealthCheckConfigVO(ConfigGroupEnum.HEALTH.name(), healthScoreResult.getBaseConfig())); vo.setNotPassedResNameList(healthScoreResult.getNotPassedResNameList()); @@ -51,8 +43,7 @@ public class HealthScoreVOConverter { vo.setDimensionName(healthScoreResult.getCheckNameEnum().getDimensionEnum().getMessage()); vo.setConfigName(healthScoreResult.getCheckNameEnum().getConfigName()); vo.setConfigDesc(healthScoreResult.getCheckNameEnum().getConfigDesc()); - vo.setScore(healthScoreResult.calRawHealthScore()); - vo.setPassed(healthScoreResult.getPassedCount().equals(healthScoreResult.getTotalCount())); + vo.setPassed(healthScoreResult.getPassed()); vo.setCheckConfig(convert2HealthCheckConfigVO(ConfigGroupEnum.HEALTH.name(), healthScoreResult.getBaseConfig())); vo.setCreateTime(healthScoreResult.getCreateTime()); vo.setUpdateTime(healthScoreResult.getUpdateTime()); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java index daa4e641..d1b08181 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java @@ -20,6 +20,8 @@ public enum HealthCheckDimensionEnum { ZOOKEEPER(4, "Zookeeper"), + MAX_VAL(100, "所有的dimension的值需要小于MAX_VAL") + ; private final int dimension; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java index 0cb6f832..5412988c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java @@ -5,6 +5,8 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; +import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; import java.util.List; import java.util.Map; @@ -26,6 +28,11 @@ public class DataBaseDataLocalCache { .maximumSize(500) .build(); + private static final Cache>> healthCheckResultCache = Caffeine.newBuilder() + .expireAfterWrite(90, TimeUnit.SECONDS) + .maximumSize(1000) + .build(); + public static Map getTopicMetrics(Long clusterPhyId) { return topicLatestMetricsCache.getIfPresent(clusterPhyId); } @@ -50,6 +57,22 @@ public class DataBaseDataLocalCache { partitionsCache.put(clusterPhyId, partitionMap); } + public static Map> getHealthCheckResults(Long clusterId, HealthCheckDimensionEnum dimensionEnum) { + return healthCheckResultCache.getIfPresent(getHealthCheckCacheKey(clusterId, dimensionEnum.getDimension())); + } + + public static void putHealthCheckResults(Long cacheKey, Map> poMap) { + healthCheckResultCache.put(cacheKey, poMap); + } + + public static void putHealthCheckResults(Long clusterId, HealthCheckDimensionEnum dimensionEnum, Map> poMap) { + healthCheckResultCache.put(getHealthCheckCacheKey(clusterId, dimensionEnum.getDimension()), poMap); + } + + public static Long getHealthCheckCacheKey(Long clusterId, Integer dimensionCode) { + return clusterId * HealthCheckDimensionEnum.MAX_VAL.getDimension() + dimensionCode; + } + /**************************************************** private method ****************************************************/ private DataBaseDataLocalCache() { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java index 1ac1e86a..70d138be 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java @@ -8,10 +8,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; +import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; @@ -42,6 +44,9 @@ public class DatabaseDataFlusher { @Autowired private ClusterMetricService clusterMetricService; + @Autowired + private HealthCheckResultService healthCheckResultService; + @Autowired private PartitionService partitionService; @@ -52,6 +57,8 @@ public class DatabaseDataFlusher { this.flushClusterLatestMetricsCache(); this.flushTopicLatestMetricsCache(); + + this.flushHealthCheckResultCache(); } @Scheduled(cron="0 0/1 * * * ?") @@ -76,6 +83,28 @@ public class DatabaseDataFlusher { } } + @Scheduled(cron="0 0/1 * * * ?") + public void flushHealthCheckResultCache() { + FutureUtil.quickStartupFutureUtil.submitTask(() -> { + List poList = healthCheckResultService.listAll(); + + Map>> newPOMap = new ConcurrentHashMap<>(); + + // 更新缓存 + poList.forEach(po -> { + Long cacheKey = DataBaseDataLocalCache.getHealthCheckCacheKey(po.getClusterPhyId(), po.getDimension()); + + newPOMap.putIfAbsent(cacheKey, new ConcurrentHashMap<>()); + newPOMap.get(cacheKey).putIfAbsent(po.getResName(), new ArrayList<>()); + newPOMap.get(cacheKey).get(po.getResName()).add(po); + }); + + for (Map.Entry>> entry: newPOMap.entrySet()) { + DataBaseDataLocalCache.putHealthCheckResults(entry.getKey(), entry.getValue()); + } + }); + } + @Scheduled(cron = "0 0/1 * * * ?") private void flushClusterLatestMetricsCache() { for (ClusterPhy clusterPhy: clusterPhyService.listAllClusters()) { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java index b25a7b8c..c6b2cf3f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java @@ -5,7 +5,6 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; @@ -30,7 +29,7 @@ public abstract class AbstractHealthCheckService { public abstract HealthCheckDimensionEnum getHealthCheckDimensionEnum(); public HealthCheckResult checkAndGetResult(ClusterParam clusterParam, BaseClusterHealthConfig clusterHealthConfig) { - if (ValidateUtils.anyNull( clusterParam, clusterHealthConfig)) { + if (ValidateUtils.anyNull(clusterParam, clusterHealthConfig)) { return null; } @@ -48,8 +47,10 @@ public abstract class AbstractHealthCheckService { try { return function.apply(new Tuple<>(clusterParam, clusterHealthConfig)); } catch (Exception e) { - log.error("method=checkAndGetResult||clusterPhyParam={}||clusterHealthConfig={}||errMsg=exception!", - clusterParam, clusterHealthConfig, e); + log.error( + "method=checkAndGetResult||clusterParam={}||clusterHealthConfig={}||errMsg=exception!", + clusterParam, clusterHealthConfig, e + ); } return null; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java index c9b173af..6da0ed4f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java @@ -9,7 +9,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckRes import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.broker.BrokerParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; @@ -19,7 +18,6 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems; -import lombok.Data; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -28,7 +26,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -@Data @Service public class HealthCheckBrokerService extends AbstractHealthCheckService { private static final ILog log = LogFactory.getLog(HealthCheckBrokerService.class); @@ -48,9 +45,10 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { @Override public List getResList(Long clusterPhyId) { List paramList = new ArrayList<>(); - for (Broker broker: brokerService.listAliveBrokersFromDB(clusterPhyId)) { + for (Broker broker: brokerService.listAliveBrokersFromCacheFirst(clusterPhyId)) { paramList.add(new BrokerParam(clusterPhyId, broker.getBrokerId())); } + return paramList; } @@ -73,8 +71,11 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { String.valueOf(param.getBrokerId()) ); - Result metricsResult = brokerMetricService.getLatestMetricsFromES( - param.getClusterPhyId(), param.getBrokerId()); + Result metricsResult = brokerMetricService.collectBrokerMetricsFromKafka( + param.getClusterPhyId(), + param.getBrokerId(), + BrokerMetricVersionItems.BROKER_METRIC_NETWORK_RPO_AVG_IDLE + ); if (metricsResult.failed()) { log.error("method=checkBrokerNetworkProcessorAvgIdleTooLow||param={}||config={}||result={}||errMsg=get metrics failed", @@ -82,14 +83,14 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { return null; } - Float avgIdle = metricsResult.getData().getMetrics().get( BrokerMetricVersionItems.BROKER_METRIC_NETWORK_RPO_AVG_IDLE); + Float avgIdle = metricsResult.getData().getMetrics().get(BrokerMetricVersionItems.BROKER_METRIC_NETWORK_RPO_AVG_IDLE); if (avgIdle == null) { log.error("method=checkBrokerNetworkProcessorAvgIdleTooLow||param={}||config={}||result={}||errMsg=get metrics failed", param, singleConfig, metricsResult); return null; } - checkResult.setPassed(avgIdle >= singleConfig.getValue()? 1: 0); + checkResult.setPassed(avgIdle >= singleConfig.getValue()? Constant.YES: Constant.NO); return checkResult; } @@ -111,7 +112,7 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { Result metricsResult = brokerMetricService.collectBrokerMetricsFromKafka( param.getClusterPhyId(), param.getBrokerId(), - Arrays.asList( BrokerMetricVersionItems.BROKER_METRIC_TOTAL_REQ_QUEUE) + Arrays.asList(BrokerMetricVersionItems.BROKER_METRIC_TOTAL_REQ_QUEUE) ); if (metricsResult.failed()) { @@ -120,7 +121,7 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { return null; } - Float queueSize = metricsResult.getData().getMetrics().get( BrokerMetricVersionItems.BROKER_METRIC_TOTAL_REQ_QUEUE); + Float queueSize = metricsResult.getData().getMetrics().get(BrokerMetricVersionItems.BROKER_METRIC_TOTAL_REQ_QUEUE); if (queueSize == null) { log.error("method=checkBrokerRequestQueueFull||param={}||config={}||result={}||errMsg=get metrics failed", param, singleConfig, metricsResult); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java index 8cc40c20..d5022cfc 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java @@ -6,7 +6,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.Ba import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.GroupParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchTerm; @@ -17,7 +16,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; -import lombok.Data; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -27,7 +25,6 @@ import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.GROUP_METRIC_STATE; -@Data @Service public class HealthCheckGroupService extends AbstractHealthCheckService { private static final ILog log = LogFactory.getLog(HealthCheckGroupService.class); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java index 613d2902..2557fd5b 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java @@ -7,7 +7,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.He import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; @@ -32,7 +31,7 @@ import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafk @Service public class HealthCheckTopicService extends AbstractHealthCheckService { - private static final ILog log = LogFactory.getLog(HealthCheckTopicService.class); + private static final ILog LOGGER = LogFactory.getLog(HealthCheckTopicService.class); @Autowired private TopicService topicService; @@ -52,7 +51,7 @@ public class HealthCheckTopicService extends AbstractHealthCheckService { @Override public List getResList(Long clusterPhyId) { List paramList = new ArrayList<>(); - for (Topic topic: topicService.listTopicsFromDB(clusterPhyId)) { + for (Topic topic: topicService.listTopicsFromCacheFirst(clusterPhyId)) { paramList.add(new TopicParam(clusterPhyId, topic.getTopicName())); } return paramList; @@ -86,12 +85,12 @@ public class HealthCheckTopicService extends AbstractHealthCheckService { ); if (countResult.failed() || !countResult.hasData()) { - log.error("method=checkTopicUnderReplicatedPartition||param={}||config={}||result={}||errMsg=get metrics failed", + LOGGER.error("method=checkTopicUnderReplicatedPartition||param={}||config={}||result={}||errMsg=get metrics failed", param, singleConfig, countResult); return null; } - checkResult.setPassed(countResult.getData() >= singleConfig.getDetectedTimes()? 0: 1); + checkResult.setPassed(countResult.getData() >= singleConfig.getDetectedTimes()? Constant.NO: Constant.YES); return checkResult; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java index 45e6e0b8..f79af127 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java @@ -6,11 +6,9 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthAmountRatioConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.zookeeper.ZookeeperParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; @@ -22,26 +20,23 @@ import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.ZookeeperUtils; -import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ZookeeperMetricVersionItems; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; @Service public class HealthCheckZookeeperService extends AbstractHealthCheckService { private static final ILog log = LogFactory.getLog(HealthCheckZookeeperService.class); - @Autowired - private ClusterPhyService clusterPhyService; - @Autowired private ZookeeperService zookeeperService; @@ -60,22 +55,24 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { @Override public List getResList(Long clusterPhyId) { - ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); if (clusterPhy == null) { return new ArrayList<>(); } try { - return Arrays.asList(new ZookeeperParam( - clusterPhyId, - ZookeeperUtils.connectStringParser(clusterPhy.getZookeeper()), - ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class) - )); + return Collections.singletonList( + new ZookeeperParam( + clusterPhyId, + ZookeeperUtils.connectStringParser(clusterPhy.getZookeeper()), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class) + ) + ); } catch (Exception e) { - log.error("class=HealthCheckZookeeperService||method=getResList||clusterPhyId={}||errMsg=exception!", clusterPhyId, e); + log.error("method=getResList||clusterPhyId={}||errMsg=exception!", clusterPhyId, e); } - return new ArrayList<>(); + return Collections.emptyList(); } @Override @@ -85,7 +82,6 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { private HealthCheckResult checkBrainSplit(Tuple singleConfigSimpleTuple) { ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); - HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2(); List infoList = zookeeperService.listFromDBByCluster(param.getClusterPhyId()); HealthCheckResult checkResult = new HealthCheckResult( @@ -97,7 +93,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { long value = infoList.stream().filter(elem -> ZKRoleEnum.LEADER.getRole().equals(elem.getRole())).count(); - checkResult.setPassed(value == valueConfig.getValue().longValue() ? Constant.YES : Constant.NO); + checkResult.setPassed(value == 1 ? Constant.YES : Constant.NO); return checkResult; } @@ -116,7 +112,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { ); if (metricsResult.failed() || !metricsResult.hasData()) { log.error( - "class=HealthCheckZookeeperService||method=checkOutstandingRequests||clusterPhyId={}||param={}||config={}||result={}||errMsg=get metrics failed",clusterPhyId ,param, valueConfig, metricsResult + "method=checkOutstandingRequests||clusterPhyId={}||param={}||config={}||result={}||errMsg=get metrics failed",clusterPhyId ,param, valueConfig, metricsResult ); return null; } @@ -130,14 +126,14 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { Float value = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS); if(null == value){ - log.error("class=HealthCheckZookeeperService||method=checkOutstandingRequests||clusterPhyId={}|| errMsg=get OutstandingRequests metric failed, may be collect failed or zk mntr command not in whitelist.", clusterPhyId); + log.error("method=checkOutstandingRequests||clusterPhyId={}|| errMsg=get OutstandingRequests metric failed, may be collect failed or zk mntr command not in whitelist.", clusterPhyId); return null; } Integer amount = valueConfig.getAmount(); Double ratio = valueConfig.getRatio(); if (null == amount || null == ratio) { - log.error("class=HealthCheckZookeeperService||method=checkOutstandingRequests||clusterPhyId={}||result={}||errMsg=get valueConfig amount/ratio config failed", clusterPhyId,valueConfig); + log.error("method=checkOutstandingRequests||clusterPhyId={}||result={}||errMsg=get valueConfig amount/ratio config failed", clusterPhyId,valueConfig); return null; } @@ -163,7 +159,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { if (metricsResult.failed() || !metricsResult.hasData()) { log.error( - "class=HealthCheckZookeeperService||method=checkWatchCount||param={}||config={}||result={}||errMsg=get metrics failed", + "method=checkWatchCount||param={}||config={}||result={}||errMsg=get metrics failed", param, valueConfig, metricsResult ); return null; @@ -199,7 +195,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { if (metricsResult.failed() || !metricsResult.hasData()) { log.error( - "class=HealthCheckZookeeperService||method=checkAliveConnections||param={}||config={}||result={}||errMsg=get metrics failed", + "method=checkAliveConnections||param={}||config={}||result={}||errMsg=get metrics failed", param, valueConfig, metricsResult ); return null; @@ -235,7 +231,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { if (metricsResult.failed() || !metricsResult.hasData()) { log.error( - "class=HealthCheckZookeeperService||method=checkApproximateDataSize||param={}||config={}||result={}||errMsg=get metrics failed", + "method=checkApproximateDataSize||param={}||config={}||result={}||errMsg=get metrics failed", param, valueConfig, metricsResult ); return null; @@ -271,7 +267,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { if (metricsResult.failed() || !metricsResult.hasData()) { log.error( - "class=HealthCheckZookeeperService||method=checkSentRate||param={}||config={}||result={}||errMsg=get metrics failed", + "method=checkSentRate||param={}||config={}||result={}||errMsg=get metrics failed", param, valueConfig, metricsResult ); return null; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java index 6aaddcdb..66a48904 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java @@ -1,25 +1,27 @@ package com.xiaojukeji.know.streaming.km.core.service.health.checkresult; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import java.util.Date; import java.util.List; import java.util.Map; public interface HealthCheckResultService { - int replace(HealthCheckResult healthCheckResult); + List getHealthCheckAggResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum, String resNme); + List getHealthCheckAggResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum); - int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime); + List listAll(); + List listCheckResult(Long clusterPhyId); + List listCheckResult(Long clusterPhyId, Integer resDimension); + List listCheckResult(Long clusterPhyId, Integer resDimension, String resNme); - List getClusterHealthCheckResult(Long clusterPhyId); - - List getClusterResourcesHealthCheckResult(Long clusterPhyId, Integer resDimension); - - List getResHealthCheckResult(Long clusterPhyId, Integer dimension, String resNme); + List listCheckResultFromCache(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum); + List listCheckResultFromCache(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum, String resNme); Map getClusterHealthConfig(Long clusterPhyId); - void batchReplace(Long clusterPhyId, List healthCheckResults); + void batchReplace(Long clusterPhyId, Integer dimension, List healthCheckResults); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java index cad2f396..796ef2d8 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java @@ -5,13 +5,16 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.google.common.collect.Lists; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.po.config.PlatformClusterConfigPO; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.config.PlatformClusterConfigService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.persistence.mysql.health.HealthCheckResultDAO; @@ -22,7 +25,7 @@ import java.util.*; @Service public class HealthCheckResultServiceImpl implements HealthCheckResultService { - private static final ILog log = LogFactory.getLog(HealthCheckResultServiceImpl.class); + private static final ILog LOGGER = LogFactory.getLog(HealthCheckResultServiceImpl.class); @Autowired private HealthCheckResultDAO healthCheckResultDAO; @@ -31,42 +34,71 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService { private PlatformClusterConfigService platformClusterConfigService; @Override - public int replace(HealthCheckResult healthCheckResult) { - return healthCheckResultDAO.replace(ConvertUtil.obj2Obj(healthCheckResult, HealthCheckResultPO.class)); + public List getHealthCheckAggResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum, String resNme) { + List poList = this.listCheckResultFromCache(clusterPhyId, dimensionEnum, resNme); + + return this.convert2HealthCheckAggResultList(poList, dimensionEnum.getDimension()); } @Override - public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) { - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(HealthCheckResultPO::getClusterPhyId, clusterPhyId); - lambdaQueryWrapper.le(HealthCheckResultPO::getUpdateTime, beforeTime); - return healthCheckResultDAO.delete(lambdaQueryWrapper); + public List getHealthCheckAggResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) { + List poList = this.listCheckResultFromCache(clusterPhyId, dimensionEnum); + + return this.convert2HealthCheckAggResultList(poList, dimensionEnum.getDimension()); } @Override - public List getClusterHealthCheckResult(Long clusterPhyId) { + public List listAll() { + return healthCheckResultDAO.selectList(null); + } + + @Override + public List listCheckResult(Long clusterPhyId) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(HealthCheckResultPO::getClusterPhyId, clusterPhyId); + return healthCheckResultDAO.selectList(lambdaQueryWrapper); } @Override - public List getClusterResourcesHealthCheckResult(Long clusterPhyId, Integer resDimension) { + public List listCheckResult(Long clusterPhyId, Integer resDimension) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(HealthCheckResultPO::getDimension, resDimension); lambdaQueryWrapper.eq(HealthCheckResultPO::getClusterPhyId, clusterPhyId); + return healthCheckResultDAO.selectList(lambdaQueryWrapper); } @Override - public List getResHealthCheckResult(Long clusterPhyId, Integer resDimension, String resNme) { + public List listCheckResult(Long clusterPhyId, Integer resDimension, String resNme) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(HealthCheckResultPO::getDimension, resDimension); lambdaQueryWrapper.eq(HealthCheckResultPO::getClusterPhyId, clusterPhyId); lambdaQueryWrapper.eq(HealthCheckResultPO::getResName, resNme); + return healthCheckResultDAO.selectList(lambdaQueryWrapper); } + @Override + public List listCheckResultFromCache(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) { + Map> poMap = DataBaseDataLocalCache.getHealthCheckResults(clusterPhyId, dimensionEnum); + if (poMap != null) { + return poMap.values().stream().collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll); + } + + return new ArrayList<>(); + } + + @Override + public List listCheckResultFromCache(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum, String resNme) { + Map> poMap = DataBaseDataLocalCache.getHealthCheckResults(clusterPhyId, dimensionEnum); + if (poMap != null) { + return poMap.getOrDefault(resNme, new ArrayList<>()); + } + + return new ArrayList<>(); + } + @Override public Map getClusterHealthConfig(Long clusterPhyId) { Map configPOMap = platformClusterConfigService.getByClusterAndGroupWithoutDefault(clusterPhyId, ConfigGroupEnum.HEALTH.name()); @@ -76,7 +108,7 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService { try { HealthCheckNameEnum nameEnum = HealthCheckNameEnum.getByName(po.getValueName()); if (HealthCheckNameEnum.UNKNOWN.equals(nameEnum)) { - log.warn("method=getClusterHealthConfig||config={}||errMsg=config name illegal", po); + LOGGER.warn("method=getClusterHealthConfig||config={}||errMsg=config name illegal", po); continue; } @@ -85,22 +117,37 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService { healthConfig.setClusterPhyId(clusterPhyId); configMap.put(po.getValueName(), healthConfig); } catch (Exception e) { - log.error("method=getClusterHealthConfig||config={}||errMsg=exception!", po, e); + LOGGER.error("method=getClusterHealthConfig||config={}||errMsg=exception!", po, e); } } return configMap; } @Override - public void batchReplace(Long clusterPhyId, List healthCheckResults) { + public void batchReplace(Long clusterPhyId, Integer dimension, List healthCheckResults) { List> healthCheckResultPartitions = Lists.partition(healthCheckResults, Constant.PER_BATCH_MAX_VALUE); for (List checkResultPartition : healthCheckResultPartitions) { List healthCheckResultPos = ConvertUtil.list2List(checkResultPartition, HealthCheckResultPO.class); try { healthCheckResultDAO.batchReplace(healthCheckResultPos); } catch (Exception e) { - log.error("method=batchReplace||clusterPhyId={}||checkResultList={}||errMsg=exception!", clusterPhyId, healthCheckResultPos, e); + LOGGER.error("method=batchReplace||clusterPhyId={}||checkResultList={}||errMsg=exception!", clusterPhyId, healthCheckResultPos, e); } } } + + private List convert2HealthCheckAggResultList(List poList, Integer dimensionCode) { + Map /*检查结果列表*/> groupByCheckNamePOMap = new HashMap<>(); + for (HealthCheckResultPO po: poList) { + groupByCheckNamePOMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); + groupByCheckNamePOMap.get(po.getConfigName()).add(po); + } + + List stateList = new ArrayList<>(); + for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimensionCode(dimensionCode)) { + stateList.add(new HealthCheckAggResult(nameEnum, groupByCheckNamePOMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); + } + + return stateList; + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java index 3ef0a82d..35692cb8 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java @@ -9,42 +9,18 @@ import java.util.List; public interface HealthStateService { /** - * 集群健康指标 + * 健康指标 */ ClusterMetrics calClusterHealthMetrics(Long clusterPhyId); - - /** - * 获取Broker健康指标 - */ BrokerMetrics calBrokerHealthMetrics(Long clusterPhyId, Integer brokerId); - - /** - * 获取Topic健康指标 - */ TopicMetrics calTopicHealthMetrics(Long clusterPhyId, String topicName); - - /** - * 获取Group健康指标 - */ GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName); - - /** - * 获取Zookeeper健康指标 - */ ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId); /** * 获取集群健康检查结果 */ List getClusterHealthResult(Long clusterPhyId); - - /** - * 获取集群某个维度健康检查结果 - */ List getDimensionHealthResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum); - - /** - * 获取集群某个资源的健康检查结果 - */ List getResHealthResult(Long clusterPhyId, Integer dimension, String resNme); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java index 8cb44fd4..5669f300 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java @@ -14,22 +14,16 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; -import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems.*; -import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems.BROKER_METRIC_HEALTH_STATE; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*; -import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.GROUP_METRIC_HEALTH_CHECK_TOTAL; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*; -import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.TOPIC_METRIC_HEALTH_CHECK_TOTAL; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ZookeeperMetricVersionItems.*; @@ -49,7 +43,7 @@ public class HealthStateServiceImpl implements HealthStateService { ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); // 集群维度指标 - List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.CLUSTER); + List resultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.CLUSTER); if (ValidateUtils.isEmptyList(resultList)) { metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER, 0.0f); metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER, 0.0f); @@ -98,16 +92,16 @@ public class HealthStateServiceImpl implements HealthStateService { @Override public BrokerMetrics calBrokerHealthMetrics(Long clusterPhyId, Integer brokerId) { - List healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.BROKER.getDimension(), String.valueOf(brokerId)); + List aggResultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.BROKER, String.valueOf(brokerId)); BrokerMetrics metrics = new BrokerMetrics(clusterPhyId, brokerId); - if (ValidateUtils.isEmptyList(healthScoreResultList)) { + if (ValidateUtils.isEmptyList(aggResultList)) { metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension()); metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_PASSED, 0.0f); metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_TOTAL, 0.0f); } else { - metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_PASSED, getHealthCheckResultPassed(healthScoreResultList)); - metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size())); + metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(aggResultList)); + metrics.getMetrics().put(BROKER_METRIC_HEALTH_CHECK_TOTAL, (float)aggResultList.size()); // 计算健康状态 Broker broker = brokerService.getBrokerFromCacheFirst(clusterPhyId, brokerId); @@ -117,7 +111,7 @@ public class HealthStateServiceImpl implements HealthStateService { } else if (!broker.alive()) { metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.DEAD.getDimension()); } else { - metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension()); + metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)this.calHealthState(aggResultList).getDimension()); } } @@ -126,17 +120,17 @@ public class HealthStateServiceImpl implements HealthStateService { @Override public TopicMetrics calTopicHealthMetrics(Long clusterPhyId, String topicName) { - List healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC.getDimension(), topicName); + List aggResultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC, topicName); TopicMetrics metrics = new TopicMetrics(topicName, clusterPhyId,true); - if (ValidateUtils.isEmptyList(healthScoreResultList)) { + if (ValidateUtils.isEmptyList(aggResultList)) { metrics.getMetrics().put(TOPIC_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension()); metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_PASSED, 0.0f); metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_TOTAL, 0.0f); } else { - metrics.getMetrics().put(TOPIC_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension()); - metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckResultPassed(healthScoreResultList)); - metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size())); + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_STATE, (float)this.calHealthState(aggResultList).getDimension()); + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(aggResultList)); + metrics.getMetrics().put(TOPIC_METRIC_HEALTH_CHECK_TOTAL, (float)aggResultList.size()); } return metrics; @@ -144,17 +138,17 @@ public class HealthStateServiceImpl implements HealthStateService { @Override public GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName) { - List healthScoreResultList = this.getResHealthResult(clusterPhyId, HealthCheckDimensionEnum.GROUP.getDimension(), groupName); + List aggResultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.GROUP, groupName); GroupMetrics metrics = new GroupMetrics(clusterPhyId, groupName, true); - if (ValidateUtils.isEmptyList(healthScoreResultList)) { + if (ValidateUtils.isEmptyList(aggResultList)) { metrics.getMetrics().put(GROUP_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension()); metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_PASSED, 0.0f); metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_TOTAL, 0.0f); } else { - metrics.getMetrics().put(GROUP_METRIC_HEALTH_STATE, (float)this.calHealthScoreResultState(healthScoreResultList).getDimension()); - metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_PASSED, getHealthCheckResultPassed(healthScoreResultList)); - metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_TOTAL, Float.valueOf(healthScoreResultList.size())); + metrics.getMetrics().put(GROUP_METRIC_HEALTH_STATE, (float)this.calHealthState(aggResultList).getDimension()); + metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(aggResultList)); + metrics.getMetrics().put(GROUP_METRIC_HEALTH_CHECK_TOTAL, (float)aggResultList.size()); } return metrics; @@ -162,15 +156,15 @@ public class HealthStateServiceImpl implements HealthStateService { @Override public ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId) { - List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.ZOOKEEPER); + List aggResultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.ZOOKEEPER); ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId); - if (ValidateUtils.isEmptyList(resultList)) { + if (ValidateUtils.isEmptyList(aggResultList)) { metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, 0.0f); metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL, 0.0f); } else { - metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(resultList)); - metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL, (float)resultList.size()); + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(aggResultList)); + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL, (float)aggResultList.size()); } if (zookeeperService.allServerDown(clusterPhyId)) { @@ -186,88 +180,29 @@ public class HealthStateServiceImpl implements HealthStateService { } // 服务未挂时,依据检查结果计算状态 - metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)this.calHealthState(resultList).getDimension()); + metrics.getMetrics().put(ZOOKEEPER_METRIC_HEALTH_STATE, (float)this.calHealthState(aggResultList).getDimension()); return metrics; } @Override public List getClusterHealthResult(Long clusterPhyId) { - List poList = healthCheckResultService.getClusterHealthCheckResult(clusterPhyId); + List poList = healthCheckResultService.listCheckResult(clusterPhyId); - // <检查项,<检查结果>> - Map> checkResultMap = new HashMap<>(); - for (HealthCheckResultPO po: poList) { - checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); - checkResultMap.get(po.getConfigName()).add(po); - } - - Map configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId); - - List healthScoreResultList = new ArrayList<>(); - for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.values()) { - BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName()); - if (baseConfig == null) { - continue; - } - - healthScoreResultList.add(new HealthScoreResult( - nameEnum, - baseConfig, - checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>())) - ); - } - - return healthScoreResultList; + return this.convert2HealthScoreResultList(clusterPhyId, poList, null); } @Override public List getDimensionHealthResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) { - List poList = healthCheckResultService.getClusterResourcesHealthCheckResult(clusterPhyId, dimensionEnum.getDimension()); + List poList = healthCheckResultService.listCheckResult(clusterPhyId, dimensionEnum.getDimension()); - // <检查项,<通过的数量,不通过的数量>> - Map> checkResultMap = new HashMap<>(); - for (HealthCheckResultPO po: poList) { - checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); - checkResultMap.get(po.getConfigName()).add(po); - } - - Map configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId); - - List healthScoreResultList = new ArrayList<>(); - for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimension(dimensionEnum)) { - BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName()); - if (baseConfig == null) { - continue; - } - - healthScoreResultList.add(new HealthScoreResult(nameEnum, baseConfig, checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); - } - - return healthScoreResultList; + return this.convert2HealthScoreResultList(clusterPhyId, poList, dimensionEnum.getDimension()); } @Override public List getResHealthResult(Long clusterPhyId, Integer dimension, String resNme) { - List poList = healthCheckResultService.getResHealthCheckResult(clusterPhyId, dimension, resNme); - Map> checkResultMap = new HashMap<>(); - for (HealthCheckResultPO po: poList) { - checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); - checkResultMap.get(po.getConfigName()).add(po); - } + List poList = healthCheckResultService.listCheckResult(clusterPhyId, dimension, resNme); - Map configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId); - - List healthScoreResultList = new ArrayList<>(); - for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimensionCode(dimension)) { - BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName()); - if (baseConfig == null) { - continue; - } - - healthScoreResultList.add(new HealthScoreResult(nameEnum, baseConfig, checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); - } - - return healthScoreResultList; + return this.convert2HealthScoreResultList(clusterPhyId, poList, dimension); } @@ -275,7 +210,7 @@ public class HealthStateServiceImpl implements HealthStateService { private ClusterMetrics calClusterTopicsHealthMetrics(Long clusterPhyId) { - List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC); + List resultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.TOPIC); ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); if (ValidateUtils.isEmptyList(resultList)) { @@ -292,7 +227,7 @@ public class HealthStateServiceImpl implements HealthStateService { } private ClusterMetrics calClusterGroupsHealthMetrics(Long clusterPhyId) { - List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.GROUP); + List resultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.GROUP); ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); if (ValidateUtils.isEmptyList(resultList)) { @@ -309,7 +244,7 @@ public class HealthStateServiceImpl implements HealthStateService { } private ClusterMetrics calClusterBrokersHealthMetrics(Long clusterPhyId) { - List resultList = this.getDimensionHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.BROKER); + List resultList = healthCheckResultService.getHealthCheckAggResult(clusterPhyId, HealthCheckDimensionEnum.BROKER); ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); if (ValidateUtils.isEmptyList(resultList)) { @@ -337,29 +272,45 @@ public class HealthStateServiceImpl implements HealthStateService { return metrics; } - private List getDimensionHealthCheckAggResult(Long clusterPhyId, HealthCheckDimensionEnum dimensionEnum) { - List poList = healthCheckResultService.getClusterResourcesHealthCheckResult(clusterPhyId, dimensionEnum.getDimension()); - Map /*检查结果列表*/> groupByCheckNamePOMap = new HashMap<>(); + /**************************************************** 聚合数据 ****************************************************/ + + public List convert2HealthScoreResultList(Long clusterPhyId, List poList, Integer dimensionCode) { + Map> checkResultMap = new HashMap<>(); for (HealthCheckResultPO po: poList) { - groupByCheckNamePOMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); - groupByCheckNamePOMap.get(po.getConfigName()).add(po); + checkResultMap.putIfAbsent(po.getConfigName(), new ArrayList<>()); + checkResultMap.get(po.getConfigName()).add(po); } - List stateList = new ArrayList<>(); - for (HealthCheckNameEnum nameEnum: HealthCheckNameEnum.getByDimension(dimensionEnum)) { - stateList.add(new HealthCheckAggResult(nameEnum, groupByCheckNamePOMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); + Map configMap = healthCheckResultService.getClusterHealthConfig(clusterPhyId); + + List nameEnums = + dimensionCode == null? + Arrays.stream(HealthCheckNameEnum.values()).collect(Collectors.toList()): HealthCheckNameEnum.getByDimensionCode(dimensionCode); + + List resultList = new ArrayList<>(); + for (HealthCheckNameEnum nameEnum: nameEnums) { + BaseClusterHealthConfig baseConfig = configMap.get(nameEnum.getConfigName()); + if (baseConfig == null) { + continue; + } + + resultList.add(new HealthScoreResult(nameEnum, baseConfig, checkResultMap.getOrDefault(nameEnum.getConfigName(), new ArrayList<>()))); } - return stateList; + return resultList; } - private float getHealthCheckPassed(List resultList){ - if(ValidateUtils.isEmptyList(resultList)) { + + /**************************************************** 计算指标 ****************************************************/ + + + private float getHealthCheckPassed(List aggResultList){ + if(ValidateUtils.isEmptyList(aggResultList)) { return 0f; } - return Float.valueOf(resultList.stream().filter(elem -> elem.getPassed()).count()); + return Float.valueOf(aggResultList.stream().filter(elem -> elem.getPassed()).count()); } private HealthStateEnum calHealthState(List resultList) { @@ -380,29 +331,4 @@ public class HealthStateServiceImpl implements HealthStateService { return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD; } - - private float getHealthCheckResultPassed(List healthScoreResultList){ - if(CollectionUtils.isEmpty(healthScoreResultList)){return 0f;} - - return Float.valueOf(healthScoreResultList.stream().filter(elem -> elem.getPassed()).count()); - } - - private HealthStateEnum calHealthScoreResultState(List resultList) { - if(ValidateUtils.isEmptyList(resultList)) { - return HealthStateEnum.GOOD; - } - - boolean existNotPassed = false; - for (HealthScoreResult aggResult: resultList) { - if (aggResult.getCheckNameEnum().isAvailableChecker() && !aggResult.getPassed()) { - return HealthStateEnum.POOR; - } - - if (!aggResult.getPassed()) { - existNotPassed = true; - } - } - - return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD; - } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java index f8c2185b..bc367a71 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java @@ -3,30 +3,31 @@ package com.xiaojukeji.know.streaming.km.task.kafka.health; import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; +import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.task.kafka.metrics.AbstractAsyncMetricsDispatchTask; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(AbstractHealthCheckTask.class); + private static final ILog LOGGER = LogFactory.getLog(AbstractHealthCheckTask.class); @Autowired private HealthCheckResultService healthCheckResultService; + @Autowired + private CollectThreadPoolService collectThreadPoolService; + public abstract AbstractHealthCheckService getCheckService(); @Override @@ -38,32 +39,37 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat // 获取配置,<配置名,配置信息> Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service + // 获取资源列表 List paramList = this.getCheckService().getResList(clusterPhy.getId()); + + // 检查结果 + List checkResultList = Collections.synchronizedList(new ArrayList<>()); if (ValidateUtils.isEmptyList(paramList)) { // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap)); + checkResultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap)); } + // 获取合适的线程池 + FutureWaitUtil futureWaitUtil = collectThreadPoolService.selectSuitableFutureUtil(clusterPhy.getId() * 100 + this.getCheckService().getHealthCheckDimensionEnum().getDimension()); + // 遍历资源 for (ClusterParam clusterParam: paramList) { - resultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap)); + futureWaitUtil.runnableTask( + String.format("class=%s||method=calAndUpdateHealthCheckResult||clusterId=%d", this.getCheckService().getClass().getSimpleName(), clusterPhy.getId()), + 30000, + () -> checkResultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap)) + ); } - try { - healthCheckResultService.batchReplace(clusterPhy.getId(), resultList); - } catch (Exception e) { - log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } + futureWaitUtil.waitExecute(30000); - // 删除10分钟之前的检查结果 try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 20 * 60 * 1000)); + healthCheckResultService.batchReplace(clusterPhy.getId(), this.getCheckService().getHealthCheckDimensionEnum().getDimension(), checkResultList); } catch (Exception e) { - log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + LOGGER.error( + "extendClass={}||method=calAndUpdateHealthCheckResult||clusterPhyId={}||errMsg=exception!", + this.getCheckService().getClass().getSimpleName(), clusterPhy.getId(), e + ); } return TaskResult.SUCCESS; diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/TopicHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/TopicHealthCheckTask.java index c3226feb..167c8c57 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/TopicHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/TopicHealthCheckTask.java @@ -12,7 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired; @AllArgsConstructor @Task(name = "TopicHealthCheckTask", description = "Topic健康检查", - cron = "0 0/1 * * * ? *", + cron = "30 0/1 * * * ? *", autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60)