[Feature]MM2管理-读写ES中的MM2指标(#894)

This commit is contained in:
zengqiao
2023-02-09 14:35:47 +08:00
committed by EricZeng
parent 346aee8fe7
commit 9b7c41e804
8 changed files with 227 additions and 17 deletions

View File

@@ -0,0 +1,46 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/6/17
*/
@Data
@NoArgsConstructor
@ToString
public class MirrorMakerMetrics extends BaseMetrics {
private Long connectClusterId;
private String connectorName;
private String connectorNameAndClusterId;
public MirrorMakerMetrics(Long connectClusterId, String connectorName) {
super(null);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}
public MirrorMakerMetrics(Long clusterPhyId, Long connectClusterId, String connectorName) {
super(clusterPhyId);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}
public static MirrorMakerMetrics initWithMetric(Long connectClusterId, String connectorName, String metricName, Float value) {
MirrorMakerMetrics metrics = new MirrorMakerMetrics(connectClusterId, connectorName);
metrics.putMetric(metricName, value);
return metrics;
}
@Override
public String unique() {
return "KCOR@" + connectClusterId + "@" + connectorName;
}
}

View File

@@ -0,0 +1,39 @@
package com.xiaojukeji.know.streaming.km.common.bean.po.metrice.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MirrorMakerMetricPO extends BaseMetricESPO {
private Long connectClusterId;
private String connectorName;
/**
* 用于es内部排序
*/
private String connectorNameAndClusterId;
public MirrorMakerMetricPO(Long kafkaClusterPhyId, Long connectClusterId, String connectorName){
super(kafkaClusterPhyId);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}
@Override
public String getKey() {
return "KCOR@" + clusterPhyId + "@" + connectClusterId + "@" + connectorName + "@" + monitorTimestamp2min(timestamp);
}
@Override
public String getRoutingValue() {
return String.valueOf(connectClusterId);
}
}

View File

@@ -13,27 +13,16 @@ import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_CONNECTOR_INDEX;
@Component
public class ConnectorMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public void init() {
super.indexName = CONNECT_CONNECTOR_INDEX;
checkCurrentDayIndexExist();
register( this);
}
public class BaseConnectorMetricESDAO extends BaseMetricESDAO {
/**
* 获取每个 metric topN connector 的指标如果获取不到 topN connectors, 则默认返回 defaultTopics 的指标
* 获取每个 metric topN connector 的指标如果获取不到 topN connectors, 则默认返回 defaultConnectorList 的指标
*/
public Table<String/*metric*/, Tuple<Long, String>, List<MetricPointVO>> listMetricsByTopN(Long clusterPhyId,
List<Tuple<Long, String>> defaultConnectorList,
@@ -143,7 +132,7 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
for(Tuple<Long, String> connector : connectorList) {
try {
esTPService.submitSearchTask(
String.format("class=ConnectorMetricESDAO||method=listMetricsByConnectors||ClusterPhyId=%d||connectorName=%s", clusterPhyId, connector.getV2()),
String.format("class=BaseConnectorMetricESDAO||method=listMetricsByConnectors||ClusterPhyId=%d||connectorName=%s", clusterPhyId, connector.getV2()),
3000,
() -> {
String dsl = dslLoaderUtil.getFormatDslByFileName(
@@ -318,7 +307,9 @@ public class ConnectorMetricESDAO extends BaseMetricESDAO {
private Tuple<String, Long> splitConnectorNameAndClusterId(String connectorNameAndClusterId){
String[] ss = connectorNameAndClusterId.split("#");
if(null == ss || ss.length != 2){return null;}
if(null == ss || ss.length != 2) {
return null;
}
return new Tuple<>(ss[0], Long.valueOf(ss[1]));
}

View File

@@ -0,0 +1,19 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.BaseConnectorMetricESDAO;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_CONNECTOR_INDEX;
@Component
public class ConnectorMetricESDAO extends BaseConnectorMetricESDAO {
@PostConstruct
public void init() {
super.indexName = CONNECT_CONNECTOR_INDEX;
checkCurrentDayIndexExist();
register( this);
}
}

View File

@@ -0,0 +1,18 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.mm2;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.BaseConnectorMetricESDAO;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_MM2_INDEX;
@Component
public class MirrorMakerMetricESDAO extends BaseConnectorMetricESDAO {
@PostConstruct
public void init() {
super.indexName = CONNECT_MM2_INDEX;
checkCurrentDayIndexExist();
register( this);
}
}

View File

@@ -89,6 +89,4 @@ public class DslConstant {
public static final String GET_CONNECTOR_AGG_LIST_METRICS = "ConnectorMetricESDAO/getConnectorAggListMetric";
public static final String GET_CONNECTOR_AGG_TOP_METRICS = "ConnectorMetricESDAO/getConnectorAggTopMetric";
}

View File

@@ -12,6 +12,7 @@ public class TemplateConstant {
public static final String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric";
public static final String CONNECT_CLUSTER_INDEX = "ks_kafka_connect_cluster_metric";
public static final String CONNECT_CONNECTOR_INDEX = "ks_kafka_connect_connector_metric";
public static final String CONNECT_MM2_INDEX = "ks_kafka_connect_mirror_maker_metric";
private TemplateConstant() {
}

View File

@@ -0,0 +1,98 @@
{
"order" : 10,
"index_patterns" : [
"ks_kafka_connect_mirror_maker_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "2"
}
},
"mappings" : {
"properties" : {
"connectClusterId" : {
"type" : "long"
},
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"connectorName" : {
"type" : "keyword"
},
"connectorNameAndClusterId" : {
"type" : "keyword"
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"HealthState" : {
"type" : "float"
},
"HealthCheckTotal" : {
"type" : "float"
},
"ByteCount" : {
"type" : "float"
},
"ByteRate" : {
"type" : "float"
},
"RecordAgeMs" : {
"type" : "float"
},
"RecordAgeMsAvg" : {
"type" : "float"
},
"RecordAgeMsMax" : {
"type" : "float"
},
"RecordAgeMsMin" : {
"type" : "float"
},
"RecordCount" : {
"type" : "float"
},
"RecordRate" : {
"type" : "float"
},
"ReplicationLatencyMs" : {
"type" : "float"
},
"ReplicationLatencyMsAvg" : {
"type" : "float"
},
"ReplicationLatencyMsMax" : {
"type" : "float"
},
"ReplicationLatencyMsMin" : {
"type" : "float"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"index" : true,
"type" : "date",
"doc_values" : true
}
}
},
"aliases" : { }
}