健康巡检交由KS线程进行执行

This commit is contained in:
zengqiao
2022-09-03 08:32:33 +08:00
parent dab3eefcc0
commit 9eadafe850

View File

@@ -16,6 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask; import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -24,11 +25,18 @@ import java.util.*;
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@Task(name = "HealthCheckTask", description = "健康检查", cron = "0 0/1 * * * ? *", @Task(name = "HealthCheckTask",
autoRegister = true, consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) description = "健康检查",
cron = "0 0/1 * * * ? *",
autoRegister = true,
consensual = ConsensualEnum.BROADCAST,
timeout = 2 * 60)
public class HealthCheckTask extends AbstractClusterPhyDispatchTask { public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
private static final ILog log = LogFactory.getLog(HealthCheckTask.class); private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
@Autowired
private TaskThreadPoolService taskThreadPoolService;
@Autowired @Autowired
private HealthCheckResultService healthCheckResultService; private HealthCheckResultService healthCheckResultService;
@@ -38,6 +46,16 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
@Override @Override
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
taskThreadPoolService.submitHeavenTask(
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
100000,
() -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs)
);
return TaskResult.SUCCESS;
}
private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
// 获取配置,<配置名,配置信息> // 获取配置,<配置名,配置信息>
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId()); Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
@@ -73,8 +91,6 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
} catch (Exception e) { } catch (Exception e) {
log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
} }
return TaskResult.SUCCESS;
} }
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) { private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {