From 7661826ea5e2a28152a605b1d3826784ecddc1fa Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 10 Nov 2022 16:24:39 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=E5=81=A5=E5=BA=B7=E5=B7=A1=E6=A3=80?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0ClusterParam,=20=E4=BB=8E=E8=80=8C=E6=8B=86?= =?UTF-8?q?=E5=88=86Kafka=E5=92=8CConnect=E7=9B=B8=E5=85=B3=E7=9A=84?= =?UTF-8?q?=E5=B7=A1=E6=A3=80=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bean/entity/param/cluster/ClusterParam.java | 10 ++++++++++ .../entity/param/cluster/ClusterPhyParam.java | 2 +- .../checker/AbstractHealthCheckService.java | 15 ++++++++------- .../checker/broker/HealthCheckBrokerService.java | 9 +++++---- .../cluster/HealthCheckClusterService.java | 7 ++++--- .../checker/group/HealthCheckGroupService.java | 5 +++-- .../checker/topic/HealthCheckTopicService.java | 9 +++++---- .../zookeeper/HealthCheckZookeeperService.java | 15 ++++++++------- .../kafka/health/AbstractHealthCheckTask.java | 11 ++++++----- 9 files changed, 50 insertions(+), 33 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterParam.java diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterParam.java new file mode 100644 index 00000000..95269065 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterParam.java @@ -0,0 +1,10 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; + +/** + * @author wyc + * @date 2022/11/9 + */ +public class ClusterParam extends VersionItemParam { +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterPhyParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterPhyParam.java index d55ceab5..9b01fa84 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterPhyParam.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/cluster/ClusterPhyParam.java @@ -8,6 +8,6 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor -public class ClusterPhyParam extends VersionItemParam { +public class ClusterPhyParam extends ClusterParam { protected Long clusterPhyId; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java index 4b3e011c..b25a7b8c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/AbstractHealthCheckService.java @@ -4,6 +4,7 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; @@ -21,15 +22,15 @@ public abstract class AbstractHealthCheckService { protected static final Map< String, - Function, HealthCheckResult> + Function, HealthCheckResult> > functionMap = new ConcurrentHashMap<>(); - public abstract List getResList(Long clusterPhyId); + public abstract List getResList(Long clusterPhyId); public abstract HealthCheckDimensionEnum getHealthCheckDimensionEnum(); - public HealthCheckResult checkAndGetResult(ClusterPhyParam clusterPhyParam, BaseClusterHealthConfig clusterHealthConfig) { - if (ValidateUtils.anyNull(clusterPhyParam.getClusterPhyId(), clusterPhyParam, clusterHealthConfig)) { + public HealthCheckResult checkAndGetResult(ClusterParam clusterParam, BaseClusterHealthConfig clusterHealthConfig) { + if (ValidateUtils.anyNull( clusterParam, clusterHealthConfig)) { return null; } @@ -39,16 +40,16 @@ public abstract class AbstractHealthCheckService { return null; } - Function, HealthCheckResult> function = functionMap.get(clusterHealthConfig.getCheckNameEnum().getConfigName()); + Function, HealthCheckResult> function = functionMap.get(clusterHealthConfig.getCheckNameEnum().getConfigName()); if (function == null) { return null; } try { - return function.apply(new Tuple<>(clusterPhyParam, clusterHealthConfig)); + return function.apply(new Tuple<>(clusterParam, clusterHealthConfig)); } catch (Exception e) { log.error("method=checkAndGetResult||clusterPhyParam={}||clusterHealthConfig={}||errMsg=exception!", - clusterPhyParam, clusterHealthConfig, e); + clusterParam, clusterHealthConfig, e); } return null; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java index 8e0792d9..c9b173af 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/broker/HealthCheckBrokerService.java @@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.He import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.broker.BrokerParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.constant.Constant; @@ -45,8 +46,8 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { } @Override - public List getResList(Long clusterPhyId) { - List paramList = new ArrayList<>(); + public List getResList(Long clusterPhyId) { + List paramList = new ArrayList<>(); for (Broker broker: brokerService.listAliveBrokersFromDB(clusterPhyId)) { paramList.add(new BrokerParam(clusterPhyId, broker.getBrokerId())); } @@ -61,7 +62,7 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { /** * Broker网络处理线程平均值过低 */ - private HealthCheckResult checkBrokerNetworkProcessorAvgIdleTooLow(Tuple paramTuple) { + private HealthCheckResult checkBrokerNetworkProcessorAvgIdleTooLow(Tuple paramTuple) { BrokerParam param = (BrokerParam) paramTuple.getV1(); HealthCompareValueConfig singleConfig = (HealthCompareValueConfig) paramTuple.getV2(); @@ -96,7 +97,7 @@ public class HealthCheckBrokerService extends AbstractHealthCheckService { /** * Broker请求队列满 */ - private HealthCheckResult checkBrokerRequestQueueFull(Tuple paramTuple) { + private HealthCheckResult checkBrokerRequestQueueFull(Tuple paramTuple) { BrokerParam param = (BrokerParam) paramTuple.getV1(); HealthCompareValueConfig singleConfig = (HealthCompareValueConfig) paramTuple.getV2(); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/cluster/HealthCheckClusterService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/cluster/HealthCheckClusterService.java index 2be267a2..a8201f45 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/cluster/HealthCheckClusterService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/cluster/HealthCheckClusterService.java @@ -6,6 +6,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.Ba import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; @@ -34,7 +35,7 @@ public class HealthCheckClusterService extends AbstractHealthCheckService { } @Override - public List getResList(Long clusterPhyId) { + public List getResList(Long clusterPhyId) { return Arrays.asList(new ClusterPhyParam(clusterPhyId)); } @@ -46,8 +47,8 @@ public class HealthCheckClusterService extends AbstractHealthCheckService { /** * 检查NoController */ - private HealthCheckResult checkClusterNoController(Tuple singleConfigSimpleTuple) { - ClusterPhyParam param = singleConfigSimpleTuple.getV1(); + private HealthCheckResult checkClusterNoController(Tuple singleConfigSimpleTuple) { + ClusterPhyParam param =(ClusterPhyParam) singleConfigSimpleTuple.getV1(); HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2(); Result clusterMetricsResult = clusterMetricService.getLatestMetricsFromES(param.getClusterPhyId(), Arrays.asList(ClusterMetricVersionItems.CLUSTER_METRIC_ACTIVE_CONTROLLER_COUNT)); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java index 522d76b8..8cc40c20 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/group/HealthCheckGroupService.java @@ -5,6 +5,7 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.GroupParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; @@ -43,7 +44,7 @@ public class HealthCheckGroupService extends AbstractHealthCheckService { } @Override - public List getResList(Long clusterPhyId) { + public List getResList(Long clusterPhyId) { return groupService.getGroupsFromDB(clusterPhyId).stream().map(elem -> new GroupParam(clusterPhyId, elem)).collect(Collectors.toList()); } @@ -55,7 +56,7 @@ public class HealthCheckGroupService extends AbstractHealthCheckService { /** * 检查Group re-balance太频繁 */ - private HealthCheckResult checkReBalanceTooFrequently(Tuple paramTuple) { + private HealthCheckResult checkReBalanceTooFrequently(Tuple paramTuple) { GroupParam param = (GroupParam) paramTuple.getV1(); HealthDetectedInLatestMinutesConfig singleConfig = (HealthDetectedInLatestMinutesConfig) paramTuple.getV2(); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java index 5c37ee2c..613d2902 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/topic/HealthCheckTopicService.java @@ -6,6 +6,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.Ba import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthDetectedInLatestMinutesConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; @@ -49,8 +50,8 @@ public class HealthCheckTopicService extends AbstractHealthCheckService { } @Override - public List getResList(Long clusterPhyId) { - List paramList = new ArrayList<>(); + public List getResList(Long clusterPhyId) { + List paramList = new ArrayList<>(); for (Topic topic: topicService.listTopicsFromDB(clusterPhyId)) { paramList.add(new TopicParam(clusterPhyId, topic.getTopicName())); } @@ -65,7 +66,7 @@ public class HealthCheckTopicService extends AbstractHealthCheckService { /** * 检查Topic长期未同步 */ - private HealthCheckResult checkTopicUnderReplicatedPartition(Tuple paramTuple) { + private HealthCheckResult checkTopicUnderReplicatedPartition(Tuple paramTuple) { TopicParam param = (TopicParam) paramTuple.getV1(); HealthDetectedInLatestMinutesConfig singleConfig = (HealthDetectedInLatestMinutesConfig) paramTuple.getV2(); @@ -97,7 +98,7 @@ public class HealthCheckTopicService extends AbstractHealthCheckService { /** * 检查NoLeader */ - private HealthCheckResult checkTopicNoLeader(Tuple singleConfigSimpleTuple) { + private HealthCheckResult checkTopicNoLeader(Tuple singleConfigSimpleTuple) { TopicParam param = (TopicParam) singleConfigSimpleTuple.getV1(); List partitionList = partitionService.listPartitionFromCacheFirst(param.getClusterPhyId(), param.getTopicName()); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java index b83a4ee4..ef3cb554 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/zookeeper/HealthCheckZookeeperService.java @@ -9,6 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.He import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.zookeeper.ZookeeperParam; @@ -58,7 +59,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { } @Override - public List getResList(Long clusterPhyId) { + public List getResList(Long clusterPhyId) { ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); if (clusterPhy == null) { return new ArrayList<>(); @@ -82,7 +83,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { return HealthCheckDimensionEnum.ZOOKEEPER; } - private HealthCheckResult checkBrainSplit(Tuple singleConfigSimpleTuple) { + private HealthCheckResult checkBrainSplit(Tuple singleConfigSimpleTuple) { ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); HealthCompareValueConfig valueConfig = (HealthCompareValueConfig) singleConfigSimpleTuple.getV2(); @@ -100,7 +101,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { return checkResult; } - private HealthCheckResult checkOutstandingRequests(Tuple singleConfigSimpleTuple) { + private HealthCheckResult checkOutstandingRequests(Tuple singleConfigSimpleTuple) { ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); @@ -135,7 +136,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { return checkResult; } - private HealthCheckResult checkWatchCount(Tuple singleConfigSimpleTuple) { + private HealthCheckResult checkWatchCount(Tuple singleConfigSimpleTuple) { ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); @@ -171,7 +172,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { return checkResult; } - private HealthCheckResult checkAliveConnections(Tuple singleConfigSimpleTuple) { + private HealthCheckResult checkAliveConnections(Tuple singleConfigSimpleTuple) { ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); @@ -207,7 +208,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { return checkResult; } - private HealthCheckResult checkApproximateDataSize(Tuple singleConfigSimpleTuple) { + private HealthCheckResult checkApproximateDataSize(Tuple singleConfigSimpleTuple) { ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); @@ -243,7 +244,7 @@ public class HealthCheckZookeeperService extends AbstractHealthCheckService { return checkResult; } - private HealthCheckResult checkSentRate(Tuple singleConfigSimpleTuple) { + private HealthCheckResult checkSentRate(Tuple singleConfigSimpleTuple) { ZookeeperParam param = (ZookeeperParam) singleConfigSimpleTuple.getV1(); HealthAmountRatioConfig valueConfig = (HealthAmountRatioConfig) singleConfigSimpleTuple.getV2(); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java index 3c9fdf23..f8c2185b 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/health/AbstractHealthCheckTask.java @@ -6,6 +6,7 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; @@ -41,15 +42,15 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat List resultList = new ArrayList<>(); // 遍历Check-Service - List paramList = this.getCheckService().getResList(clusterPhy.getId()); + List paramList = this.getCheckService().getResList(clusterPhy.getId()); if (ValidateUtils.isEmptyList(paramList)) { // 当前无该维度的资源,则直接设置为 resultList.addAll(this.getNoResResult(clusterPhy.getId(), this.getCheckService(), healthConfigMap)); } // 遍历资源 - for (ClusterPhyParam clusterPhyParam: paramList) { - resultList.addAll(this.checkAndGetResult(clusterPhyParam, healthConfigMap)); + for (ClusterParam clusterParam: paramList) { + resultList.addAll(this.checkAndGetResult(clusterParam, healthConfigMap)); } try { @@ -93,13 +94,13 @@ public abstract class AbstractHealthCheckTask extends AbstractAsyncMetricsDispat return resultList; } - private List checkAndGetResult(ClusterPhyParam clusterPhyParam, + private List checkAndGetResult(ClusterParam clusterParam, Map healthConfigMap) { List resultList = new ArrayList<>(); // 进行检查 for (BaseClusterHealthConfig clusterHealthConfig: healthConfigMap.values()) { - HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterPhyParam, clusterHealthConfig); + HealthCheckResult healthCheckResult = this.getCheckService().checkAndGetResult(clusterParam, clusterHealthConfig); if (healthCheckResult == null) { continue; }