[Optimize]健康巡检增加ClusterParam, 从而拆分Kafka和Connect相关的巡检任务

This commit is contained in:
zengqiao
2022-11-10 16:24:39 +08:00
parent e456be91ef
commit 7661826ea5
9 changed files with 50 additions and 33 deletions

View File

@@ -0,0 +1,10 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
/**
* @author wyc
* @date 2022/11/9
*/
public class ClusterParam extends VersionItemParam {
}

View File

@@ -8,6 +8,6 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ClusterPhyParam extends VersionItemParam {
public class ClusterPhyParam extends ClusterParam {
protected Long clusterPhyId;
}

View File

@@ -4,6 +4,7 @@ import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
@@ -21,15 +22,15 @@ public abstract class AbstractHealthCheckService {
protected static final Map<
String,
Function<Tuple<ClusterPhyParam, BaseClusterHealthConfig>, HealthCheckResult>
Function<Tuple<ClusterParam, BaseClusterHealthConfig>, HealthCheckResult>
> functionMap = new ConcurrentHashMap<>();
public abstract List<ClusterPhyParam> getResList(Long clusterPhyId);
public abstract List<ClusterParam> getResList(Long clusterPhyId);
public abstract HealthCheckDimensionEnum getHealthCheckDimensionEnum();
public HealthCheckResult checkAndGetResult(ClusterPhyParam clusterPhyParam, BaseClusterHealthConfig clusterHealthConfig) {
if (ValidateUtils.anyNull(clusterPhyParam.getClusterPhyId(), clusterPhyParam, clusterHealthConfig)) {
public HealthCheckResult checkAndGetResult(ClusterParam clusterParam, BaseClusterHealthConfig clusterHealthConfig) {
if (ValidateUtils.anyNull( clusterParam, clusterHealthConfig)) {
return null;
}
@@ -39,16 +40,16 @@ public abstract class AbstractHealthCheckService {
return null;
}
Function<Tuple<ClusterPhyParam, BaseClusterHealthConfig>, HealthCheckResult> function = functionMap.get(clusterHealthConfig.getCheckNameEnum().getConfigName());
Function<Tuple<ClusterParam, BaseClusterHealthConfig>, HealthCheckResult> function = functionMap.get(clusterHealthConfig.getCheckNameEnum().getConfigName());
if (function == null) {
return null;
}
try {
return function.apply(new Tuple<>(clusterPhyParam, clusterHealthConfig));
return function.apply(new Tuple<>(clusterParam, clusterHealthConfig));
} catch (Exception e) {
log.error("method=checkAndGetResult||clusterPhyParam={}||clusterHealthConfig={}||errMsg=exception!",
clusterPhyParam, clusterHealthConfig, e);
clusterParam, clusterHealthConfig, e);
}
return null;

View File

@@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.He
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.broker.BrokerParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
@@ -45,8 +46,8 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService {
}
@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
List<ClusterPhyParam> paramList = new ArrayList<>();
public List<ClusterParam> getResList(Long clusterPhyId) {
List<ClusterParam> paramList = new ArrayList<>();
for (Broker broker: brokerService.listAliveBrokersFromDB(clusterPhyId)) {
paramList.add(new BrokerParam(clusterPhyId, broker.getBrokerId()));
}
@@ -61,7 +62,7 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService {
/**
* Broker网络处理线程平均值过低
*/
private HealthCheckResult checkBrokerNetworkProcessorAvgIdleTooLow(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkBrokerNetworkProcessorAvgIdleTooLow(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
BrokerParam param = (BrokerParam) paramTuple.getV1();
HealthCompareValueConfig singleConfig = (HealthCompareValueConfig) paramTuple.getV2();
@@ -96,7 +97,7 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService {
/**
* Broker请求队列满
*/
private HealthCheckResult checkBrokerRequestQueueFull(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkBrokerRequestQueueFull(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
BrokerParam param = (BrokerParam) paramTuple.getV1();
HealthCompareValueConfig singleConfig = (HealthCompareValueConfig) paramTuple.getV2();

View File

@@ -6,6 +6,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.Ba
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
@@ -34,7 +35,7 @@ public class HealthCheckClusterService extends AbstractHealthCheckService {
}
@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
public List<ClusterParam> getResList(Long clusterPhyId) {
return Arrays.asList(new ClusterPhyParam(clusterPhyId));
}
@@ -46,8 +47,8 @@ public class HealthCheckClusterService extends AbstractHealthCheckService {
/**
* 检查NoController
*/
private HealthCheckResult checkClusterNoController(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ClusterPhyParam param = singleConfigSimpleTuple.getV1();
private HealthCheckResult checkClusterNoController(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ClusterPhyParam param =(ClusterPhyParam) singleConfigSimpleTuple.getV1();
HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2();
Result<ClusterMetrics> clusterMetricsResult = clusterMetricService.getLatestMetricsFromES(param.getClusterPhyId(), Arrays.asList(ClusterMetricVersionItems.CLUSTER_METRIC_ACTIVE_CONTROLLER_COUNT));

View File

@@ -5,6 +5,7 @@ import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.GroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -43,7 +44,7 @@ public class HealthCheckGroupService extends AbstractHealthCheckService {
}
@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
public List<ClusterParam> getResList(Long clusterPhyId) {
return groupService.getGroupsFromDB(clusterPhyId).stream().map(elem -> new GroupParam(clusterPhyId, elem)).collect(Collectors.toList());
}
@@ -55,7 +56,7 @@ public class HealthCheckGroupService extends AbstractHealthCheckService {
/**
* 检查Group re-balance太频繁
*/
private HealthCheckResult checkReBalanceTooFrequently(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkReBalanceTooFrequently(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
GroupParam param = (GroupParam) paramTuple.getV1();
HealthDetectedInLatestMinutesConfig singleConfig = (HealthDetectedInLatestMinutesConfig) paramTuple.getV2();

View File

@@ -6,6 +6,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.Ba
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
@@ -49,8 +50,8 @@ public class HealthCheckTopicService extends AbstractHealthCheckService {
}
@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
List<ClusterPhyParam> paramList = new ArrayList<>();
public List<ClusterParam> getResList(Long clusterPhyId) {
List<ClusterParam> paramList = new ArrayList<>();
for (Topic topic: topicService.listTopicsFromDB(clusterPhyId)) {
paramList.add(new TopicParam(clusterPhyId, topic.getTopicName()));
}
@@ -65,7 +66,7 @@ public class HealthCheckTopicService extends AbstractHealthCheckService {
/**
* 检查Topic长期未同步
*/
private HealthCheckResult checkTopicUnderReplicatedPartition(Tuple<ClusterPhyParam, BaseClusterHealthConfig> paramTuple) {
private HealthCheckResult checkTopicUnderReplicatedPartition(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
TopicParam param = (TopicParam) paramTuple.getV1();
HealthDetectedInLatestMinutesConfig singleConfig = (HealthDetectedInLatestMinutesConfig) paramTuple.getV2();
@@ -97,7 +98,7 @@ public class HealthCheckTopicService extends AbstractHealthCheckService {
/**
* 检查NoLeader
*/
private HealthCheckResult checkTopicNoLeader(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkTopicNoLeader(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
TopicParam param = (TopicParam) singleConfigSimpleTuple.getV1();
List<Partition> partitionList = partitionService.listPartitionFromCacheFirst(param.getClusterPhyId(), param.getTopicName());

View File

@@ -9,6 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.He
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.zookeeper.ZookeeperParam;
@@ -58,7 +59,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService {
}
@Override
public List<ClusterPhyParam> getResList(Long clusterPhyId) {
public List<ClusterParam> getResList(Long clusterPhyId) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return new ArrayList<>();
@@ -82,7 +83,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService {
return HealthCheckDimensionEnum.ZOOKEEPER;
}
private HealthCheckResult checkBrainSplit(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkBrainSplit(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2();
@@ -100,7 +101,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService {
return checkResult;
}
private HealthCheckResult checkOutstandingRequests(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkOutstandingRequests(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
@@ -135,7 +136,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService {
return checkResult;
}
private HealthCheckResult checkWatchCount(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkWatchCount(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
@@ -171,7 +172,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService {
return checkResult;
}
private HealthCheckResult checkAliveConnections(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkAliveConnections(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
@@ -207,7 +208,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService {
return checkResult;
}
private HealthCheckResult checkApproximateDataSize(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkApproximateDataSize(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();
@@ -243,7 +244,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService {
return checkResult;
}
private HealthCheckResult checkSentRate(Tuple<ClusterPhyParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
private HealthCheckResult checkSentRate(Tuple<ClusterParam, BaseClusterHealthConfig> singleConfigSimpleTuple) {
ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1();
HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2();

View File

@@ -6,6 +6,7 @@ import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
@@ -41,15 +42,15 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat
List<HealthCheckResult> resultList = new ArrayList<>();
// 遍历Check-Service
List<ClusterPhyParam> paramList = this.getCheckService().getResList(clusterPhy.getId());
List<ClusterParam> paramList = this.getCheckService().getResList(clusterPhy.getId());
if (ValidateUtils.isEmptyList(paramList)) {
// 当前无该维度的资源,则直接设置为
resultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap));
}
// 遍历资源
for (ClusterPhyParam clusterPhyParam: paramList) {
resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap));
for (ClusterParam clusterParam: paramList) {
resultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap));
}
try {
@@ -93,13 +94,13 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat
return resultList;
}
private List<HealthCheckResult> checkAndGetResult(ClusterPhyParam clusterPhyParam,
private List<HealthCheckResult> checkAndGetResult(ClusterParam clusterParam,
Map<String, BaseClusterHealthConfig> healthConfigMap) {
List<HealthCheckResult> resultList = new ArrayList<>();
// 进行检查
for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) {
HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterPhyParam, clusterHealthConfig);
HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterParam, clusterHealthConfig);
if (healthCheckResult == null) {
continue;
}