mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Feature]MM2管理-MM2健康巡检(#894)
This commit is contained in:
@@ -18,5 +18,7 @@ public class ClusterPhysState {
|
||||
|
||||
private Integer downCount;
|
||||
|
||||
private Integer unknownCount;
|
||||
|
||||
private Integer total;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect.mm2;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ConnectClusterParam;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wyb
|
||||
* @date 2022/12/21
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class MirrorMakerParam extends ConnectClusterParam {
|
||||
|
||||
private String mirrorMakerName;
|
||||
|
||||
private String connectorType;
|
||||
|
||||
List<MirrorMakerTopic> mirrorMakerTopicList;
|
||||
|
||||
public MirrorMakerParam(Long connectClusterId, String connectorType, String mirrorMakerName, List<MirrorMakerTopic> mirrorMakerTopicList) {
|
||||
super(connectClusterId);
|
||||
this.mirrorMakerName = mirrorMakerName;
|
||||
this.connectorType = connectorType;
|
||||
this.mirrorMakerTopicList = mirrorMakerTopicList;
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,9 @@ public class ClusterPhysStateVO {
|
||||
@ApiModelProperty(value = "挂掉集群数", example = "10")
|
||||
private Integer downCount;
|
||||
|
||||
@ApiModelProperty(value = "未知状态集群数", example = "10")
|
||||
private Integer unknownCount;
|
||||
|
||||
@ApiModelProperty(value = "集群总数", example = "40")
|
||||
private Integer total;
|
||||
}
|
||||
|
||||
@@ -24,6 +24,8 @@ public enum HealthCheckDimensionEnum {
|
||||
|
||||
CONNECTOR(6, "Connector", "Connect"),
|
||||
|
||||
MIRROR_MAKER(7,"MirrorMaker","MirrorMaker"),
|
||||
|
||||
MAX_VAL(100, "所有的dimension的值需要小于MAX_VAL", "Ignore")
|
||||
|
||||
;
|
||||
|
||||
@@ -136,7 +136,7 @@ public enum HealthCheckNameEnum {
|
||||
HealthCheckDimensionEnum.CONNECT_CLUSTER,
|
||||
"TaskStartupFailurePercentage",
|
||||
Constant.HC_CONFIG_NAME_PREFIX+"CONNECT_CLUSTER_TASK_STARTUP_FAILURE_PERCENTAGE",
|
||||
"connect集群任务启动失败概率",
|
||||
"Connect集群任务启动失败概率",
|
||||
HealthCompareValueConfig.class,
|
||||
false
|
||||
),
|
||||
@@ -145,7 +145,7 @@ public enum HealthCheckNameEnum {
|
||||
HealthCheckDimensionEnum.CONNECTOR,
|
||||
"ConnectorFailedTaskCount",
|
||||
Constant.HC_CONFIG_NAME_PREFIX+"CONNECTOR_FAILED_TASK_COUNT",
|
||||
"connector失败状态的任务数量",
|
||||
"Connector失败状态的任务数量",
|
||||
HealthCompareValueConfig.class,
|
||||
false
|
||||
),
|
||||
@@ -154,13 +154,50 @@ public enum HealthCheckNameEnum {
|
||||
HealthCheckDimensionEnum.CONNECTOR,
|
||||
"ConnectorUnassignedTaskCount",
|
||||
Constant.HC_CONFIG_NAME_PREFIX+"CONNECTOR_UNASSIGNED_TASK_COUNT",
|
||||
"connector未被分配的任务数量",
|
||||
"Connector未被分配的任务数量",
|
||||
HealthCompareValueConfig.class,
|
||||
false
|
||||
),
|
||||
|
||||
MIRROR_MAKER_FAILED_TASK_COUNT(
|
||||
HealthCheckDimensionEnum.MIRROR_MAKER,
|
||||
"MirrorMakerFailedTaskCount",
|
||||
Constant.HC_CONFIG_NAME_PREFIX+"MIRROR_MAKER_FAILED_TASK_COUNT",
|
||||
"MirrorMaker失败状态的任务数量",
|
||||
HealthCompareValueConfig.class,
|
||||
false
|
||||
),
|
||||
|
||||
MIRROR_MAKER_UNASSIGNED_TASK_COUNT(
|
||||
HealthCheckDimensionEnum.MIRROR_MAKER,
|
||||
"MirrorMakerUnassignedTaskCount",
|
||||
Constant.HC_CONFIG_NAME_PREFIX+"MIRROR_MAKER_UNASSIGNED_TASK_COUNT",
|
||||
"MirrorMaker未被分配的任务数量",
|
||||
HealthCompareValueConfig.class,
|
||||
false
|
||||
),
|
||||
|
||||
MIRROR_MAKER_TOTAL_RECORD_ERRORS(
|
||||
HealthCheckDimensionEnum.MIRROR_MAKER,
|
||||
"TotalRecord-errors",
|
||||
Constant.HC_CONFIG_NAME_PREFIX + "MIRROR_MAKER_TOTAL_RECORD_ERRORS",
|
||||
"MirrorMaker消息处理错误的次数",
|
||||
HealthCompareValueConfig.class,
|
||||
false
|
||||
),
|
||||
|
||||
MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX(
|
||||
HealthCheckDimensionEnum.MIRROR_MAKER,
|
||||
"ReplicationLatencyMsMax",
|
||||
Constant.HC_CONFIG_NAME_PREFIX + "MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX",
|
||||
"MirrorMaker消息复制最大延迟时间",
|
||||
HealthCompareValueConfig.class,
|
||||
false
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
;
|
||||
|
||||
/**
|
||||
|
||||
@@ -10,7 +10,9 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.Conne
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect.ConnectorParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
@@ -55,13 +57,10 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService {
|
||||
@Override
|
||||
public List<ClusterParam> getResList(Long connectClusterId) {
|
||||
List<ClusterParam> paramList = new ArrayList<>();
|
||||
Result<List<String>> ret = connectorService.listConnectorsFromCluster(connectClusterId);
|
||||
if (!ret.hasData()) {
|
||||
return paramList;
|
||||
}
|
||||
List<ConnectorPO> connectorPOList = connectorService.listByConnectClusterIdFromDB(connectClusterId);
|
||||
|
||||
for (String connectorName : ret.getData()) {
|
||||
paramList.add(new ConnectorParam(connectClusterId, connectorName));
|
||||
for (ConnectorPO connectorPO : connectorPOList) {
|
||||
paramList.add(new ConnectorParam(connectClusterId, connectorPO.getConnectorName(), connectorPO.getConnectorType()));
|
||||
}
|
||||
|
||||
return paramList;
|
||||
@@ -88,9 +87,10 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService {
|
||||
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
String connectorName = param.getConnectorName();
|
||||
String connectorType = param.getConnectorType();
|
||||
Double compareValue = compareConfig.getValue();
|
||||
|
||||
return this.getHealthCompareResult(connectClusterId, connectorName, CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_FAILED_TASK_COUNT, compareValue);
|
||||
return this.getHealthCompareResult(connectClusterId, connectorName, connectorType, HealthCheckDimensionEnum.CONNECTOR.getDimension(), CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_FAILED_TASK_COUNT, compareValue);
|
||||
}
|
||||
|
||||
private HealthCheckResult checkUnassignedTaskCount(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
|
||||
@@ -99,17 +99,18 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService {
|
||||
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
String connectorName = param.getConnectorName();
|
||||
String connectorType = param.getConnectorType();
|
||||
Double compareValue = compareConfig.getValue();
|
||||
|
||||
return this.getHealthCompareResult(connectClusterId, connectorName, CONNECTOR_METRIC_CONNECTOR_UNASSIGNED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_UNASSIGNED_TASK_COUNT, compareValue);
|
||||
return this.getHealthCompareResult(connectClusterId, connectorName, connectorType, HealthCheckDimensionEnum.CONNECTOR.getDimension(), CONNECTOR_METRIC_CONNECTOR_UNASSIGNED_TASK_COUNT, HealthCheckNameEnum.CONNECTOR_UNASSIGNED_TASK_COUNT, compareValue);
|
||||
|
||||
}
|
||||
|
||||
private HealthCheckResult getHealthCompareResult(Long connectClusterId, String connectorName, String metricName, HealthCheckNameEnum healthCheckNameEnum, Double compareValue) {
|
||||
public HealthCheckResult getHealthCompareResult(Long connectClusterId, String connectorName, String connectorType, Integer dimension, String metricName, HealthCheckNameEnum healthCheckNameEnum, Double compareValue) {
|
||||
|
||||
Result<ConnectorMetrics> ret = connectorMetricService.collectConnectClusterMetricsFromKafka(connectClusterId, connectorName, metricName);
|
||||
Result<ConnectorMetrics> ret = connectorMetricService.collectConnectClusterMetricsFromKafka(connectClusterId, connectorName, metricName , ConnectorTypeEnum.getByName(connectorType));
|
||||
|
||||
if (!ret.hasData()) {
|
||||
if (!ret.hasData() || ret.getData().getMetric(metricName) == null) {
|
||||
log.error("method=getHealthCompareResult||connectClusterId={}||connectorName={}||metricName={}||errMsg=get metrics failed",
|
||||
connectClusterId, connectorName, metricName);
|
||||
return null;
|
||||
@@ -117,14 +118,8 @@ public class HealthCheckConnectorService extends AbstractHealthCheckService {
|
||||
|
||||
Float value = ret.getData().getMetric(metricName);
|
||||
|
||||
if (value == null) {
|
||||
log.error("method=getHealthCompareResult||connectClusterId={}||connectorName={}||metricName={}||errMsg=get metrics failed",
|
||||
connectClusterId, connectorName, metricName);
|
||||
return null;
|
||||
}
|
||||
|
||||
HealthCheckResult checkResult = new HealthCheckResult(
|
||||
HealthCheckDimensionEnum.CONNECTOR.getDimension(),
|
||||
dimension,
|
||||
healthCheckNameEnum.getConfigName(),
|
||||
connectClusterId,
|
||||
connectorName
|
||||
|
||||
@@ -0,0 +1,205 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.mm2;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.google.common.collect.Table;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.HealthCompareValueConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect.mm2.MirrorMakerParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckNameEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.HealthCheckConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector.ConnectorMetricESDAO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum.SOURCE;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX;
|
||||
|
||||
/**
|
||||
* @author wyb
|
||||
* @date 2022/12/21
|
||||
*/
|
||||
@Service
|
||||
public class HealthCheckMirrorMakerService extends AbstractHealthCheckService {
|
||||
private static final ILog log = LogFactory.getLog(HealthCheckMirrorMakerService.class);
|
||||
|
||||
@Autowired
|
||||
private ConnectorService connectorService;
|
||||
|
||||
@Autowired
|
||||
private MirrorMakerService mirrorMakerService;
|
||||
|
||||
@Autowired
|
||||
private ConnectClusterService connectClusterService;
|
||||
|
||||
@Autowired
|
||||
private MirrorMakerMetricService mirrorMakerMetricService;
|
||||
|
||||
@Autowired
|
||||
private ConnectorMetricESDAO connectorMetricESDAO;
|
||||
|
||||
@Autowired
|
||||
private HealthCheckConnectorService healthCheckConnectorService;
|
||||
|
||||
private static final Long TEN_MIN = 10 * 60 * 1000L;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_UNASSIGNED_TASK_COUNT.getConfigName(), this::checkUnassignedTaskCount);
|
||||
functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_FAILED_TASK_COUNT.getConfigName(), this::checkFailedTaskCount);
|
||||
functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX.getConfigName(), this::checkReplicationLatencyMsMax);
|
||||
functionMap.put(HealthCheckNameEnum.MIRROR_MAKER_TOTAL_RECORD_ERRORS.getConfigName(), this::checkTotalRecordErrors);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ClusterParam> getResList(Long connectClusterId) {
|
||||
List<ClusterParam> paramList = new ArrayList<>();
|
||||
List<ConnectorPO> mirrorMakerList = connectorService.listByConnectClusterIdFromDB(connectClusterId).stream().filter(elem -> elem.getConnectorType().equals(SOURCE.name()) && elem.getConnectorClassName().equals(MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList());
|
||||
|
||||
if (mirrorMakerList.isEmpty()) {
|
||||
return paramList;
|
||||
}
|
||||
Result<Map<String, MirrorMakerTopic>> ret = mirrorMakerService.getMirrorMakerTopicMap(connectClusterId);
|
||||
|
||||
if (!ret.hasData()) {
|
||||
log.error("method=getResList||connectClusterId={}||get MirrorMakerTopicMap failed!", connectClusterId);
|
||||
return paramList;
|
||||
}
|
||||
|
||||
Map<String, MirrorMakerTopic> mirrorMakerTopicMap = ret.getData();
|
||||
|
||||
for (ConnectorPO mirrorMaker : mirrorMakerList) {
|
||||
List<MirrorMakerTopic> mirrorMakerTopicList = mirrorMakerService.getMirrorMakerTopicList(mirrorMaker, mirrorMakerTopicMap);
|
||||
paramList.add(new MirrorMakerParam(connectClusterId, mirrorMaker.getConnectorType(), mirrorMaker.getConnectorName(), mirrorMakerTopicList));
|
||||
}
|
||||
return paramList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HealthCheckDimensionEnum getHealthCheckDimensionEnum() {
|
||||
return HealthCheckDimensionEnum.MIRROR_MAKER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getDimensionCodeIfSupport(Long kafkaClusterPhyId) {
|
||||
List<ConnectCluster> clusterList = connectClusterService.listByKafkaCluster(kafkaClusterPhyId);
|
||||
if (ValidateUtils.isEmptyList(clusterList)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return this.getHealthCheckDimensionEnum().getDimension();
|
||||
}
|
||||
|
||||
private HealthCheckResult checkFailedTaskCount(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
|
||||
MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1();
|
||||
HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2();
|
||||
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
String mirrorMakerName = param.getMirrorMakerName();
|
||||
String connectorType = param.getConnectorType();
|
||||
Double compareValue = compareConfig.getValue();
|
||||
|
||||
return healthCheckConnectorService.getHealthCompareResult(connectClusterId, mirrorMakerName, connectorType, HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(), CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, HealthCheckNameEnum.MIRROR_MAKER_FAILED_TASK_COUNT, compareValue);
|
||||
}
|
||||
|
||||
private HealthCheckResult checkUnassignedTaskCount(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
|
||||
MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1();
|
||||
HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2();
|
||||
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
String mirrorMakerName = param.getMirrorMakerName();
|
||||
String connectorType = param.getConnectorType();
|
||||
Double compareValue = compareConfig.getValue();
|
||||
|
||||
return healthCheckConnectorService.getHealthCompareResult(connectClusterId, mirrorMakerName, connectorType, HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(), CONNECTOR_METRIC_CONNECTOR_UNASSIGNED_TASK_COUNT, HealthCheckNameEnum.MIRROR_MAKER_UNASSIGNED_TASK_COUNT, compareValue);
|
||||
}
|
||||
|
||||
private HealthCheckResult checkReplicationLatencyMsMax(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple) {
|
||||
MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1();
|
||||
HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2();
|
||||
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
String mirrorMakerName = param.getMirrorMakerName();
|
||||
List<MirrorMakerTopic> mirrorMakerTopicList = param.getMirrorMakerTopicList();
|
||||
String metricName = MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX;
|
||||
|
||||
Result<MirrorMakerMetrics> ret = mirrorMakerMetricService.collectMirrorMakerMetricsFromKafka(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metricName);
|
||||
|
||||
if (!ret.hasData() || ret.getData().getMetric(metricName) == null) {
|
||||
log.error("method=checkReplicationLatencyMsMax||connectClusterId={}||metricName={}||errMsg=get metrics failed",
|
||||
param.getConnectClusterId(), metricName);
|
||||
return null;
|
||||
}
|
||||
|
||||
Float value = ret.getData().getMetric(metricName);
|
||||
|
||||
HealthCheckResult checkResult = new HealthCheckResult(
|
||||
HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(),
|
||||
HealthCheckNameEnum.MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX.getConfigName(),
|
||||
connectClusterId,
|
||||
mirrorMakerName
|
||||
);
|
||||
checkResult.setPassed(value <= compareConfig.getValue() ? Constant.YES : Constant.NO);
|
||||
return checkResult;
|
||||
}
|
||||
|
||||
private HealthCheckResult checkTotalRecordErrors(Tuple<ClusterParam, BaseClusterHealthConfig> paramTuple){
|
||||
MirrorMakerParam param = (MirrorMakerParam) paramTuple.getV1();
|
||||
HealthCompareValueConfig compareConfig = (HealthCompareValueConfig) paramTuple.getV2();
|
||||
|
||||
Long connectClusterId = param.getConnectClusterId();
|
||||
String mirrorMakerName = param.getMirrorMakerName();
|
||||
List<MirrorMakerTopic> mirrorMakerTopicList = param.getMirrorMakerTopicList();
|
||||
|
||||
ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId);
|
||||
Long endTime = System.currentTimeMillis();
|
||||
Long startTime = endTime - TEN_MIN;
|
||||
Tuple<Long, String> connectClusterIdAndName = new Tuple<>(connectClusterId, mirrorMakerName);
|
||||
String metricName = CONNECTOR_METRIC_TOTAL_RECORD_ERRORS;
|
||||
|
||||
Table<String, Tuple<Long, String>, List<MetricPointVO>> table = connectorMetricESDAO.listMetricsByConnectors(connectCluster.getKafkaClusterPhyId(), Arrays.asList(metricName), "avg", Arrays.asList(connectClusterIdAndName), startTime, endTime);
|
||||
List<MetricPointVO> pointVOList = table.get(metricName, connectClusterIdAndName);
|
||||
Collections.sort(pointVOList, (p1, p2) -> p2.getTimeStamp().compareTo(p1.getTimeStamp()));
|
||||
|
||||
HealthCheckResult checkResult = new HealthCheckResult(
|
||||
HealthCheckDimensionEnum.MIRROR_MAKER.getDimension(),
|
||||
HealthCheckNameEnum.MIRROR_MAKER_TOTAL_RECORD_ERRORS.getConfigName(),
|
||||
connectClusterId,
|
||||
mirrorMakerName
|
||||
);
|
||||
|
||||
double diff = 0;
|
||||
if (pointVOList.size() > 1) {
|
||||
diff = Double.valueOf(pointVOList.get(0).getValue()) - Double.valueOf(pointVOList.get(1).getValue());
|
||||
}
|
||||
checkResult.setPassed(diff <= compareConfig.getValue() ? Constant.YES : Constant.NO);
|
||||
|
||||
return checkResult;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -26,4 +26,6 @@ public interface HealthCheckResultService {
|
||||
void batchReplace(Long clusterPhyId, Integer dimension, List<HealthCheckResult> healthCheckResults);
|
||||
|
||||
List<HealthCheckResultPO> getConnectorHealthCheckResult(Long clusterPhyId);
|
||||
|
||||
List<HealthCheckResultPO> getMirrorMakerHealthCheckResult(Long clusterPhyId);
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.CONNECTOR;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.MIRROR_MAKER;
|
||||
|
||||
@Service
|
||||
public class HealthCheckResultServiceImpl implements HealthCheckResultService {
|
||||
@@ -146,7 +147,26 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService {
|
||||
LambdaQueryWrapper<HealthCheckResultPO> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(HealthCheckResultPO::getDimension, CONNECTOR.getDimension());
|
||||
wrapper.in(HealthCheckResultPO::getClusterPhyId, connectClusterIdList);
|
||||
resultPOList.addAll(healthCheckResultDAO.selectList(wrapper));
|
||||
resultPOList = healthCheckResultDAO.selectList(wrapper);
|
||||
return resultPOList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HealthCheckResultPO> getMirrorMakerHealthCheckResult(Long clusterPhyId) {
|
||||
List<HealthCheckResultPO> resultPOList = new ArrayList<>();
|
||||
|
||||
//查找connect集群
|
||||
LambdaQueryWrapper<ConnectClusterPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ConnectClusterPO::getKafkaClusterPhyId, clusterPhyId);
|
||||
List<Long> connectClusterIdList = connectClusterDAO.selectList(lambdaQueryWrapper).stream().map(elem -> elem.getId()).collect(Collectors.toList());
|
||||
if (ValidateUtils.isEmptyList(connectClusterIdList)) {
|
||||
return resultPOList;
|
||||
}
|
||||
|
||||
LambdaQueryWrapper<HealthCheckResultPO> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(HealthCheckResultPO::getDimension, MIRROR_MAKER.getDimension());
|
||||
wrapper.in(HealthCheckResultPO::getClusterPhyId, connectClusterIdList);
|
||||
resultPOList = healthCheckResultDAO.selectList(wrapper);
|
||||
return resultPOList;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.health.state;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -17,6 +18,7 @@ public interface HealthStateService {
|
||||
GroupMetrics calGroupHealthMetrics(Long clusterPhyId, String groupName);
|
||||
ZookeeperMetrics calZookeeperHealthMetrics(Long clusterPhyId);
|
||||
ConnectorMetrics calConnectorHealthMetrics(Long connectClusterId, String connectorName);
|
||||
MirrorMakerMetrics calMirrorMakerHealthMetrics(Long connectClusterId, String mirrorMakerName);
|
||||
|
||||
/**
|
||||
* 获取集群健康检查结果
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAgg
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.health.HealthCheckResultPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum;
|
||||
@@ -29,6 +30,7 @@ import java.util.List;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*;
|
||||
@@ -72,6 +74,7 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
metrics.putMetric(this.calClusterGroupsHealthMetrics(clusterPhyId).getMetrics());
|
||||
metrics.putMetric(this.calZookeeperHealthMetrics(clusterPhyId).getMetrics());
|
||||
metrics.putMetric(this.calClusterConnectsHealthMetrics(clusterPhyId).getMetrics());
|
||||
metrics.putMetric(this.calClusterMirrorMakersHealthMetrics(clusterPhyId).getMetrics());
|
||||
|
||||
// 统计最终结果
|
||||
Float passed = 0.0f;
|
||||
@@ -81,6 +84,7 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_GROUPS);
|
||||
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER);
|
||||
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_CONNECTOR);
|
||||
passed += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER);
|
||||
|
||||
Float total = 0.0f;
|
||||
total += metrics.getMetric(ZOOKEEPER_METRIC_HEALTH_CHECK_TOTAL);
|
||||
@@ -89,6 +93,7 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_GROUPS);
|
||||
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER);
|
||||
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CONNECTOR);
|
||||
total += metrics.getMetric(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER);
|
||||
|
||||
// 状态
|
||||
Float state = 0.0f;
|
||||
@@ -98,6 +103,7 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_GROUPS));
|
||||
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CLUSTER));
|
||||
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CONNECTOR));
|
||||
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER));
|
||||
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED, passed);
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL, total);
|
||||
@@ -225,6 +231,31 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MirrorMakerMetrics calMirrorMakerHealthMetrics(Long connectClusterId, String mirrorMakerName) {
|
||||
ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId);
|
||||
MirrorMakerMetrics metrics = new MirrorMakerMetrics(connectClusterId, mirrorMakerName);
|
||||
|
||||
if (connectCluster == null) {
|
||||
metrics.putMetric(MIRROR_MAKER_METRIC_HEALTH_STATE, (float) HealthStateEnum.DEAD.getDimension());
|
||||
return metrics;
|
||||
}
|
||||
|
||||
List<HealthCheckAggResult> resultList = healthCheckResultService.getHealthCheckAggResult(connectClusterId, HealthCheckDimensionEnum.MIRROR_MAKER, mirrorMakerName);
|
||||
|
||||
if (ValidateUtils.isEmptyList(resultList)) {
|
||||
metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED, 0.0f);
|
||||
metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL, 0.0f);
|
||||
} else {
|
||||
metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED, this.getHealthCheckPassed(resultList));
|
||||
metrics.getMetrics().put(MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL, (float) resultList.size());
|
||||
}
|
||||
|
||||
metrics.putMetric(MIRROR_MAKER_METRIC_HEALTH_STATE, (float) this.calHealthState(resultList).getDimension());
|
||||
return metrics;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<HealthScoreResult> getAllDimensionHealthResult(Long clusterPhyId) {
|
||||
List<Integer> supportedDimensionCodeList = new ArrayList<>();
|
||||
@@ -249,6 +280,8 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
for (Integer dimensionCode : dimensionCodeList) {
|
||||
if (dimensionCode.equals(HealthCheckDimensionEnum.CONNECTOR.getDimension())) {
|
||||
poList.addAll(healthCheckResultService.getConnectorHealthCheckResult(clusterPhyId));
|
||||
} else if (dimensionCode.equals(HealthCheckDimensionEnum.MIRROR_MAKER.getDimension())) {
|
||||
poList.addAll(healthCheckResultService.getMirrorMakerHealthCheckResult(clusterPhyId));
|
||||
} else {
|
||||
poList.addAll(healthCheckResultService.listCheckResult(clusterPhyId, dimensionCode));
|
||||
}
|
||||
@@ -366,6 +399,24 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
private ClusterMetrics calClusterMirrorMakersHealthMetrics(Long clusterPhyId){
|
||||
List<HealthCheckResultPO> mirrorMakerHealthCheckResult = healthCheckResultService.getMirrorMakerHealthCheckResult(clusterPhyId);
|
||||
List<HealthCheckAggResult> resultList = this.getDimensionHealthCheckAggResult(mirrorMakerHealthCheckResult, Arrays.asList(MIRROR_MAKER.getDimension()));
|
||||
|
||||
ClusterMetrics metrics = new ClusterMetrics(clusterPhyId);
|
||||
|
||||
if (ValidateUtils.isEmptyList(resultList)) {
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER, 0.0f);
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER, 0.0f);
|
||||
} else {
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER, this.getHealthCheckPassed(resultList));
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER, (float) resultList.size());
|
||||
}
|
||||
|
||||
metrics.putMetric(CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER, (float) this.calHealthState(resultList).getDimension());
|
||||
return metrics;
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** 聚合数据 ****************************************************/
|
||||
|
||||
|
||||
@@ -60,6 +60,14 @@ public class KafkaHealthController {
|
||||
@ResponseBody
|
||||
public Result<List<HealthScoreResultDetailVO>> getClusterHealthCheckResultDetail(@PathVariable Long clusterPhyId,
|
||||
@RequestBody List<Integer> dimensionCodeList) {
|
||||
if (dimensionCodeList.isEmpty()) {
|
||||
return Result.buildSuc(
|
||||
HealthScoreVOConverter.convert2HealthScoreResultDetailVOList(
|
||||
healthStateService.getAllDimensionHealthResult(clusterPhyId)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return Result.buildSuc(HealthScoreVOConverter.convert2HealthScoreResultDetailVOList(
|
||||
healthStateService.getDimensionHealthResult(clusterPhyId, dimensionCodeList)
|
||||
));
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.xiaojukeji.know.streaming.km.task.connect.mm2.health;
|
||||
|
||||
import com.didiglobal.logi.job.annotation.Task;
|
||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.connect.mm2.HealthCheckMirrorMakerService;
|
||||
import com.xiaojukeji.know.streaming.km.task.connect.health.AbstractHealthCheckTask;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* @author wyb
|
||||
* @date 2022/12/21
|
||||
*/
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Task(name = "MirrorMakerHealthCheckTask",
|
||||
description = "MirrorMaker健康检查",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class MirrorMakerHealthCheckTask extends AbstractHealthCheckTask {
|
||||
|
||||
@Autowired
|
||||
private HealthCheckMirrorMakerService healthCheckMirrorMakerService;
|
||||
|
||||
@Override
|
||||
public AbstractHealthCheckService getCheckService() {
|
||||
return healthCheckMirrorMakerService;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user