Merge pull request #689 from didi/dev

优化健康检查结果替换时出现死锁问题
This commit is contained in:
EricZeng
2022-10-17 10:26:11 +08:00
committed by GitHub
6 changed files with 38 additions and 6 deletions

View File

@@ -43,6 +43,7 @@ public class Constant {
*/ */
public static final Integer DEFAULT_CLUSTER_HEALTH_SCORE = 90; public static final Integer DEFAULT_CLUSTER_HEALTH_SCORE = 90;
public static final Integer PER_BATCH_MAX_VALUE = 100;
public static final String DEFAULT_USER_NAME = "know-streaming-app"; public static final String DEFAULT_USER_NAME = "know-streaming-app";

View File

@@ -20,4 +20,6 @@ public interface HealthCheckResultService {
List<HealthCheckResultPO> getResHealthCheckResult(Long clusterPhyId, Integer dimension, String resNme); List<HealthCheckResultPO> getResHealthCheckResult(Long clusterPhyId, Integer dimension, String resNme);
Map<String, BaseClusterHealthConfig> getClusterHealthConfig(Long clusterPhyId); Map<String, BaseClusterHealthConfig> getClusterHealthConfig(Long clusterPhyId);
void batchReplace(Long clusterPhyId, List<HealthCheckResult> healthCheckResults);
} }

View File

@@ -3,10 +3,12 @@ package com.xiaojukeji.know.streaming.km.core.service.health.checkresult.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.google.common.collect.Lists;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.po.config.PlatformClusterConfigPO; import com.xiaojukeji.know.streaming.km.common.bean.po.config.PlatformClusterConfigPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum; import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
@@ -88,4 +90,17 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService {
} }
return configMap; return configMap;
} }
@Override
public void batchReplace(Long clusterPhyId, List<HealthCheckResult> healthCheckResults) {
List<List<HealthCheckResult>> healthCheckResultPartitions = Lists.partition(healthCheckResults, Constant.PER_BATCH_MAX_VALUE);
for (List<HealthCheckResult> checkResultPartition : healthCheckResultPartitions) {
List<HealthCheckResultPO> healthCheckResultPos = ConvertUtil.list2List(checkResultPartition, HealthCheckResultPO.class);
try {
healthCheckResultDAO.batchReplace(healthCheckResultPos);
} catch (Exception e) {
log.error("method=batchReplace||clusterPhyId={}||checkResultList={}||errMsg=exception!", clusterPhyId, healthCheckResultPos, e);
}
}
}
} }

View File

@@ -4,7 +4,11 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import java.util.List;
@Repository @Repository
public interface HealthCheckResultDAO extends BaseMapper<HealthCheckResultPO> { public interface HealthCheckResultDAO extends BaseMapper<HealthCheckResultPO> {
int replace(HealthCheckResultPO healthCheckResultPO); int replace(HealthCheckResultPO healthCheckResultPO);
int batchReplace(List<HealthCheckResultPO> healthCheckResultPos);
} }

View File

@@ -20,4 +20,16 @@
VALUES VALUES
(#{dimension}, #{configName}, #{clusterPhyId}, #{resName}, #{passed}) (#{dimension}, #{configName}, #{clusterPhyId}, #{resName}, #{passed})
</insert> </insert>
<insert id="batchReplace">
insert into ks_km_health_check_result (dimension, config_name, cluster_phy_id, res_name, passed)
values
<foreach collection="list" item="item" separator=",">
(#{item.dimension}, #{item.configName}, #{item.clusterPhyId}, #{item.resName}, #{item.passed})
</foreach>
on duplicate key update
dimension = dimension, config_name = config_name, cluster_phy_id = cluster_phy_id,
res_name = res_name, passed = passed
</insert>
</mapper> </mapper>

View File

@@ -52,12 +52,10 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat
resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap)); resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap));
} }
for (HealthCheckResult checkResult: resultList) { try {
try { healthCheckResultService.batchReplace(clusterPhy.getId(), resultList);
healthCheckResultService.replace(checkResult); } catch (Exception e) {
} catch (Exception e) { log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e);
}
} }
// 删除10分钟之前的检查结果 // 删除10分钟之前的检查结果