mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Optimize]优化健康巡检相关指标的计算(#726)
1、增加缓存,减少健康状态指标计算时的IO; 2、健康巡检调整为按照资源维度并发处理; 3、明确HealthCheckResultService和HealthStateService的功能边界;
This commit is contained in:
@@ -3,30 +3,31 @@ package com.xiaojukeji.know.streaming.km.task.kafka.health;
|
||||
import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||
import com.xiaojukeji.know.streaming.km.task.kafka.metrics.AbstractAsyncMetricsDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(AbstractHealthCheckTask.class);
|
||||
private static final ILog LOGGER = LogFactory.getLog(AbstractHealthCheckTask.class);
|
||||
|
||||
@Autowired
|
||||
private HealthCheckResultService healthCheckResultService;
|
||||
|
||||
@Autowired
|
||||
private CollectThreadPoolService collectThreadPoolService;
|
||||
|
||||
public abstract AbstractHealthCheckService getCheckService();
|
||||
|
||||
@Override
|
||||
@@ -38,32 +39,37 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat
|
||||
// 获取配置,<配置名,配置信息>
|
||||
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
|
||||
|
||||
// 检查结果
|
||||
List<HealthCheckResult> resultList = new ArrayList<>();
|
||||
|
||||
// 遍历Check-Service
|
||||
// 获取资源列表
|
||||
List<ClusterParam> paramList = this.getCheckService().getResList(clusterPhy.getId());
|
||||
|
||||
// 检查结果
|
||||
List<HealthCheckResult> checkResultList = Collections.synchronizedList(new ArrayList<>());
|
||||
if (ValidateUtils.isEmptyList(paramList)) {
|
||||
// 当前无该维度的资源,则直接设置为
|
||||
resultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap));
|
||||
checkResultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap));
|
||||
}
|
||||
|
||||
// 获取合适的线程池
|
||||
FutureWaitUtil<Void> futureWaitUtil = collectThreadPoolService.selectSuitableFutureUtil(clusterPhy.getId() * 100 + this.getCheckService().getHealthCheckDimensionEnum().getDimension());
|
||||
|
||||
// 遍历资源
|
||||
for (ClusterParam clusterParam: paramList) {
|
||||
resultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap));
|
||||
futureWaitUtil.runnableTask(
|
||||
String.format("class=%s||method=calAndUpdateHealthCheckResult||clusterId=%d", this.getCheckService().getClass().getSimpleName(), clusterPhy.getId()),
|
||||
30000,
|
||||
() -> checkResultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap))
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
healthCheckResultService.batchReplace(clusterPhy.getId(), resultList);
|
||||
} catch (Exception e) {
|
||||
log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
|
||||
}
|
||||
futureWaitUtil.waitExecute(30000);
|
||||
|
||||
// 删除10分钟之前的检查结果
|
||||
try {
|
||||
healthCheckResultService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 20 * 60 * 1000));
|
||||
healthCheckResultService.batchReplace(clusterPhy.getId(), this.getCheckService().getHealthCheckDimensionEnum().getDimension(), checkResultList);
|
||||
} catch (Exception e) {
|
||||
log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
|
||||
LOGGER.error(
|
||||
"extendClass={}||method=calAndUpdateHealthCheckResult||clusterPhyId={}||errMsg=exception!",
|
||||
this.getCheckService().getClass().getSimpleName(), clusterPhy.getId(), e
|
||||
);
|
||||
}
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
|
||||
@@ -12,7 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
@AllArgsConstructor
|
||||
@Task(name = "TopicHealthCheckTask",
|
||||
description = "Topic健康检查",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
cron = "30 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
|
||||
Reference in New Issue
Block a user