From 6ba3dceb84de27d4f52bab4ffe141decd568f47b Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 14:47:08 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]MM2=E7=AE=A1=E7=90=86-=E9=87=87?= =?UTF-8?q?=E9=9B=86MM2=E6=8C=87=E6=A0=87(#894)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sink/mm2/MirrorMakerMetricESSender.java | 33 ++ .../entity/connect/mm2/MirrorMakerTopic.java | 33 ++ .../mm2/MirrorMakerTopicPartitionMetrics.java | 38 ++ .../connect/mm2/MirrorMakerMetricParam.java | 26 ++ .../metric/mm2/MirrorMakerMetricEvent.java | 21 ++ .../streaming/km/common/jmx/JmxAttribute.java | 26 ++ .../know/streaming/km/common/jmx/JmxName.java | 6 + .../impl/ConnectorMetricServiceImpl.java | 2 +- .../connect/mm2/MirrorMakerMetricService.java | 28 ++ .../impl/MirrorMakerMetricServiceImpl.java | 324 ++++++++++++++++++ .../MirrorMakerMetricVersionItems.java | 100 +++++- .../mm2/metrics/MirrorMakerCollectorTask.java | 31 ++ 12 files changed, 666 insertions(+), 2 deletions(-) create mode 100644 km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/mm2/MirrorMakerMetricESSender.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/mm2/MirrorMakerTopic.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerTopicPartitionMetrics.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/connect/mm2/MirrorMakerMetricParam.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/mm2/MirrorMakerMetricEvent.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/MirrorMakerMetricService.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java create mode 100644 km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/metrics/MirrorMakerCollectorTask.java diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/mm2/MirrorMakerMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/mm2/MirrorMakerMetricESSender.java new file mode 100644 index 00000000..3089a995 --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/mm2/MirrorMakerMetricESSender.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.know.streaming.km.collector.sink.mm2; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.collector.sink.AbstractMetricESSender; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.mm2.MirrorMakerMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.mm2.MirrorMakerMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_MM2_INDEX; + +/** + * @author zengqiao + * @date 2022/12/20 + */ +@Component +public class MirrorMakerMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricESSender.class); + + @PostConstruct + public void init(){ + LOGGER.info("method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(MirrorMakerMetricEvent event) { + send2es(CONNECT_MM2_INDEX, ConvertUtil.list2List(event.getMetricsList(), MirrorMakerMetricPO.class)); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/mm2/MirrorMakerTopic.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/mm2/MirrorMakerTopic.java new file mode 100644 index 00000000..aea8a33c --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/mm2/MirrorMakerTopic.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * @author wyb + * @date 2022/12/14 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class MirrorMakerTopic { + + /** + * mm2集群别名 + */ + private String clusterAlias; + + /** + * topic名称 + */ + private String topicName; + + /** + * partition在connect上的分布 Map + */ + private Map partitionMap; + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerTopicPartitionMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerTopicPartitionMetrics.java new file mode 100644 index 00000000..ef17adc9 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerTopicPartitionMetrics.java @@ -0,0 +1,38 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author wyb + * @date 2022/12/16 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class MirrorMakerTopicPartitionMetrics extends BaseMetrics { + private Long connectClusterId; + + private String mirrorMakerName; + + private String clusterAlias; + + private String topicName; + + private Integer partitionId; + + private String workerId; + + @Override + public String unique() { + return "KCOR@" + connectClusterId + "@" + mirrorMakerName + "@" + clusterAlias + "@" + workerId + "@" + topicName + "@" + partitionId; + } + + public static MirrorMakerTopicPartitionMetrics initWithMetric(Long connectClusterId, String mirrorMakerName, String clusterAlias, String topicName, Integer partitionId, String workerId, String metricName, Float value) { + MirrorMakerTopicPartitionMetrics metrics = new MirrorMakerTopicPartitionMetrics(connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId); + metrics.putMetric(metricName, value); + return metrics; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/connect/mm2/MirrorMakerMetricParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/connect/mm2/MirrorMakerMetricParam.java new file mode 100644 index 00000000..c67f3128 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/connect/mm2/MirrorMakerMetricParam.java @@ -0,0 +1,26 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.MetricParam; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author wyb + * @date 2022/12/15 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class MirrorMakerMetricParam extends MetricParam { + private Long connectClusterId; + + private String mirrorMakerName; + + private List mirrorMakerTopicList; + + private String metric; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/mm2/MirrorMakerMetricEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/mm2/MirrorMakerMetricEvent.java new file mode 100644 index 00000000..42e98ce7 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/mm2/MirrorMakerMetricEvent.java @@ -0,0 +1,21 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.metric.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BaseMetricEvent; +import lombok.Getter; + +import java.util.List; + +/** + * @author zengqiao + * @date 2022/12/20 + */ +@Getter +public class MirrorMakerMetricEvent extends BaseMetricEvent { + private final List metricsList; + + public MirrorMakerMetricEvent(Object source, List metricsList) { + super(source); + this.metricsList = metricsList; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java index 2a89a08c..3043b90f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java @@ -144,6 +144,32 @@ public class JmxAttribute { public static final String TOTAL_RETRIES = "total-retries"; + /*********************************************************** mm2 ***********************************************************/ + + public static final String BYTE_COUNT = "byte-count"; + + public static final String BYTE_RATE = "byte-rate"; + + public static final String RECORD_AGE_MS = "record-age-ms"; + + public static final String RECORD_AGE_MS_AVG = "record-age-ms-avg"; + + public static final String RECORD_AGE_MS_MAX = "record-age-ms-max"; + + public static final String RECORD_AGE_MS_MIN = "record-age-ms-min"; + + public static final String RECORD_COUNT = "record-count"; + + public static final String RECORD_RATE = "record-rate"; + + public static final String REPLICATION_LATENCY_MS = "replication-latency-ms"; + + public static final String REPLICATION_LATENCY_MS_AVG = "replication-latency-ms-avg"; + + public static final String REPLICATION_LATENCY_MS_MAX = "replication-latency-ms-max"; + + public static final String REPLICATION_LATENCY_MS_MIN = "replication-latency-ms-min"; + private JmxAttribute() { } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java index 5e11e271..39217986 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java @@ -41,6 +41,8 @@ public class JmxName { public static final String JMX_SERVER_APP_INFO ="kafka.server:type=app-info"; + public static final String JMX_SERVER_TOPIC_MIRROR ="kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=*,topic=%s,partition=*"; + /*********************************************************** controller ***********************************************************/ public static final String JMX_CONTROLLER_ACTIVE_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount"; @@ -82,6 +84,10 @@ public class JmxName { public static final String JMX_CONNECTOR_TASK_ERROR_METRICS = "kafka.connect:type=task-error-metrics,connector=%s,task=%s"; + /*********************************************************** mm2 ***********************************************************/ + + public static final String JMX_MIRROR_MAKER_SOURCE = "kafka.connect.mirror:type=MirrorSourceConnector,target=%s,topic=%s,partition=%s"; + private JmxName() { } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java index 8c9fec4f..8792875d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorMetricServiceImpl.java @@ -34,7 +34,7 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerServic import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService; import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient; -import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.ConnectorMetricESDAO; +import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector.ConnectorMetricESDAO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/MirrorMakerMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/MirrorMakerMetricService.java new file mode 100644 index 00000000..f4c833d8 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/MirrorMakerMetricService.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.core.service.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2.MetricsMirrorMakersDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; + +import java.util.List; + +/** + * @author wyb + * @date 2022/12/15 + */ +public interface MirrorMakerMetricService { + + Result collectMirrorMakerMetricsFromKafka(Long connectClusterPhyId, String mirrorMakerName, List mirrorMakerTopicList, String metricName); + + /** + * 从ES中获取一段时间内聚合计算之后的指标线 + */ + Result> listMirrorMakerClusterMetricsFromES(Long clusterPhyId, MetricsMirrorMakersDTO dto); + + Result> getLatestMetricsFromES(Long clusterPhyId, List> mirrorMakerList, List metricNameList); + + Result getLatestMetricsFromES(Long connectClusterId, String connectorName, List metricsNames); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java new file mode 100644 index 00000000..83242841 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerMetricServiceImpl.java @@ -0,0 +1,324 @@ +package com.xiaojukeji.know.streaming.km.core.service.connect.mm2.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.google.common.collect.Table; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2.MetricsMirrorMakersDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerTopicPartitionMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.connect.mm2.MirrorMakerMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.ConnectorMetricPO; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.mm2.MirrorMakerMetricPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionConnectJmxInfo; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; +import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; +import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService; +import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient; +import org.springframework.beans.factory.annotation.Autowired; +import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.mm2.MirrorMakerMetricESDAO; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CONNECT_MIRROR_MAKER; + +/** + * @author wyb + * @date 2022/12/15 + */ +@Service +public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService implements MirrorMakerMetricService { + protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricServiceImpl.class); + + public static final String MIRROR_MAKER_METHOD_DO_NOTHING = "doNothing"; + + public static final String MIRROR_MAKER_METHOD_GET_HEALTH_SCORE = "getMetricHealthScore"; + + public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM = "getTopicPartitionMetricListSum"; + + public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG = "getTopicPartitionMetricListAvg"; + + public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN = "getTopicPartitionMetricListMin"; + + public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX = "getTopicPartitionMetricListMax"; + + @Autowired + private ConnectJMXClient connectJMXClient; + + @Autowired + private MirrorMakerMetricESDAO mirrorMakerMetricESDAO; + + @Autowired + private ConnectorService connectorService; + + @Autowired + private HealthStateService healthStateService; + + @Override + protected List listMetricPOFields() { + return BeanUtil.listBeanFields(MirrorMakerMetricPO.class); + } + + @Override + protected void initRegisterVCHandler() { + registerVCHandler(MIRROR_MAKER_METHOD_DO_NOTHING, this::doNothing); + registerVCHandler(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore); + registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM, this::getTopicPartitionMetricListSum); + registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG, this::getTopicPartitionMetricListAvg); + registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX, this::getTopicPartitionMetricListMax); + registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN, this::getTopicPartitionMetricListMin); + } + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return METRIC_CONNECT_MIRROR_MAKER; + } + + @Override + public Result collectMirrorMakerMetricsFromKafka(Long connectClusterPhyId, String mirrorMakerName, List mirrorMakerTopicList, String metricName) { + try { + MirrorMakerMetricParam metricParam = new MirrorMakerMetricParam(connectClusterPhyId, mirrorMakerName, mirrorMakerTopicList, metricName); + return (Result) doVCHandler(connectClusterPhyId, metricName, metricParam); + } catch (Exception e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + @Override + public Result> listMirrorMakerClusterMetricsFromES(Long clusterPhyId, MetricsMirrorMakersDTO dto) { + Long startTime = dto.getStartTime(); + Long endTime = dto.getEndTime(); + Integer topN = dto.getTopNu(); + String aggType = dto.getAggType(); + List metricNameList = dto.getMetricsNames(); + + List> connectorList = new ArrayList<>(); + if(!CollectionUtils.isEmpty(dto.getConnectorNameList())){ + connectorList = dto.getConnectorNameList().stream() + .map(c -> new Tuple<>(c.getConnectClusterId(), c.getConnectorName())) + .collect(Collectors.toList()); + } + + Table, List> retTable; + if(ValidateUtils.isEmptyList(connectorList)) { + // 按照TopN的方式去获取 + List> defaultConnectorList = this.listTopNMirrorMakerList(clusterPhyId, topN); + + retTable = mirrorMakerMetricESDAO.listMetricsByTopN(clusterPhyId, defaultConnectorList, metricNameList, aggType, topN, startTime, endTime); + } else { + // 制定集群ID去获取 + retTable = mirrorMakerMetricESDAO.listMetricsByConnectors(clusterPhyId, metricNameList, aggType, connectorList, startTime, endTime); + } + + return Result.buildSuc(this.metricMap2VO(clusterPhyId, retTable.rowMap())); + } + + @Override + public Result> getLatestMetricsFromES(Long clusterPhyId, List> mirrorMakerList, List metricNameList) { + List connectorLatestMetricList = mirrorMakerMetricESDAO.getConnectorLatestMetric(clusterPhyId, mirrorMakerList, metricNameList); + return Result.buildSuc(ConvertUtil.list2List(connectorLatestMetricList, MirrorMakerMetrics.class)); + } + + @Override + public Result getLatestMetricsFromES(Long connectClusterId, String connectorName, List metricsNames) { + ConnectorMetricPO connectorLatestMetric = mirrorMakerMetricESDAO.getConnectorLatestMetric(null, connectClusterId, connectorName, metricsNames); + MirrorMakerMetrics mirrorMakerMetrics = ConvertUtil.obj2Obj(connectorLatestMetric, MirrorMakerMetrics.class); + return Result.buildSuc(mirrorMakerMetrics); + } + + private List> listTopNMirrorMakerList(Long clusterPhyId, Integer topN) { + List poList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId); + + if (CollectionUtils.isEmpty(poList)) { + return new ArrayList<>(); + } + + return poList.subList(0, Math.min(topN, poList.size())) + .stream() + .map( c -> new Tuple<>(c.getId(), c.getConnectorName()) ) + .collect(Collectors.toList()); + } + + protected List metricMap2VO(Long connectClusterId, + Map, List>> map){ + List multiLinesVOS = new ArrayList<>(); + if (map == null || map.isEmpty()) { + // 如果为空,则直接返回 + return multiLinesVOS; + } + + for(String metric : map.keySet()){ + try { + MetricMultiLinesVO multiLinesVO = new MetricMultiLinesVO(); + multiLinesVO.setMetricName(metric); + + List metricLines = new ArrayList<>(); + + Map, List> metricPointMap = map.get(metric); + if(null == metricPointMap || metricPointMap.isEmpty()){continue;} + + for(Map.Entry, List> entry : metricPointMap.entrySet()){ + MetricLineVO metricLineVO = new MetricLineVO(); + metricLineVO.setName(entry.getKey().getV1() + "#" + entry.getKey().getV2()); + metricLineVO.setMetricName(metric); + metricLineVO.setMetricPoints(entry.getValue()); + + metricLines.add(metricLineVO); + } + + multiLinesVO.setMetricLines(metricLines); + multiLinesVOS.add(multiLinesVO); + }catch (Exception e){ + LOGGER.error("method=metricMap2VO||connectClusterId={}||msg=exception!", connectClusterId, e); + } + } + + return multiLinesVOS; + } + + private Result doNothing(VersionItemParam metricParam) { + MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam; + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + return Result.buildSuc(new MirrorMakerMetrics(connectClusterId,mirrorMakerName)); + } + + private Result getMetricHealthScore(VersionItemParam metricParam) { + MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam; + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + + MirrorMakerMetrics metrics = healthStateService.calMirrorMakerHealthMetrics(connectClusterId, mirrorMakerName); + return Result.buildSuc(metrics); + } + + private Result getTopicPartitionMetricListSum(VersionItemParam metricParam) { + MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam; + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + List mirrorMakerTopicList = param.getMirrorMakerTopicList(); + String metric = param.getMetric(); + + Result> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric); + if (!ret.hasData() || ret.getData().isEmpty()) { + return Result.buildFailure(NOT_EXIST); + } + Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get(); + return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, sum)); + } + + private Result getTopicPartitionMetricListAvg(VersionItemParam metricParam) { + MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam; + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + List mirrorMakerTopicList = param.getMirrorMakerTopicList(); + String metric = param.getMetric(); + + Result> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric); + + if (!ret.hasData() || ret.getData().isEmpty()) { + return Result.buildFailure(NOT_EXIST); + } + + Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get(); + return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, sum / ret.getData().size())); + } + + private Result getTopicPartitionMetricListMax(VersionItemParam metricParam) { + MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam; + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + List mirrorMakerTopicList = param.getMirrorMakerTopicList(); + String metric = param.getMetric(); + + Result> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric); + + if (!ret.hasData() || ret.getData().isEmpty()) { + return Result.buildFailure(NOT_EXIST); + } + + Float max = ret.getData().stream().max((a, b) -> a.getMetric(metric).compareTo(b.getMetric(metric))).get().getMetric(metric); + return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, max)); + } + + private Result getTopicPartitionMetricListMin(VersionItemParam metricParam) { + MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam; + Long connectClusterId = param.getConnectClusterId(); + String mirrorMakerName = param.getMirrorMakerName(); + List mirrorMakerTopicList = param.getMirrorMakerTopicList(); + String metric = param.getMetric(); + + Result> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric); + + if (!ret.hasData() || ret.getData().isEmpty()) { + return Result.buildFailure(NOT_EXIST); + } + + Float min = ret.getData().stream().max((a, b) -> b.getMetric(metric).compareTo(a.getMetric(metric))).get().getMetric(metric); + return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, min)); + } + + + private Result> getTopicPartitionMetricList(Long connectClusterId, String mirrorMakerName, List mirrorMakerTopicList, String metric) { + List topicPartitionMetricsList = new ArrayList<>(); + for (MirrorMakerTopic mirrorMakerTopic : mirrorMakerTopicList) { + for (Map.Entry entry : mirrorMakerTopic.getPartitionMap().entrySet()) { + Result ret = this.getMirrorMakerTopicPartitionMetric(connectClusterId, mirrorMakerName, mirrorMakerTopic.getClusterAlias(), mirrorMakerTopic.getTopicName(), entry.getKey(), entry.getValue(), metric); + if (!ret.hasData() || ret.getData().getMetric(metric) == null) { + continue; + } + topicPartitionMetricsList.add(ret.getData()); + } + } + return Result.buildSuc(topicPartitionMetricsList); + } + + private Result getMirrorMakerTopicPartitionMetric(Long connectClusterId, String mirrorMakerName, String clusterAlias, String topicName, Integer partitionId, String workerId, String metric) { + VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric); + if (null == jmxInfo) { + return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST); + } + + String jmxObjectName = String.format(jmxInfo.getJmxObjectName(), clusterAlias, topicName, partitionId); + + JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId); + if (ValidateUtils.isNull(jmxConnectorWrap)) { + return Result.buildFailure(VC_JMX_INIT_ERROR); + } + try { + //2、获取jmx指标 + String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString(); + MirrorMakerTopicPartitionMetrics metrics = MirrorMakerTopicPartitionMetrics.initWithMetric(connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId, metric, Float.valueOf(value)); + return Result.buildSuc(metrics); + } catch (InstanceNotFoundException e) { + // 忽略该错误,该错误出现的原因是该指标在JMX中不存在 + return Result.buildSuc(new MirrorMakerTopicPartitionMetrics(connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId)); + } catch (Exception e) { + LOGGER.error("method=getMirrorMakerTopicPartitionMetric||connectClusterId={}||mirrorMakerName={}||clusterAlias={}||topicName={}||partitionId={}||workerId={}||metrics={}||jmx={}||msg={}", + connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId, metric, jmxObjectName, e.getClass().getName()); + return Result.buildFailure(VC_JMX_CONNECT_ERROR); + } + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/MirrorMakerMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/MirrorMakerMetricVersionItems.java index b5256e31..b8095e86 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/MirrorMakerMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/connect/MirrorMakerMetricVersionItems.java @@ -1,26 +1,124 @@ package com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.BaseMetricVersionMetric; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.*; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CONNECT_MIRROR_MAKER; +import static com.xiaojukeji.know.streaming.km.common.jmx.JmxAttribute.*; +import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.JMX_MIRROR_MAKER_SOURCE; +import static com.xiaojukeji.know.streaming.km.core.service.connect.mm2.impl.MirrorMakerMetricServiceImpl.*; @Component public class MirrorMakerMetricVersionItems extends BaseMetricVersionMetric { + public static final String MIRROR_MAKER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME; + + public static final String MIRROR_MAKER_METRIC_HEALTH_STATE = "HealthState"; + + public static final String MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed"; + + public static final String MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal"; + + public static final String MIRROR_MAKER_METRIC_BYTE_COUNT = "ByteCount"; + + public static final String MIRROR_MAKER_METRIC_BYTE_RATE = "ByteRate"; + + public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS = "RecordAgeMs"; + + public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS_AVG = "RecordAgeMsAvg"; + + public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS_MAX = "RecordAgeMsMax"; + + public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS_MIN = "RecordAgeMsMin"; + + public static final String MIRROR_MAKER_METRIC_RECORD_COUNT = "RecordCount"; + + public static final String MIRROR_MAKER_METRIC_RECORD_RATE = "RecordRate"; + + public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS = "ReplicationLatencyMs"; + + public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_AVG = "ReplicationLatencyMsAvg"; + + public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX = "ReplicationLatencyMsMax"; + + public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MIN = "ReplicationLatencyMsMin"; + @Override public int versionItemType() { return METRIC_CONNECT_MIRROR_MAKER.getCode(); } @Override - public List init(){ + public List init() { List items = new ArrayList<>(); + // HealthScore 指标 + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH) + .extendMethod(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE)); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED).unit("个").desc("健康项检查通过数").category(CATEGORY_HEALTH) + .extendMethod(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE)); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL).unit("个").desc("健康项检查总数").category(CATEGORY_HEALTH) + .extendMethod(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE)); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_COLLECT_COST_TIME).unit("秒").desc("采集mirrorMaker指标的耗时").category(CATEGORY_PERFORMANCE) + .extendMethod(MIRROR_MAKER_METHOD_DO_NOTHING)); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_BYTE_COUNT).unit("byte").desc("消息复制流量大小").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(BYTE_COUNT))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_BYTE_RATE).unit(BYTE_PER_SEC).desc("复制流量速率").category(CATEGORY_FLOW) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(BYTE_RATE))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_RECORD_AGE_MS).unit("ms").desc("消息获取时年龄").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_RECORD_AGE_MS_AVG).unit("ms").desc("消息获取时平均年龄").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS_AVG))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_RECORD_AGE_MS_MAX).unit("ms").desc("消息获取时最大年龄").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS_MAX))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_RECORD_AGE_MS_MIN).unit("ms").desc("消息获取时最小年龄").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS_MIN))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_RECORD_COUNT).unit("条").desc("消息复制条数").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_COUNT))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_RECORD_RATE).unit("条/s").desc("消息复制速率").category(CATEGORY_FLOW) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_RATE))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS).unit("ms").desc("消息复制延迟时间").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_AVG).unit("ms").desc("消息复制平均延迟时间").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS_AVG))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX).unit("ms").desc("消息复制最大延迟时间").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS_MAX))); + items.add(buildAllVersionsItem() + .name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MIN).unit("ms").desc("消息复制最小延迟时间").category(CATEGORY_PERFORMANCE) + .extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN) + .jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS_MIN))); return items; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/metrics/MirrorMakerCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/metrics/MirrorMakerCollectorTask.java new file mode 100644 index 00000000..266014dc --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/mm2/metrics/MirrorMakerCollectorTask.java @@ -0,0 +1,31 @@ +package com.xiaojukeji.know.streaming.km.task.connect.mm2.metrics; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.xiaojukeji.know.streaming.km.collector.metric.connect.mm2.MirrorMakerMetricCollector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.task.connect.metrics.AbstractAsyncMetricsDispatchTask; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author wyb + * @date 2022/12/21 + */ +@Task(name = "MirrorMakerCollectorTask", + description = "MirrorMaker指标采集任务", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class MirrorMakerCollectorTask extends AbstractAsyncMetricsDispatchTask { + + @Autowired + private MirrorMakerMetricCollector mirrorMakerMetricCollector; + + @Override + public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception { + mirrorMakerMetricCollector.collectConnectMetrics(connectCluster); + return TaskResult.SUCCESS; + } +}