diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index a91f0809..639ad0f3 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -43,6 +43,7 @@ public class Constant { */ public static final Integer DEFAULT_CLUSTER_HEALTH_SCORE = 90; + public static final Integer PER_BATCH_MAX_VALUE = 100; public static final String DEFAULT_USER_NAME = "know-streaming-app"; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java index 19cb292a..6aaddcdb 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java @@ -20,4 +20,6 @@ public interface HealthCheckResultService { List getResHealthCheckResult(Long clusterPhyId, Integer dimension, String resNme); Map getClusterHealthConfig(Long clusterPhyId); + + void batchReplace(Long clusterPhyId, List healthCheckResults); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java index 09c30a1b..cad2f396 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java @@ -3,10 +3,12 @@ package com.xiaojukeji.know.streaming.km.core.service.health.checkresult.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.google.common.collect.Lists; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.po.config.PlatformClusterConfigPO; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; @@ -88,4 +90,17 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService { } return configMap; } + + @Override + public void batchReplace(Long clusterPhyId, List healthCheckResults) { + List> healthCheckResultPartitions = Lists.partition(healthCheckResults, Constant.PER_BATCH_MAX_VALUE); + for (List checkResultPartition : healthCheckResultPartitions) { + List healthCheckResultPos = ConvertUtil.list2List(checkResultPartition, HealthCheckResultPO.class); + try { + healthCheckResultDAO.batchReplace(healthCheckResultPos); + } catch (Exception e) { + log.error("method=batchReplace||clusterPhyId={}||checkResultList={}||errMsg=exception!", clusterPhyId, healthCheckResultPos, e); + } + } + } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/health/HealthCheckResultDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/health/HealthCheckResultDAO.java index 225437d7..64830fd1 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/health/HealthCheckResultDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/health/HealthCheckResultDAO.java @@ -4,7 +4,11 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import org.springframework.stereotype.Repository; +import java.util.List; + @Repository public interface HealthCheckResultDAO extends BaseMapper { int replace(HealthCheckResultPO healthCheckResultPO); + + int batchReplace(List healthCheckResultPos); } diff --git a/km-persistence/src/main/resources/mybatis/HealthCheckResultMapper.xml b/km-persistence/src/main/resources/mybatis/HealthCheckResultMapper.xml index bc1bf8ca..076b7522 100644 --- a/km-persistence/src/main/resources/mybatis/HealthCheckResultMapper.xml +++ b/km-persistence/src/main/resources/mybatis/HealthCheckResultMapper.xml @@ -20,4 +20,16 @@ VALUES (#{dimension}, #{configName}, #{clusterPhyId}, #{resName}, #{passed}) + + + insert into ks_km_health_check_result (dimension, config_name, cluster_phy_id, res_name, passed) + values + + (#{item.dimension}, #{item.configName}, #{item.clusterPhyId}, #{item.resName}, #{item.passed}) + + on duplicate key update + dimension = dimension, config_name = config_name, cluster_phy_id = cluster_phy_id, + res_name = res_name, passed = passed + + diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java index 1f6c83ff..4d614881 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/health/AbstractHealthCheckTask.java @@ -52,12 +52,10 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap)); } - for (HealthCheckResult checkResult: resultList) { - try { - healthCheckResultService.replace(checkResult); - } catch (Exception e) { - log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||checkResult={}||errMsg=exception!", clusterPhy.getId(), checkResult, e); - } + try { + healthCheckResultService.batchReplace(clusterPhy.getId(), resultList); + } catch (Exception e) { + log.error("class=AbstractHealthCheckTask||method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); } // 删除10分钟之前的检查结果