mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-07 06:02:07 +08:00
[Bugfix]修复更新健康巡检结果时出现死锁的问题(#728)
This commit is contained in:
@@ -3,13 +3,11 @@ package com.xiaojukeji.know.streaming.km.core.service.health.checkresult.impl;
|
|||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.didiglobal.logi.log.ILog;
|
import com.didiglobal.logi.log.ILog;
|
||||||
import com.didiglobal.logi.log.LogFactory;
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.config.PlatformClusterConfigPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.config.PlatformClusterConfigPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigGroupEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
|
||||||
@@ -19,6 +17,7 @@ import com.xiaojukeji.know.streaming.km.core.service.config.PlatformClusterConfi
|
|||||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.health.HealthCheckResultDAO;
|
import com.xiaojukeji.know.streaming.km.persistence.mysql.health.HealthCheckResultDAO;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.dao.DuplicateKeyException;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
@@ -125,15 +124,37 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void batchReplace(Long clusterPhyId, Integer dimension, List<HealthCheckResult> healthCheckResults) {
|
public void batchReplace(Long clusterPhyId, Integer dimension, List<HealthCheckResult> healthCheckResults) {
|
||||||
List<List<HealthCheckResult>> healthCheckResultPartitions = Lists.partition(healthCheckResults, Constant.PER_BATCH_MAX_VALUE);
|
List<HealthCheckResultPO> inDBList = this.listCheckResult(clusterPhyId, dimension);
|
||||||
for (List<HealthCheckResult> checkResultPartition : healthCheckResultPartitions) {
|
|
||||||
List<HealthCheckResultPO> healthCheckResultPos = ConvertUtil.list2List(checkResultPartition, HealthCheckResultPO.class);
|
// list 转 map
|
||||||
|
Map<String, HealthCheckResultPO> inDBMap = new HashMap<>(inDBList.size());
|
||||||
|
inDBList.forEach(elem -> inDBMap.put(elem.getConfigName() + elem.getResName(), elem));
|
||||||
|
|
||||||
|
for (HealthCheckResult checkResult: healthCheckResults) {
|
||||||
|
HealthCheckResultPO inDB = inDBMap.remove(checkResult.getConfigName() + checkResult.getResName());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
healthCheckResultDAO.batchReplace(healthCheckResultPos);
|
HealthCheckResultPO newPO = ConvertUtil.obj2Obj(checkResult, HealthCheckResultPO.class);
|
||||||
} catch (Exception e) {
|
if (inDB == null) {
|
||||||
LOGGER.error("method=batchReplace||clusterPhyId={}||checkResultList={}||errMsg=exception!", clusterPhyId, healthCheckResultPos, e);
|
healthCheckResultDAO.insert(newPO);
|
||||||
|
} else {
|
||||||
|
newPO.setId(inDB.getId());
|
||||||
|
newPO.setUpdateTime(new Date());
|
||||||
|
healthCheckResultDAO.updateById(newPO);
|
||||||
|
}
|
||||||
|
} catch (DuplicateKeyException dke) {
|
||||||
|
// ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inDBMap.values().forEach(elem -> {
|
||||||
|
if (System.currentTimeMillis() - elem.getUpdateTime().getTime() <= 1200000) {
|
||||||
|
// 20分钟之内的数据,不进行删除
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
healthCheckResultDAO.deleteById(elem.getId());
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<HealthCheckAggResult> convert2HealthCheckAggResultList(List<HealthCheckResultPO> poList, Integer dimensionCode) {
|
private List<HealthCheckAggResult> convert2HealthCheckAggResultList(List<HealthCheckResultPO> poList, Integer dimensionCode) {
|
||||||
|
|||||||
Reference in New Issue
Block a user