From 9eadafe8505ad5bebc5acd7af2d1935d8773a85c Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sat, 3 Sep 2022 08:32:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=81=A5=E5=BA=B7=E5=B7=A1=E6=A3=80=E4=BA=A4?= =?UTF-8?q?=E7=94=B1KS=E7=BA=BF=E7=A8=8B=E8=BF=9B=E8=A1=8C=E6=89=A7?= =?UTF-8?q?=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/task/health/HealthCheckTask.java | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java index 0d4c8db9..3e661418 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/HealthCheckTask.java @@ -16,6 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; +import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; @@ -24,11 +25,18 @@ import java.util.*; @NoArgsConstructor @AllArgsConstructor -@Task(name = "HealthCheckTask", description = "健康检查", cron = "0 0/1 * * * ? *", - autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) +@Task(name = "HealthCheckTask", + description = "健康检查", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) public class HealthCheckTask extends AbstractClusterPhyDispatchTask { private static final ILog log = LogFactory.getLog(HealthCheckTask.class); + @Autowired + private TaskThreadPoolService taskThreadPoolService; + @Autowired private HealthCheckResultService healthCheckResultService; @@ -38,6 +46,16 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask { @Override public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + taskThreadPoolService.submitHeavenTask( + String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()), + 100000, + () -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs) + ); + + return TaskResult.SUCCESS; + } + + private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) { // 获取配置,<配置名,配置信息> Map healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); @@ -73,8 +91,6 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask { } catch (Exception e) { log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); } - - return TaskResult.SUCCESS; } private List getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map healthConfigMap) {