Split the all-in-one HealthCheckTask into one scheduled task per health-check dimension
(Broker, Cluster, Group, Topic).

Review notes for this revision of the patch:
* The removed outer loop used "continue" to skip the per-resource pass when a dimension had no
  resources. Without it, a null resource list (assuming getResList can return one) would reach the
  for-each and throw a NullPointerException, so the per-resource pass now sits in an else-branch
  in all four tasks.
* Generic type arguments that were missing from the declarations are restored.
* Comments on added lines are written in English; context and removed lines are kept as in the tree.
* The "similarity index" and "index" values are carried over unchanged and may be stale.

diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java
similarity index 76%
rename from km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java
rename to km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java
index 6af11e96..7b611823 100644
--- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java
+++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java
@@ -9,36 +9,38 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
-import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
 import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
 import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checker.broker.HealthCheckBrokerService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
 import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
 import lombok.AllArgsConstructor;
 import lombok.NoArgsConstructor;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 
 @NoArgsConstructor
 @AllArgsConstructor
-@Task(name = "HealthCheckTask",
-        description = "健康检查",
+@Task(name = "BrokerHealthCheckTask",
+        description = "Broker健康检查",
         cron = "0 0/1 * * * ? *",
         autoRegister = true,
         consensual = ConsensualEnum.BROADCAST,
         timeout = 2 * 60)
-public class HealthCheckTask extends AbstractAsyncMetricsDispatchTask {
-    private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
+public class BrokerHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
+    private static final ILog log = LogFactory.getLog(BrokerHealthCheckTask.class);
 
     @Autowired
     private HealthCheckResultService healthCheckResultService;
 
-    private final List<AbstractHealthCheckService> healthCheckServiceList = new ArrayList<>(
-            SpringTool.getBeansOfType(AbstractHealthCheckService.class).values()
-    );
+    @Autowired
+    private HealthCheckBrokerService healthCheckBrokerService;
 
     @Override
     public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
@@ -53,25 +55,22 @@ public class HealthCheckTask extends AbstractAsyncMetricsDispatchTask {
         List<HealthCheckResult> resultList = new ArrayList<>();
 
         // 遍历Check-Service
-        for (AbstractHealthCheckService healthCheckService: healthCheckServiceList) {
-            List<ClusterPhyParam> paramList = healthCheckService.getResList(clusterPhy.getId());
-            if (ValidateUtils.isEmptyList(paramList)) {
-                // 当前无该维度的资源,则直接设置为
-                resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckService, healthConfigMap));
-                continue;
-            }
-
-            // 遍历资源
-            for (ClusterPhyParam clusterPhyParam: paramList) {
-                resultList.addAll(this.checkAndGetResult(healthCheckService, clusterPhyParam, healthConfigMap));
-            }
+        List<ClusterPhyParam> paramList = healthCheckBrokerService.getResList(clusterPhy.getId());
+        if (ValidateUtils.isEmptyList(paramList)) {
+            // No resource of this dimension: record a passing placeholder for each configured check
+            resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckBrokerService, healthConfigMap));
+        } else {
+            // Resources exist: run the configured checks against each of them
+            for (ClusterPhyParam clusterPhyParam: paramList) {
+                resultList.addAll(this.checkAndGetResult(healthCheckBrokerService, clusterPhyParam, healthConfigMap));
+            }
         }
 
         for (HealthCheckResult checkResult: resultList) {
             try {
                 healthCheckResultService.replace(checkResult);
             } catch (Exception e) {
-                log.error("method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
+                log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
             }
         }
 
@@ -79,7 +78,7 @@ public class HealthCheckTask extends AbstractAsyncMetricsDispatchTask {
         try {
             healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000));
         } catch (Exception e) {
-            log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
+            log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
         }
 
         return TaskResult.SUCCESS;
diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java
new file mode 100644
index 00000000..cb7f78b2
--- /dev/null
+++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java
@@ -0,0 +1,141 @@
+package com.xiaojukeji.know.streaming.km.task.health;
+
+import com.didiglobal.logi.job.annotation.Task;
+import com.didiglobal.logi.job.common.TaskResult;
+import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
+import com.didiglobal.logi.log.ILog;
+import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import com.xiaojukeji.know.streaming.km.common.constant.Constant;
+import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
+import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
+import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checker.cluster.HealthCheckClusterService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
+import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scheduled task that runs the cluster-dimension health checks of a physical cluster and stores the results.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Task(name = "ClusterHealthCheckTask",
+        description = "Cluster健康检查",
+        cron = "0 0/1 * * * ? *",
+        autoRegister = true,
+        consensual = ConsensualEnum.BROADCAST,
+        timeout = 2 * 60)
+public class ClusterHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
+    private static final ILog log = LogFactory.getLog(ClusterHealthCheckTask.class);
+
+    @Autowired
+    private HealthCheckResultService healthCheckResultService;
+
+    @Autowired
+    private HealthCheckClusterService healthCheckClusterService;
+
+    @Override
+    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
+        return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
+    }
+
+    /**
+     * Runs the configured checks for one cluster, stores every result and purges results older than ten minutes.
+     *
+     * @param clusterPhy        cluster to check
+     * @param triggerTimeUnitMs trigger time in milliseconds, used as the base of the purge cut-off
+     * @return always {@link TaskResult#SUCCESS}; persistence failures are logged and not propagated
+     */
+    private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
+        // Health-check configuration keyed by configuration name
+        Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
+
+        // Collected check results
+        List<HealthCheckResult> resultList = new ArrayList<>();
+
+        List<ClusterPhyParam> paramList = healthCheckClusterService.getResList(clusterPhy.getId());
+        if (ValidateUtils.isEmptyList(paramList)) {
+            // No resource of this dimension: record a passing placeholder for each configured check
+            resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckClusterService, healthConfigMap));
+        } else {
+            // Resources exist: run the configured checks against each of them
+            for (ClusterPhyParam clusterPhyParam: paramList) {
+                resultList.addAll(this.checkAndGetResult(healthCheckClusterService, clusterPhyParam, healthConfigMap));
+            }
+        }
+
+        for (HealthCheckResult checkResult: resultList) {
+            try {
+                healthCheckResultService.replace(checkResult);
+            } catch (Exception e) {
+                log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
+            }
+        }
+
+        // Purge check results older than ten minutes
+        try {
+            healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000));
+        } catch (Exception e) {
+            log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
+        }
+
+        return TaskResult.SUCCESS;
+    }
+
+    /**
+     * Builds a passing placeholder result for every configured check whose dimension matches the given service.
+     */
+    private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
+        List<HealthCheckResult> resultList = new ArrayList<>();
+        HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum();
+
+        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
+            if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) {
+                // Check belongs to another dimension
+                continue;
+            }
+
+            HealthCheckResult checkResult = new HealthCheckResult(
+                    dimensionEnum.getDimension(),
+                    clusterHealthConfig.getCheckNameEnum().getConfigName(),
+                    clusterPhyId,
+                    "-1"
+            );
+            checkResult.setPassed(Constant.YES);
+            resultList.add(checkResult);
+        }
+
+        return resultList;
+    }
+
+    /**
+     * Runs every configured check against one resource and collects the non-null results.
+     */
+    private List<HealthCheckResult> checkAndGetResult(AbstractHealthCheckService healthCheckService,
+                                                      ClusterPhyParam clusterPhyParam,
+                                                      Map<String, BaseClusterHealthConfig> healthConfigMap) {
+        List<HealthCheckResult> resultList = new ArrayList<>();
+
+        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
+            HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig);
+            if (healthCheckResult == null) {
+                continue;
+            }
+
+            resultList.add(healthCheckResult);
+        }
+
+        return resultList;
+    }
+}
diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java
new file mode 100644
index 00000000..581a679a
--- /dev/null
+++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java
@@ -0,0 +1,141 @@
+package com.xiaojukeji.know.streaming.km.task.health;
+
+import com.didiglobal.logi.job.annotation.Task;
+import com.didiglobal.logi.job.common.TaskResult;
+import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
+import com.didiglobal.logi.log.ILog;
+import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import com.xiaojukeji.know.streaming.km.common.constant.Constant;
+import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
+import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
+import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checker.group.HealthCheckGroupService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
+import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scheduled task that runs the group-dimension health checks of a physical cluster and stores the results.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Task(name = "GroupHealthCheckTask",
+        description = "Group健康检查",
+        cron = "0 0/1 * * * ? *",
+        autoRegister = true,
+        consensual = ConsensualEnum.BROADCAST,
+        timeout = 2 * 60)
+public class GroupHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
+    private static final ILog log = LogFactory.getLog(GroupHealthCheckTask.class);
+
+    @Autowired
+    private HealthCheckResultService healthCheckResultService;
+
+    @Autowired
+    private HealthCheckGroupService healthCheckGroupService;
+
+    @Override
+    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
+        return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
+    }
+
+    /**
+     * Runs the configured checks for one cluster, stores every result and purges results older than ten minutes.
+     *
+     * @param clusterPhy        cluster to check
+     * @param triggerTimeUnitMs trigger time in milliseconds, used as the base of the purge cut-off
+     * @return always {@link TaskResult#SUCCESS}; persistence failures are logged and not propagated
+     */
+    private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
+        // Health-check configuration keyed by configuration name
+        Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
+
+        // Collected check results
+        List<HealthCheckResult> resultList = new ArrayList<>();
+
+        List<ClusterPhyParam> paramList = healthCheckGroupService.getResList(clusterPhy.getId());
+        if (ValidateUtils.isEmptyList(paramList)) {
+            // No resource of this dimension: record a passing placeholder for each configured check
+            resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckGroupService, healthConfigMap));
+        } else {
+            // Resources exist: run the configured checks against each of them
+            for (ClusterPhyParam clusterPhyParam: paramList) {
+                resultList.addAll(this.checkAndGetResult(healthCheckGroupService, clusterPhyParam, healthConfigMap));
+            }
+        }
+
+        for (HealthCheckResult checkResult: resultList) {
+            try {
+                healthCheckResultService.replace(checkResult);
+            } catch (Exception e) {
+                log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
+            }
+        }
+
+        // Purge check results older than ten minutes
+        try {
+            healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000));
+        } catch (Exception e) {
+            log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
+        }
+
+        return TaskResult.SUCCESS;
+    }
+
+    /**
+     * Builds a passing placeholder result for every configured check whose dimension matches the given service.
+     */
+    private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
+        List<HealthCheckResult> resultList = new ArrayList<>();
+        HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum();
+
+        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
+            if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) {
+                // Check belongs to another dimension
+                continue;
+            }
+
+            HealthCheckResult checkResult = new HealthCheckResult(
+                    dimensionEnum.getDimension(),
+                    clusterHealthConfig.getCheckNameEnum().getConfigName(),
+                    clusterPhyId,
+                    "-1"
+            );
+            checkResult.setPassed(Constant.YES);
+            resultList.add(checkResult);
+        }
+
+        return resultList;
+    }
+
+    /**
+     * Runs every configured check against one resource and collects the non-null results.
+     */
+    private List<HealthCheckResult> checkAndGetResult(AbstractHealthCheckService healthCheckService,
+                                                      ClusterPhyParam clusterPhyParam,
+                                                      Map<String, BaseClusterHealthConfig> healthConfigMap) {
+        List<HealthCheckResult> resultList = new ArrayList<>();
+
+        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
+            HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig);
+            if (healthCheckResult == null) {
+                continue;
+            }
+
+            resultList.add(healthCheckResult);
+        }
+
+        return resultList;
+    }
+}
diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java
new file mode 100644
index 00000000..8badae99
--- /dev/null
+++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java
@@ -0,0 +1,141 @@
+package com.xiaojukeji.know.streaming.km.task.health;
+
+import com.didiglobal.logi.job.annotation.Task;
+import com.didiglobal.logi.job.common.TaskResult;
+import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
+import com.didiglobal.logi.log.ILog;
+import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import com.xiaojukeji.know.streaming.km.common.constant.Constant;
+import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
+import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
+import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checker.topic.HealthCheckTopicService;
+import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
+import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scheduled task that runs the topic-dimension health checks of a physical cluster and stores the results.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Task(name = "TopicHealthCheckTask",
+        description = "Topic健康检查",
+        cron = "0 0/1 * * * ? *",
+        autoRegister = true,
+        consensual = ConsensualEnum.BROADCAST,
+        timeout = 2 * 60)
+public class TopicHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
+    private static final ILog log = LogFactory.getLog(TopicHealthCheckTask.class);
+
+    @Autowired
+    private HealthCheckResultService healthCheckResultService;
+
+    @Autowired
+    private HealthCheckTopicService healthCheckTopicService;
+
+    @Override
+    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
+        return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
+    }
+
+    /**
+     * Runs the configured checks for one cluster, stores every result and purges results older than ten minutes.
+     *
+     * @param clusterPhy        cluster to check
+     * @param triggerTimeUnitMs trigger time in milliseconds, used as the base of the purge cut-off
+     * @return always {@link TaskResult#SUCCESS}; persistence failures are logged and not propagated
+     */
+    private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
+        // Health-check configuration keyed by configuration name
+        Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
+
+        // Collected check results
+        List<HealthCheckResult> resultList = new ArrayList<>();
+
+        List<ClusterPhyParam> paramList = healthCheckTopicService.getResList(clusterPhy.getId());
+        if (ValidateUtils.isEmptyList(paramList)) {
+            // No resource of this dimension: record a passing placeholder for each configured check
+            resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckTopicService, healthConfigMap));
+        } else {
+            // Resources exist: run the configured checks against each of them
+            for (ClusterPhyParam clusterPhyParam: paramList) {
+                resultList.addAll(this.checkAndGetResult(healthCheckTopicService, clusterPhyParam, healthConfigMap));
+            }
+        }
+
+        for (HealthCheckResult checkResult: resultList) {
+            try {
+                healthCheckResultService.replace(checkResult);
+            } catch (Exception e) {
+                log.error("class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
+            }
+        }
+
+        // Purge check results older than ten minutes
+        try {
+            healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000));
+        } catch (Exception e) {
+            log.error("class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
+        }
+
+        return TaskResult.SUCCESS;
+    }
+
+    /**
+     * Builds a passing placeholder result for every configured check whose dimension matches the given service.
+     */
+    private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
+        List<HealthCheckResult> resultList = new ArrayList<>();
+        HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum();
+
+        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
+            if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) {
+                // Check belongs to another dimension
+                continue;
+            }
+
+            HealthCheckResult checkResult = new HealthCheckResult(
+                    dimensionEnum.getDimension(),
+                    clusterHealthConfig.getCheckNameEnum().getConfigName(),
+                    clusterPhyId,
+                    "-1"
+            );
+            checkResult.setPassed(Constant.YES);
+            resultList.add(checkResult);
+        }
+
+        return resultList;
+    }
+
+    /**
+     * Runs every configured check against one resource and collects the non-null results.
+     */
+    private List<HealthCheckResult> checkAndGetResult(AbstractHealthCheckService healthCheckService,
+                                                      ClusterPhyParam clusterPhyParam,
+                                                      Map<String, BaseClusterHealthConfig> healthConfigMap) {
+        List<HealthCheckResult> resultList = new ArrayList<>();
+
+        for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
+            HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig);
+            if (healthCheckResult == null) {
+                continue;
+            }
+
+            resultList.add(healthCheckResult);
+        }
+
+        return resultList;
+    }
+}