From 9b7c41e804c2fd0e9e1e127259d0042859ed6c25 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 14:35:47 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]MM2=E7=AE=A1=E7=90=86-=E8=AF=BB?= =?UTF-8?q?=E5=86=99ES=E4=B8=AD=E7=9A=84MM2=E6=8C=87=E6=A0=87(#894)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../metrics/mm2/MirrorMakerMetrics.java | 46 +++++++++ .../po/metrice/mm2/MirrorMakerMetricPO.java | 39 ++++++++ ...DAO.java => BaseConnectorMetricESDAO.java} | 21 ++-- .../connector/ConnectorMetricESDAO.java | 19 ++++ .../connect/mm2/MirrorMakerMetricESDAO.java | 18 ++++ .../km/persistence/es/dsls/DslConstant.java | 2 - .../es/template/TemplateConstant.java | 1 + .../ks_kafka_connect_mirror_maker_metric | 98 +++++++++++++++++++ 8 files changed, 227 insertions(+), 17 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerMetrics.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/mm2/MirrorMakerMetricPO.java rename km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/{ConnectorMetricESDAO.java => BaseConnectorMetricESDAO.java} (95%) create mode 100644 km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/connector/ConnectorMetricESDAO.java create mode 100644 km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/mm2/MirrorMakerMetricESDAO.java create mode 100644 km-persistence/src/main/resources/es/template/ks_kafka_connect_mirror_maker_metric diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerMetrics.java new file mode 100644 index 00000000..b44324ea --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/mm2/MirrorMakerMetrics.java @@ -0,0 +1,46 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * @author zengqiao + * @date 20/6/17 + */ +@Data +@NoArgsConstructor +@ToString +public class MirrorMakerMetrics extends BaseMetrics { + private Long connectClusterId; + + private String connectorName; + + private String connectorNameAndClusterId; + + public MirrorMakerMetrics(Long connectClusterId, String connectorName) { + super(null); + this.connectClusterId = connectClusterId; + this.connectorName = connectorName; + this.connectorNameAndClusterId = connectorName + "#" + connectClusterId; + } + + public MirrorMakerMetrics(Long clusterPhyId, Long connectClusterId, String connectorName) { + super(clusterPhyId); + this.connectClusterId = connectClusterId; + this.connectorName = connectorName; + this.connectorNameAndClusterId = connectorName + "#" + connectClusterId; + } + + public static MirrorMakerMetrics initWithMetric(Long connectClusterId, String connectorName, String metricName, Float value) { + MirrorMakerMetrics metrics = new MirrorMakerMetrics(connectClusterId, connectorName); + metrics.putMetric(metricName, value); + return metrics; + } + + @Override + public String unique() { + return "KCOR@" + connectClusterId + "@" + connectorName; + } +} \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/mm2/MirrorMakerMetricPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/mm2/MirrorMakerMetricPO.java new file mode 100644 index 00000000..5a823024 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/mm2/MirrorMakerMetricPO.java @@ -0,0 +1,39 @@ +package com.xiaojukeji.know.streaming.km.common.bean.po.metrice.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class MirrorMakerMetricPO extends BaseMetricESPO { + private Long connectClusterId; + + private String connectorName; + + /** + * 用于es内部排序 + */ + private String connectorNameAndClusterId; + + public MirrorMakerMetricPO(Long kafkaClusterPhyId, Long connectClusterId, String connectorName){ + super(kafkaClusterPhyId); + this.connectClusterId = connectClusterId; + this.connectorName = connectorName; + this.connectorNameAndClusterId = connectorName + "#" + connectClusterId; + } + + @Override + public String getKey() { + return "KCOR@" + clusterPhyId + "@" + connectClusterId + "@" + connectorName + "@" + monitorTimestamp2min(timestamp); + } + + @Override + public String getRoutingValue() { + return String.valueOf(connectClusterId); + } +} diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectorMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/BaseConnectorMetricESDAO.java similarity index 95% rename from km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectorMetricESDAO.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/BaseConnectorMetricESDAO.java index 1c47e862..cd80b62c 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/ConnectorMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/BaseConnectorMetricESDAO.java @@ -13,27 +13,16 @@ import com.xiaojukeji.know.streaming.km.common.utils.Triple; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; -import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_CONNECTOR_INDEX; -@Component -public class ConnectorMetricESDAO extends BaseMetricESDAO { - - @PostConstruct - public void init() { - super.indexName = CONNECT_CONNECTOR_INDEX; - checkCurrentDayIndexExist(); - register( this); - } +public class BaseConnectorMetricESDAO extends BaseMetricESDAO { /** - * 获取每个 metric 的 topN 个 connector 的指标,如果获取不到 topN 的 connectors, 则默认返回 defaultTopics 的指标 + * 获取每个 metric 的 topN 个 connector 的指标,如果获取不到 topN 的 connectors, 则默认返回 defaultConnectorList 的指标 */ public Table, List> listMetricsByTopN(Long clusterPhyId, List> defaultConnectorList, @@ -143,7 +132,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { for(Tuple connector : connectorList) { try { esTPService.submitSearchTask( - String.format("class=ConnectorMetricESDAO||method=listMetricsByConnectors||ClusterPhyId=%d||connectorName=%s", clusterPhyId, connector.getV2()), + String.format("class=BaseConnectorMetricESDAO||method=listMetricsByConnectors||ClusterPhyId=%d||connectorName=%s", clusterPhyId, connector.getV2()), 3000, () -> { String dsl = dslLoaderUtil.getFormatDslByFileName( @@ -318,7 +307,9 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO { private Tuple splitConnectorNameAndClusterId(String connectorNameAndClusterId){ String[] ss = connectorNameAndClusterId.split("#"); - if(null == ss || ss.length != 2){return null;} + if(null == ss || ss.length != 2) { + return null; + } return new Tuple<>(ss[0], Long.valueOf(ss[1])); } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/connector/ConnectorMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/connector/ConnectorMetricESDAO.java new file mode 100644 index 00000000..167d5bac --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/connector/ConnectorMetricESDAO.java @@ -0,0 +1,19 @@ +package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector; + +import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.BaseConnectorMetricESDAO; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_CONNECTOR_INDEX; + +@Component +public class ConnectorMetricESDAO extends BaseConnectorMetricESDAO { + + @PostConstruct + public void init() { + super.indexName = CONNECT_CONNECTOR_INDEX; + checkCurrentDayIndexExist(); + register( this); + } +} diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/mm2/MirrorMakerMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/mm2/MirrorMakerMetricESDAO.java new file mode 100644 index 00000000..c6879d38 --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/connect/mm2/MirrorMakerMetricESDAO.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.mm2; + +import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.BaseConnectorMetricESDAO; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_MM2_INDEX; + +@Component +public class MirrorMakerMetricESDAO extends BaseConnectorMetricESDAO { + + @PostConstruct + public void init() { + super.indexName = CONNECT_MM2_INDEX; + checkCurrentDayIndexExist(); + register( this); + } +} diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java index 3e01e19f..83c0279c 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java @@ -89,6 +89,4 @@ public class DslConstant { public static final String GET_CONNECTOR_AGG_LIST_METRICS = "ConnectorMetricESDAO/getConnectorAggListMetric"; public static final String GET_CONNECTOR_AGG_TOP_METRICS = "ConnectorMetricESDAO/getConnectorAggTopMetric"; - - } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java index f74f79c6..52f7b6bf 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java @@ -12,6 +12,7 @@ public class TemplateConstant { public static final String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric"; public static final String CONNECT_CLUSTER_INDEX = "ks_kafka_connect_cluster_metric"; public static final String CONNECT_CONNECTOR_INDEX = "ks_kafka_connect_connector_metric"; + public static final String CONNECT_MM2_INDEX = "ks_kafka_connect_mirror_maker_metric"; private TemplateConstant() { } diff --git a/km-persistence/src/main/resources/es/template/ks_kafka_connect_mirror_maker_metric b/km-persistence/src/main/resources/es/template/ks_kafka_connect_mirror_maker_metric new file mode 100644 index 00000000..c95a36ed --- /dev/null +++ b/km-persistence/src/main/resources/es/template/ks_kafka_connect_mirror_maker_metric @@ -0,0 +1,98 @@ +{ + "order" : 10, + "index_patterns" : [ + "ks_kafka_connect_mirror_maker_metric*" + ], + "settings" : { + "index" : { + "number_of_shards" : "2" + } + }, + "mappings" : { + "properties" : { + "connectClusterId" : { + "type" : "long" + }, + "routingValue" : { + "type" : "text", + "fields" : { + "keyword" : { + "ignore_above" : 256, + "type" : "keyword" + } + } + }, + "connectorName" : { + "type" : "keyword" + }, + "connectorNameAndClusterId" : { + "type" : "keyword" + }, + "clusterPhyId" : { + "type" : "long" + }, + "metrics" : { + "properties" : { + "HealthState" : { + "type" : "float" + }, + "HealthCheckTotal" : { + "type" : "float" + }, + "ByteCount" : { + "type" : "float" + }, + "ByteRate" : { + "type" : "float" + }, + "RecordAgeMs" : { + "type" : "float" + }, + "RecordAgeMsAvg" : { + "type" : "float" + }, + "RecordAgeMsMax" : { + "type" : "float" + }, + "RecordAgeMsMin" : { + "type" : "float" + }, + "RecordCount" : { + "type" : "float" + }, + "RecordRate" : { + "type" : "float" + }, + "ReplicationLatencyMs" : { + "type" : "float" + }, + "ReplicationLatencyMsAvg" : { + "type" : "float" + }, + "ReplicationLatencyMsMax" : { + "type" : "float" + }, + "ReplicationLatencyMsMin" : { + "type" : "float" + } + } + }, + "key" : { + "type" : "text", + "fields" : { + "keyword" : { + "ignore_above" : 256, + "type" : "keyword" + } + } + }, + "timestamp" : { + "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis", + "index" : true, + "type" : "date", + "doc_values" : true + } + } + }, + "aliases" : { } + } \ No newline at end of file