From 63fbe728c45bf976e11999a56c33f85539107634 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 11:11:25 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ZK=E6=8C=87=E6=A0=87?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E6=99=AE=E7=BD=97=E7=B1=B3=E4=BF=AE=E6=96=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../component/AbstractMonitorSinkService.java | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java index b4fd1986..b2ca9283 100644 --- a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java +++ b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java @@ -37,29 +37,32 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< @Override public void onApplicationEvent(BaseMetricEvent event) { executor.execute( () -> { - if(event instanceof BrokerMetricEvent){ + if (event instanceof BrokerMetricEvent) { BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event; sinkMetrics(brokerMetric2SinkPoint(brokerMetricEvent.getBrokerMetrics())); - }else if(event instanceof ClusterMetricEvent){ + } else if(event instanceof ClusterMetricEvent) { ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event; sinkMetrics(clusterMetric2SinkPoint(clusterMetricEvent.getClusterMetrics())); - }else if(event instanceof TopicMetricEvent){ + } else if(event instanceof TopicMetricEvent) { TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event; sinkMetrics(topicMetric2SinkPoint(topicMetricEvent.getTopicMetrics())); - }else if(event instanceof PartitionMetricEvent){ + } else if(event instanceof PartitionMetricEvent) { PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event; sinkMetrics(partitionMetric2SinkPoint(partitionMetricEvent.getPartitionMetrics())); - }else if(event instanceof GroupMetricEvent){ + } else if(event instanceof GroupMetricEvent) { GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event; sinkMetrics(groupMetric2SinkPoint(groupMetricEvent.getGroupMetrics())); - }else if(event instanceof ReplicaMetricEvent){ + } else if(event instanceof ReplicaMetricEvent) { ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event; sinkMetrics(replicationMetric2SinkPoint(replicaMetricEvent.getReplicationMetrics())); + } else if(event instanceof ZookeeperMetricEvent) { + ZookeeperMetricEvent zookeeperMetricEvent = (ZookeeperMetricEvent)event; + sinkMetrics(zookeeperMetric2SinkPoint(zookeeperMetricEvent.getZookeeperMetrics())); } } ); } @@ -72,6 +75,7 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< public abstract Boolean sinkMetrics(List pointList); /**************************************************** private method ****************************************************/ + private List brokerMetric2SinkPoint(List brokerMetrics){ List pointList = new ArrayList<>(); @@ -161,8 +165,23 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< return pointList; } - private List genSinkPoint(String metricPre, Map metrics, - long timeStamp, Map tagsMap){ + private List zookeeperMetric2SinkPoint(List zookeeperMetricsList){ + List pointList = new ArrayList<>(); + + for(ZookeeperMetrics z : zookeeperMetricsList){ + Map tagsMap = new HashMap<>(); + tagsMap.put(CLUSTER_ID.getName(), z.getClusterPhyId()); + + pointList.addAll(genSinkPoint("Zookeeper", z.getMetrics(), z.getTimestamp(), tagsMap)); + } + + return pointList; + } + + private List genSinkPoint(String metricPre, + Map metrics, + long timeStamp, + Map tagsMap) { List pointList = new ArrayList<>(); for(String metricName : metrics.keySet()){ From 177bb80f3120e8e2c5dccd8a2e16a0bca83c84e2 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 16:36:04 +0800 Subject: [PATCH 2/6] =?UTF-8?q?application.yml=E6=96=87=E4=BB=B6=E4=B8=AD?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0ES=E7=94=A8=E6=88=B7=E5=90=8D=E5=AF=86?= =?UTF-8?q?=E7=A0=81=E7=9A=84=E9=85=8D=E7=BD=AE=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- km-rest/src/main/resources/application.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index 08cac4af..4a4b7f1c 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -84,7 +84,8 @@ client-pool: es: client: address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061 - client-cnt: 10 + pass: # ES账号密码,如果有账号密码,按照 username:password 的格式填写,没有则不需要填写 + client-cnt: 10 # 创建的ES客户端数 io-thread-cnt: 2 max-retry-cnt: 5 From 2b600e96eba64a95cd136afc24fcaa3d7cb09a8d Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 16:41:27 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/health/AbstractHealthCheckTask.java | 115 ++++++++++++++++++ .../km/task/health/BrokerHealthCheckTask.java | 109 +---------------- .../task/health/ClusterHealthCheckTask.java | 109 +---------------- .../km/task/health/GroupHealthCheckTask.java | 107 +--------------- .../km/task/health/TopicHealthCheckTask.java | 108 +--------------- 5 files changed, 127 insertions(+), 421 deletions(-) create mode 100644 km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java new file mode 100644 index 00000000..1f6c83ff --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java @@ -0,0 +1,115 @@ +package com.xiaojukeji.know.streaming.km.task.health; + +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; +import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; +import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispatchTask { + private static final ILog log = LogFactory.getLog(AbstractHealthCheckTask.class); + + @Autowired + private HealthCheckResultService healthCheckResultService; + + public abstract AbstractHealthCheckService getCheckService(); + + @Override + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); + } + + private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + // 获取配置,<配置名,配置信息> + Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); + + // 检查结果 + List resultList = new ArrayList<>(); + + // 遍历Check-Service + List paramList = this.getCheckService().getResList(clusterPhy.getId()); + if (ValidateUtils.isEmptyList(paramList)) { + // 当前无该维度的资源,则直接设置为 + resultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap)); + } + + // 遍历资源 + for (ClusterPhyParam clusterPhyParam: paramList) { + resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap)); + } + + for (HealthCheckResult checkResult: resultList) { + try { + healthCheckResultService.replace(checkResult); + } catch (Exception e) { + log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); + } + } + + // 删除10分钟之前的检查结果 + try { + healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); + } catch (Exception e) { + log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + } + + return TaskResult.SUCCESS; + } + + private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { + List resultList = new ArrayList<>(); + + // 进行检查 + for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { + HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); + if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { + // 类型不匹配 + continue; + } + + // 记录 + HealthCheckResult checkResult = new HealthCheckResult( + dimensionEnum.getDimension(), + clusterHealthConfig.getCheckNameEnum().getConfigName(), + clusterPhyId, + "-1" + ); + checkResult.setPassed(Constant.YES); + resultList.add(checkResult); + } + + return resultList; + } + + private List checkAndGetResult(ClusterPhyParam clusterPhyParam, + Map healthConfigMap) { + List resultList = new ArrayList<>(); + + // 进行检查 + for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { + HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterPhyParam, clusterHealthConfig); + if (healthCheckResult == null) { + continue; + } + + // 记录 + resultList.add(healthCheckResult); + } + + return resultList; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java index 7b611823..ef02be8e 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/BrokerHealthCheckTask.java @@ -1,30 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.broker.HealthCheckBrokerService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; - @NoArgsConstructor @AllArgsConstructor @Task(name = "BrokerHealthCheckTask", @@ -33,98 +16,12 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class BrokerHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(BrokerHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; - +public class BrokerHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckBrokerService healthCheckBrokerService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckBrokerService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckBrokerService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckBrokerService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=BrokerHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckBrokerService; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java index cb7f78b2..43c16cb8 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/ClusterHealthCheckTask.java @@ -1,30 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.cluster.HealthCheckClusterService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; - @NoArgsConstructor @AllArgsConstructor @Task(name = "ClusterHealthCheckTask", @@ -33,98 +16,12 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class ClusterHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(ClusterHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; - +public class ClusterHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckClusterService healthCheckClusterService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckClusterService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckClusterService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckClusterService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=ClusterHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckClusterService; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java index 581a679a..d24f981d 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/GroupHealthCheckTask.java @@ -1,29 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.group.HealthCheckGroupService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; @NoArgsConstructor @AllArgsConstructor @@ -33,98 +17,13 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class GroupHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(GroupHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; +public class GroupHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckGroupService healthCheckGroupService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckGroupService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckGroupService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckGroupService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=GroupHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckGroupService; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java index 8badae99..25a1e531 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/TopicHealthCheckTask.java @@ -1,30 +1,13 @@ package com.xiaojukeji.know.streaming.km.task.health; import com.didiglobal.logi.job.annotation.Task; -import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.topic.HealthCheckTopicService; -import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; -import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; - @NoArgsConstructor @AllArgsConstructor @Task(name = "TopicHealthCheckTask", @@ -33,98 +16,13 @@ import java.util.Map; autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) -public class TopicHealthCheckTask extends AbstractAsyncMetricsDispatchTask { - private static final ILog log = LogFactory.getLog(TopicHealthCheckTask.class); - - @Autowired - private HealthCheckResultService healthCheckResultService; +public class TopicHealthCheckTask extends AbstractHealthCheckTask { @Autowired private HealthCheckTopicService healthCheckTopicService; @Override - public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs); - } - - private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - // 获取配置,<配置名,配置信息> - Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); - - // 检查结果 - List resultList = new ArrayList<>(); - - // 遍历Check-Service - List paramList = healthCheckTopicService.getResList(clusterPhy.getId()); - if (ValidateUtils.isEmptyList(paramList)) { - // 当前无该维度的资源,则直接设置为 - resultList.addAll(this.getNoResResult(clusterPhy.getId(), healthCheckTopicService, healthConfigMap)); - } - - // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(healthCheckTopicService, clusterPhyParam, healthConfigMap)); - } - - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } - } - - // 删除10分钟之前的检查结果 - try { - healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 10 * 60 * 1000)); - } catch (Exception e) { - log.error("class=TopicHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); - } - - return TaskResult.SUCCESS; - } - - private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckDimensionEnum dimensionEnum = healthCheckService.getHealthCheckDimensionEnum(); - if (!clusterHealthConfig.getCheckNameEnum().getDimensionEnum().equals(dimensionEnum)) { - // 类型不匹配 - continue; - } - - // 记录 - HealthCheckResult checkResult = new HealthCheckResult( - dimensionEnum.getDimension(), - clusterHealthConfig.getCheckNameEnum().getConfigName(), - clusterPhyId, - "-1" - ); - checkResult.setPassed(Constant.YES); - resultList.add(checkResult); - } - - return resultList; - } - - private List checkAndGetResult(AbstractHealthCheckService healthCheckService, - ClusterPhyParam clusterPhyParam, - Map healthConfigMap) { - List resultList = new ArrayList<>(); - - // 进行检查 - for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = healthCheckService.checkAndGetResult(clusterPhyParam, clusterHealthConfig); - if (healthCheckResult == null) { - continue; - } - - // 记录 - resultList.add(healthCheckResult); - } - - return resultList; + public AbstractHealthCheckService getCheckService() { + return healthCheckTopicService; } } From a6dcbcd35b1f30ccbd3ce55fe4171f7d97ababc6 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 16:43:16 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=9C=AA=E8=A2=AB?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E7=9A=84import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../know/streaming/km/rest/api/v3/version/VersionController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/version/VersionController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/version/VersionController.java index 52cc4807..f8e00430 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/version/VersionController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/version/VersionController.java @@ -15,7 +15,6 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; -import javax.validation.Valid; import java.util.List; import java.util.Map; import java.util.SortedMap; From 442f34278c531efb1b5391387c40f13cdbb2c722 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 16:44:07 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E6=8C=87=E6=A0=87=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E5=A2=9E=E5=8A=A0=E8=BF=94=E5=9B=9EZK?= =?UTF-8?q?=E7=9A=84=E6=8C=87=E6=A0=87=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/version/impl/VersionControlManagerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java index 52a91520..0460b601 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java @@ -14,7 +14,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; import com.xiaojukeji.know.streaming.km.common.bean.vo.config.metric.UserMetricConfigVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.VersionUtil; @@ -108,6 +107,7 @@ public class VersionControlManagerImpl implements VersionControlManager { allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_BROKER.getCode()), VersionItemVO.class)); allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_PARTITION.getCode()), VersionItemVO.class)); allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_REPLICATION.getCode()), VersionItemVO.class)); + allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(METRIC_ZOOKEEPER.getCode()), VersionItemVO.class)); allVersionItemVO.addAll(ConvertUtil.list2List(versionControlService.listVersionControlItem(WEB_OP.getCode()), VersionItemVO.class)); Map map = allVersionItemVO.stream().collect( From dc1899a1cdc33d74e8be2765f06e7c0164400763 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 16:45:47 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=9B=86=E7=BE=A4ZK?= =?UTF-8?q?=E5=88=97=E8=A1=A8=E4=B8=AD=E7=BC=BA=E5=B0=91=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E7=8A=B6=E6=80=81=E5=AD=97=E6=AE=B5=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java index 960b5d5a..477b3bf4 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java @@ -14,6 +14,9 @@ public class ClusterZookeepersOverviewVO { @ApiModelProperty(value = "主机ip", example = "121.0.0.1") private String host; + @ApiModelProperty(value = "主机存活状态,1:Live,0:Down", example = "1") + private Integer status; + @ApiModelProperty(value = "端口号", example = "2416") private Integer port;