From 5110b30f622b8a209e7a0392d178dd43ee0c4eec Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 15:35:19 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]MM2=E7=AE=A1=E7=90=86-MM2=E5=81=A5?= =?UTF-8?q?=E5=BA=B7=E5=B7=A1=E6=A3=80(#894)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bean/entity/cluster/ClusterPhysState.java | 2 + .../param/connect/mm2/MirrorMakerParam.java | 32 +++ .../bean/vo/cluster/ClusterPhysStateVO.java | 3 + .../health/HealthCheckDimensionEnum.java | 2 + .../enums/health/HealthCheckNameEnum.java | 43 +++- .../connect/HealthCheckConnectorService.java | 31 ++- .../mm2/HealthCheckMirrorMakerService.java | 205 ++++++++++++++++++ .../checkresult/HealthCheckResultService.java | 2 + .../impl/HealthCheckResultServiceImpl.java | 22 +- .../health/state/HealthStateService.java | 2 + .../state/impl/HealthStateServiceImpl.java | 51 +++++ .../api/v3/health/KafkaHealthController.java | 8 + .../health/MirrorMakerHealthCheckTask.java | 33 +++ 13 files changed, 414 insertions(+), 22 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/mm2/MirrorMakerParam.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/mm2/HealthCheckMirrorMakerService.java create mode 100644 km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/health/MirrorMakerHealthCheckTask.java diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/cluster/ClusterPhysState.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/cluster/ClusterPhysState.java index 38daaebc..2f1abc29 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/cluster/ClusterPhysState.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/cluster/ClusterPhysState.java @@ -18,5 +18,7 @@ public class ClusterPhysState { private Integer downCount; + private Integer unknownCount; + private Integer total; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/mm2/MirrorMakerParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/mm2/MirrorMakerParam.java new file mode 100644 index 00000000..357559c6 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/mm2/MirrorMakerParam.java @@ -0,0 +1,32 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ConnectClusterParam; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author wyb + * @date 2022/12/21 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class MirrorMakerParam extends ConnectClusterParam { + + private String mirrorMakerName; + + private String connectorType; + + List mirrorMakerTopicList; + + public MirrorMakerParam(Long connectClusterId, String connectorType, String mirrorMakerName, List mirrorMakerTopicList) { + super(connectClusterId); + this.mirrorMakerName = mirrorMakerName; + this.connectorType = connectorType; + this.mirrorMakerTopicList = mirrorMakerTopicList; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/ClusterPhysStateVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/ClusterPhysStateVO.java index fdd814c0..b8a7c8dd 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/ClusterPhysStateVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/ClusterPhysStateVO.java @@ -18,6 +18,9 @@ public class ClusterPhysStateVO { @ApiModelProperty(value = "挂掉集群数", example = "10") private Integer downCount; + @ApiModelProperty(value = "未知状态集群数", example = "10") + private Integer unknownCount; + @ApiModelProperty(value = "集群总数", example = "40") private Integer total; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java index eaa730f4..5312f57a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckDimensionEnum.java @@ -24,6 +24,8 @@ public enum HealthCheckDimensionEnum { CONNECTOR(6, "Connector", "Connect"), + MIRROR_MAKER(7,"MirrorMaker","MirrorMaker"), + MAX_VAL(100, "所有的dimension的值需要小于MAX_VAL", "Ignore") ; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java index 2d6d4133..20ddccc3 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java @@ -136,7 +136,7 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.CONNECT_CLUSTER, "TaskStartupFailurePercentage", Constant.HC_CONFIG_NAME_PREFIX+"CONNECT_CLUSTER_TASK_STARTUP_FAILURE_PERCENTAGE", - "connect集群任务启动失败概率", + "Connect集群任务启动失败概率", HealthCompareValueConfig.class, false ), @@ -145,7 +145,7 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.CONNECTOR, "ConnectorFailedTaskCount", Constant.HC_CONFIG_NAME_PREFIX+"CONNECTOR_FAILED_TASK_COUNT", - "connector失败状态的任务数量", + "Connector失败状态的任务数量", HealthCompareValueConfig.class, false ), @@ -154,13 +154,50 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.CONNECTOR, "ConnectorUnassignedTaskCount", Constant.HC_CONFIG_NAME_PREFIX+"CONNECTOR_UNASSIGNED_TASK_COUNT", - "connector未被分配的任务数量", + "Connector未被分配的任务数量", + HealthCompareValueConfig.class, + false + ), + + MIRROR_MAKER_FAILED_TASK_COUNT( + HealthCheckDimensionEnum.MIRROR_MAKER, + "MirrorMakerFailedTaskCount", + Constant.HC_CONFIG_NAME_PREFIX+"MIRROR_MAKER_FAILED_TASK_COUNT", + "MirrorMaker失败状态的任务数量", + HealthCompareValueConfig.class, + false + ), + + MIRROR_MAKER_UNASSIGNED_TASK_COUNT( + HealthCheckDimensionEnum.MIRROR_MAKER, + "MirrorMakerUnassignedTaskCount", + Constant.HC_CONFIG_NAME_PREFIX+"MIRROR_MAKER_UNASSIGNED_TASK_COUNT", + "MirrorMaker未被分配的任务数量", + HealthCompareValueConfig.class, + false + ), + + MIRROR_MAKER_TOTAL_RECORD_ERRORS( + HealthCheckDimensionEnum.MIRROR_MAKER, + "TotalRecord-errors", + Constant.HC_CONFIG_NAME_PREFIX + "MIRROR_MAKER_TOTAL_RECORD_ERRORS", + "MirrorMaker消息处理错误的次数", + HealthCompareValueConfig.class, + false + ), + + MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX( + HealthCheckDimensionEnum.MIRROR_MAKER, + "ReplicationLatencyMsMax", + Constant.HC_CONFIG_NAME_PREFIX + "MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX", + "MirrorMaker消息复制最大延迟时间", HealthCompareValueConfig.class, false ) + ; /** diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/HealthCheckConnectorService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/HealthCheckConnectorService.java index e4286423..fe60bb86 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/HealthCheckConnectorService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/HealthCheckConnectorService.java @@ -10,7 +10,9 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.Conne import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect.ConnectorParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; @@ -55,13 +57,10 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService { @Override public List getResList(Long connectClusterId) { List paramList = new ArrayList<>(); - Result> ret = connectorService.listConnectorsFromCluster(connectClusterId); - if (!ret.hasData()) { - return paramList; - } + List connectorPOList = connectorService.listByConnectClusterIdFromDB(connectClusterId); - for (String connectorName : ret.getData()) { - paramList.add(new ConnectorParam(connectClusterId, connectorName)); + for (ConnectorPO connectorPO : connectorPOList) { + paramList.add(new ConnectorParam(connectClusterId, connectorPO.getConnectorName(), connectorPO.getConnectorType())); } return paramList; @@ -88,9 +87,10 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService { Long connectClusterId = param.getConnectClusterId(); String connectorName = param.getConnectorName(); + String connectorType = param.getConnectorType(); Double compareValue = compareConfig.getValue(); - return this.getHealthCompareResult(connectClusterId, connectorName, CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_FAILED_TASK_COUNT, compareValue); + return this.getHealthCompareResult(connectClusterId, connectorName, connectorType, HealthCheckDimensionEnum.CONNECTOR.getDimension(), CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_FAILED_TASK_COUNT, compareValue); } private HealthCheckResult checkUnassignedTaskCount(Tuple paramTuple) { @@ -99,17 +99,18 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService { Long connectClusterId = param.getConnectClusterId(); String connectorName = param.getConnectorName(); + String connectorType = param.getConnectorType(); Double compareValue = compareConfig.getValue(); - return this.getHealthCompareResult(connectClusterId, connectorName, CONNECTOR_METRIC_CONNECTOR_UNASSIGNED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_UNASSIGNED_TASK_COUNT, compareValue); + return this.getHealthCompareResult(connectClusterId, connectorName, connectorType, HealthCheckDimensionEnum.CONNECTOR.getDimension(), CONNECTOR_METRIC_CONNECTOR_UNASSIGNED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_UNASSIGNED_TASK_COUNT, compareValue); } - private HealthCheckResult getHealthCompareResult(Long connectClusterId, String connectorName, String metricName, HealthCheckNameEnum healthCheckNameEnum, Double compareValue) { + public HealthCheckResult getHealthCompareResult(Long connectClusterId, String connectorName, String connectorType, Integer dimension, String metricName, HealthCheckNameEnum healthCheckNameEnum, Double compareValue) { - Result ret = connectorMetricService.collectConnectClusterMetricsFromKafka(connectClusterId, connectorName, metricName); + Result ret = connectorMetricService.collectConnectClusterMetricsFromKafka(connectClusterId, connectorName, metricName , ConnectorTypeEnum.getByName(connectorType)); - if (!ret.hasData()) { + if (!ret.hasData() || ret.getData().getMetric(metricName) == null) { log.error("method=getHealthCompareResult||connectClusterId={}||connectorName={}||metricName={}||errMsg=get metrics failed", connectClusterId, connectorName, metricName); return null; @@ -117,14 +118,8 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService { Float value = ret.getData().getMetric(metricName); - if (value == null) { - log.error("method=getHealthCompareResult||connectClusterId={}||connectorName={}||metricName={}||errMsg=get metrics failed", - connectClusterId, connectorName, metricName); - return null; - } - HealthCheckResult checkResult = new HealthCheckResult( - HealthCheckDimensionEnum.CONNECTOR.getDimension(), + dimension, healthCheckNameEnum.getConfigName(), connectClusterId, connectorName diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/mm2/HealthCheckMirrorMakerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/mm2/HealthCheckMirrorMakerService.java new file mode 100644 index 00000000..70df3134 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checker/connect/mm2/HealthCheckMirrorMakerService.java @@ -0,0 +1,205 @@ +package com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.mm2; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.google.common.collect.Table; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect.mm2.MirrorMakerParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; +import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerService; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.HealthCheckConnectorService; +import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache; +import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector.ConnectorMetricESDAO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.*; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE; +import static com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum.SOURCE; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX; + +/** + * @author wyb + * @date 2022/12/21 + */ +@Service +public class HealthCheckMirrorMakerService extends AbstractHealthCheckService { + private static final ILog log = LogFactory.getLog(HealthCheckMirrorMakerService.class); + + @Autowired + private ConnectorService connectorService; + + @Autowired + private MirrorMakerService mirrorMakerService; + + @Autowired + private ConnectClusterService connectClusterService; + + @Autowired + private MirrorMakerMetricService mirrorMakerMetricService; + + @Autowired + private ConnectorMetricESDAO connectorMetricESDAO; + + @Autowired + private HealthCheckConnectorService healthCheckConnectorService; + + private static final Long TEN_MIN = 10 * 60 * 1000L; + + @PostConstruct + private void init() { + functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_UNASSIGNED_TASK_COUNT.getConfigName(), this::checkUnassignedTaskCount); + functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_FAILED_TASK_COUNT.getConfigName(), this::checkFailedTaskCount); + functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX.getConfigName(), this::checkReplicationLatencyMsMax); + functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_TOTAL_RECORD_ERRORS.getConfigName(), this::checkTotalRecordErrors); + } + + @Override + public List getResList(Long connectClusterId) { + List paramList = new ArrayList<>(); + List mirrorMakerList = connectorService.listByConnectClusterIdFromDB(connectClusterId).stream().filter(elem -> elem.getConnectorType().equals(SOURCE.name()) && elem.getConnectorClassName().equals(MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList()); + + if (mirrorMakerList.isEmpty()) { + return paramList; + } + Result> ret = mirrorMakerService.getMirrorMakerTopicMap(connectClusterId); + + if (!ret.hasData()) { + log.error("method=getResList||connectClusterId={}||get MirrorMakerTopicMap failed!", connectClusterId); + return paramList; + } + + Map mirrorMakerTopicMap = ret.getData(); + + for (ConnectorPO mirrorMaker : mirrorMakerList) { + List mirrorMakerTopicList = mirrorMakerService.getMirrorMakerTopicList(mirrorMaker, mirrorMakerTopicMap); + paramList.add(new MirrorMakerParam(connectClusterId, mirrorMaker.getConnectorType(), mirrorMaker.getConnectorName(), mirrorMakerTopicList)); + } + return paramList; + } + + @Override + public HealthCheckDimensionEnum getHealthCheckDimensionEnum() { + return HealthCheckDimensionEnum.MIRROR_MAKER; + } + + @Override + public Integer getDimensionCodeIfSupport(Long kafkaClusterPhyId) { + List clusterList = connectClusterService.listByKafkaCluster(kafkaClusterPhyId); + if (ValidateUtils.isEmptyList(clusterList)) { + return null; + } + + return this.getHealthCheckDimensionEnum().getDimension(); + } + + private HealthCheckResult checkFailedTaskCount(Tuple paramTuple) { + MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1(); + HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2(); + + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + String connectorType = param.getConnectorType(); + Double compareValue = compareConfig.getValue(); + + return healthCheckConnectorService.getHealthCompareResult(connectClusterId, mirrorMakerName, connectorType, HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(), CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, HealthCheckNameEnum.MIRROR_MAKER_FAILED_TASK_COUNT, compareValue); + } + + private HealthCheckResult checkUnassignedTaskCount(Tuple paramTuple) { + MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1(); + HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2(); + + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + String connectorType = param.getConnectorType(); + Double compareValue = compareConfig.getValue(); + + return healthCheckConnectorService.getHealthCompareResult(connectClusterId, mirrorMakerName, connectorType, HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(), CONNECTOR_METRIC_CONNECTOR_UNASSIGNED_TASK_COUNT, HealthCheckNameEnum.MIRROR_MAKER_UNASSIGNED_TASK_COUNT, compareValue); + } + + private HealthCheckResult checkReplicationLatencyMsMax(Tuple paramTuple) { + MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1(); + HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2(); + + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + List mirrorMakerTopicList = param.getMirrorMakerTopicList(); + String metricName = MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX; + + Result ret = mirrorMakerMetricService.collectMirrorMakerMetricsFromKafka(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metricName); + + if (!ret.hasData() || ret.getData().getMetric(metricName) == null) { + log.error("method=checkReplicationLatencyMsMax||connectClusterId={}||metricName={}||errMsg=get metrics failed", + param.getConnectClusterId(), metricName); + return null; + } + + Float value = ret.getData().getMetric(metricName); + + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(), + HealthCheckNameEnum.MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX.getConfigName(), + connectClusterId, + mirrorMakerName + ); + checkResult.setPassed(value <= compareConfig.getValue() ? Constant.YES : Constant.NO); + return checkResult; + } + + private HealthCheckResult checkTotalRecordErrors(Tuple paramTuple){ + MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1(); + HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2(); + + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + List mirrorMakerTopicList = param.getMirrorMakerTopicList(); + + ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId); + Long endTime = System.currentTimeMillis(); + Long startTime = endTime - TEN_MIN; + Tuple connectClusterIdAndName = new Tuple<>(connectClusterId, mirrorMakerName); + String metricName = CONNECTOR_METRIC_TOTAL_RECORD_ERRORS; + + Table, List> table = connectorMetricESDAO.listMetricsByConnectors(connectCluster.getKafkaClusterPhyId(), Arrays.asList(metricName), "avg", Arrays.asList(connectClusterIdAndName), startTime, endTime); + List pointVOList = table.get(metricName, connectClusterIdAndName); + Collections.sort(pointVOList, (p1, p2) -> p2.getTimeStamp().compareTo(p1.getTimeStamp())); + + HealthCheckResult checkResult = new HealthCheckResult( + HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(), + HealthCheckNameEnum.MIRROR_MAKER_TOTAL_RECORD_ERRORS.getConfigName(), + connectClusterId, + mirrorMakerName + ); + + double diff = 0; + if (pointVOList.size() > 1) { + diff = Double.valueOf(pointVOList.get(0).getValue()) - Double.valueOf(pointVOList.get(1).getValue()); + } + checkResult.setPassed(diff <= compareConfig.getValue() ? Constant.YES : Constant.NO); + + return checkResult; + } + + +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java index 05346d0d..d8038b5f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/HealthCheckResultService.java @@ -26,4 +26,6 @@ public interface HealthCheckResultService { void batchReplace(Long clusterPhyId, Integer dimension, List healthCheckResults); List getConnectorHealthCheckResult(Long clusterPhyId); + + List getMirrorMakerHealthCheckResult(Long clusterPhyId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java index 0d15561f..4c1471b6 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/checkresult/impl/HealthCheckResultServiceImpl.java @@ -27,6 +27,7 @@ import java.util.*; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.CONNECTOR; +import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.MIRROR_MAKER; @Service public class HealthCheckResultServiceImpl implements HealthCheckResultService { @@ -146,7 +147,26 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(HealthCheckResultPO::getDimension, CONNECTOR.getDimension()); wrapper.in(HealthCheckResultPO::getClusterPhyId, connectClusterIdList); - resultPOList.addAll(healthCheckResultDAO.selectList(wrapper)); + resultPOList = healthCheckResultDAO.selectList(wrapper); + return resultPOList; + } + + @Override + public List getMirrorMakerHealthCheckResult(Long clusterPhyId) { + List resultPOList = new ArrayList<>(); + + //查找connect集群 + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(ConnectClusterPO::getKafkaClusterPhyId, clusterPhyId); + List connectClusterIdList = connectClusterDAO.selectList(lambdaQueryWrapper).stream().map(elem -> elem.getId()).collect(Collectors.toList()); + if (ValidateUtils.isEmptyList(connectClusterIdList)) { + return resultPOList; + } + + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(HealthCheckResultPO::getDimension, MIRROR_MAKER.getDimension()); + wrapper.in(HealthCheckResultPO::getClusterPhyId, connectClusterIdList); + resultPOList = healthCheckResultDAO.selectList(wrapper); return resultPOList; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java index f9fc3edb..00b4fcc6 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/HealthStateService.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.health.state; import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; import java.util.List; @@ -17,6 +18,7 @@ public interface HealthStateService { GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName); ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId); ConnectorMetrics calConnectorHealthMetrics(Long connectClusterId, String connectorName); + MirrorMakerMetrics calMirrorMakerHealthMetrics(Long connectClusterId, String mirrorMakerName); /** * 获取集群健康检查结果 diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java index 3610cfb7..5247564b 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/health/state/impl/HealthStateServiceImpl.java @@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAgg import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO; import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum; @@ -29,6 +30,7 @@ import java.util.List; import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*; @@ -72,6 +74,7 @@ public class HealthStateServiceImpl implements HealthStateService { metrics.putMetric(this.calClusterGroupsHealthMetrics(clusterPhyId).getMetrics()); metrics.putMetric(this.calZookeeperHealthMetrics(clusterPhyId).getMetrics()); metrics.putMetric(this.calClusterConnectsHealthMetrics(clusterPhyId).getMetrics()); + metrics.putMetric(this.calClusterMirrorMakersHealthMetrics(clusterPhyId).getMetrics()); // 统计最终结果 Float passed = 0.0f; @@ -81,6 +84,7 @@ public class HealthStateServiceImpl implements HealthStateService { passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS); passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER); passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CONNECTOR); + passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER); Float total = 0.0f; total += metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL); @@ -89,6 +93,7 @@ public class HealthStateServiceImpl implements HealthStateService { total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS); total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER); total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CONNECTOR); + total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER); // 状态 Float state = 0.0f; @@ -98,6 +103,7 @@ public class HealthStateServiceImpl implements HealthStateService { state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_GROUPS)); state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CLUSTER)); state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CONNECTOR)); + state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER)); metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED, passed); metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL, total); @@ -225,6 +231,31 @@ public class HealthStateServiceImpl implements HealthStateService { return metrics; } + @Override + public MirrorMakerMetrics calMirrorMakerHealthMetrics(Long connectClusterId, String mirrorMakerName) { + ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId); + MirrorMakerMetrics metrics = new MirrorMakerMetrics(connectClusterId, mirrorMakerName); + + if (connectCluster == null) { + metrics.putMetric(MIRROR_MAKER_METRIC_HEALTH_STATE, (float) HealthStateEnum.DEAD.getDimension()); + return metrics; + } + + List resultList = healthCheckResultService.getHealthCheckAggResult(connectClusterId, HealthCheckDimensionEnum.MIRROR_MAKER, mirrorMakerName); + + if (ValidateUtils.isEmptyList(resultList)) { + metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED, 0.0f); + metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL, 0.0f); + } else { + metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(resultList)); + metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL, (float) resultList.size()); + } + + metrics.putMetric(MIRROR_MAKER_METRIC_HEALTH_STATE, (float) this.calHealthState(resultList).getDimension()); + return metrics; + } + + @Override public List getAllDimensionHealthResult(Long clusterPhyId) { List supportedDimensionCodeList = new ArrayList<>(); @@ -249,6 +280,8 @@ public class HealthStateServiceImpl implements HealthStateService { for (Integer dimensionCode : dimensionCodeList) { if (dimensionCode.equals(HealthCheckDimensionEnum.CONNECTOR.getDimension())) { poList.addAll(healthCheckResultService.getConnectorHealthCheckResult(clusterPhyId)); + } else if (dimensionCode.equals(HealthCheckDimensionEnum.MIRROR_MAKER.getDimension())) { + poList.addAll(healthCheckResultService.getMirrorMakerHealthCheckResult(clusterPhyId)); } else { poList.addAll(healthCheckResultService.listCheckResult(clusterPhyId, dimensionCode)); } @@ -366,6 +399,24 @@ public class HealthStateServiceImpl implements HealthStateService { return metrics; } + private ClusterMetrics calClusterMirrorMakersHealthMetrics(Long clusterPhyId){ + List mirrorMakerHealthCheckResult = healthCheckResultService.getMirrorMakerHealthCheckResult(clusterPhyId); + List resultList = this.getDimensionHealthCheckAggResult(mirrorMakerHealthCheckResult, Arrays.asList(MIRROR_MAKER.getDimension())); + + ClusterMetrics metrics = new ClusterMetrics(clusterPhyId); + + if (ValidateUtils.isEmptyList(resultList)) { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER, 0.0f); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER, 0.0f); + } else { + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER, this.getHealthCheckPassed(resultList)); + metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER, (float) resultList.size()); + } + + metrics.putMetric(CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER, (float) this.calHealthState(resultList).getDimension()); + return metrics; + } + /**************************************************** 聚合数据 ****************************************************/ diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java index 48a3bf46..062cf954 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/health/KafkaHealthController.java @@ -60,6 +60,14 @@ public class KafkaHealthController { @ResponseBody public Result> getClusterHealthCheckResultDetail(@PathVariable Long clusterPhyId, @RequestBody List dimensionCodeList) { + if (dimensionCodeList.isEmpty()) { + return Result.buildSuc( + HealthScoreVOConverter.convert2HealthScoreResultDetailVOList( + healthStateService.getAllDimensionHealthResult(clusterPhyId) + ) + ); + } + return Result.buildSuc(HealthScoreVOConverter.convert2HealthScoreResultDetailVOList( healthStateService.getDimensionHealthResult(clusterPhyId, dimensionCodeList) )); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/health/MirrorMakerHealthCheckTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/health/MirrorMakerHealthCheckTask.java new file mode 100644 index 00000000..f96396a0 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/health/MirrorMakerHealthCheckTask.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.know.streaming.km.task.connect.mm2.health; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService; +import com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.mm2.HealthCheckMirrorMakerService; +import com.xiaojukeji.know.streaming.km.task.connect.health.AbstractHealthCheckTask; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author wyb + * @date 2022/12/21 + */ +@NoArgsConstructor +@AllArgsConstructor +@Task(name = "MirrorMakerHealthCheckTask", + description = "MirrorMaker健康检查", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class MirrorMakerHealthCheckTask extends AbstractHealthCheckTask { + + @Autowired + private HealthCheckMirrorMakerService healthCheckMirrorMakerService; + + @Override + public AbstractHealthCheckService getCheckService() { + return healthCheckMirrorMakerService; + } +}