From afe44a2537f8a9ef162d37fdadb3dde71a030cc4 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Sat, 3 Sep 2022 08:34:32 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=88=9B=E5=BB=BAES=E7=B4=A2?= =?UTF-8?q?=E5=BC=95=20&=20=E4=B8=BB=E5=8A=A8=E5=A1=AB=E8=A1=A5=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E5=8E=86=E5=8F=B2=E6=9B=B2=E7=BA=BF=E7=BC=BA=E5=B0=91?= =?UTF-8?q?=E7=9A=84=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/collector/metric/MetricESSender.java | 23 +- .../bean/vo/metrics/point/MetricPointVO.java | 4 + .../km/common/constant/ESIndexConstant.java | 647 ++++++++++++++++++ .../enums/metric/KafkaMetricIndexEnum.java | 54 -- .../metrics/ClusterMetricVersionItems.java | 2 + .../km/persistence/es/BaseESDAO.java | 2 +- .../km/persistence/es/ESOpClient.java | 91 +++ .../persistence/es/dao/BaseMetricESDAO.java | 73 +- .../persistence/es/dao/BrokerMetricESDAO.java | 10 +- .../es/dao/ClusterMetricESDAO.java | 10 +- .../persistence/es/dao/GroupMetricESDAO.java | 11 +- .../es/dao/PartitionMetricESDAO.java | 8 +- .../es/dao/ReplicationMetricESDAO.java | 8 +- .../persistence/es/dao/TopicMetricESDAO.java | 10 +- .../es/ClusterMetricESDAOTest.java | 4 +- 15 files changed, 859 insertions(+), 98 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java delete mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java index 55944f6f..a94a377d 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java @@ -5,7 +5,6 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*; -import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory; @@ -21,6 +20,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; + @Component public class MetricESSender implements ApplicationListener { protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); @@ -41,37 +42,37 @@ public class MetricESSender implements ApplicationListener { public void onApplicationEvent(BaseMetricEvent event) { if(event instanceof BrokerMetricEvent) { BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event; - send2es(KafkaMetricIndexEnum.BROKER_INFO, + send2es(BROKER_INDEX, ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class) ); } else if(event instanceof ClusterMetricEvent) { ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event; - send2es(KafkaMetricIndexEnum.CLUSTER_INFO, + send2es(CLUSTER_INDEX, ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class) ); } else if(event instanceof TopicMetricEvent) { TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event; - send2es(KafkaMetricIndexEnum.TOPIC_INFO, + send2es(TOPIC_INDEX, ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class) ); } else if(event instanceof PartitionMetricEvent) { PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event; - send2es(KafkaMetricIndexEnum.PARTITION_INFO, + send2es(PARTITION_INDEX, ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class) ); } else if(event instanceof GroupMetricEvent) { GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event; - send2es(KafkaMetricIndexEnum.GROUP_INFO, + send2es(GROUP_INDEX, ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class) ); } else if(event instanceof ReplicaMetricEvent) { ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event; - send2es(KafkaMetricIndexEnum.REPLICATION_INFO, + send2es(REPLICATION_INDEX, ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class) ); } @@ -80,19 +81,19 @@ public class MetricESSender implements ApplicationListener { /** * 根据不同监控维度来发送 */ - private boolean send2es(KafkaMetricIndexEnum stats, List statsList){ + private boolean send2es(String index, List statsList){ if (CollectionUtils.isEmpty(statsList)) { return true; } if (!EnvUtil.isOnline()) { LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}", - stats.getIndex(), statsList.size()); + index, statsList.size()); } - BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(stats); + BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index); if (Objects.isNull( baseMetricESDao )) { - LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", stats.getIndex()); + LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index); return false; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java index 1dc894f7..c647b222 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/point/MetricPointVO.java @@ -29,6 +29,10 @@ public class MetricPointVO implements Comparable { @Override public int compareTo(MetricPointVO o) { if(null == o){return 0;} + if(null == this.getTimeStamp() + || null == o.getTimeStamp()){ + return 0; + } return this.getTimeStamp().intValue() - o.getTimeStamp().intValue(); } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java new file mode 100644 index 00000000..0de516f7 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java @@ -0,0 +1,647 @@ +package com.xiaojukeji.know.streaming.km.common.constant; + +public class ESIndexConstant { + + public final static String TOPIC_INDEX = "ks_kafka_topic_metric"; + public final static String TOPIC_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_topic_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"BytesIn_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesRejected\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"PartitionURP\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"ReplicationCount\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"ReplicationBytesOut\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"ReplicationBytesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"FailedFetchRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"LogSize\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"FailedProduceRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"MessagesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalProduceRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"brokerAgg\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String CLUSTER_INDEX = "ks_kafka_cluster_metric"; + public final static String CLUSTER_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_cluster_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"Connections\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesIn_min_15\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionURP\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"EventQueueSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"ActiveControllerCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupDeads\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesIn_min_5\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Partitions\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesOut\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesOut_min_15\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalRequestQueueSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalProduceRequests\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalLogSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupEmptys\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionNoLeader\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionMinISR_E\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Replicas\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupRebalances\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MessageIn\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Topics\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PartitionMinISR_S\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesIn\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"BytesOut_min_5\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupActives\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MessagesIn\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"GroupReBalances\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Brokers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Groups\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"TotalResponseQueueSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"Zookeepers\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"LeaderMessages\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthScore_Cluster\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckPassed_Cluster\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"HealthCheckTotal_Cluster\" : {\n" + + " \"type\" : \"double\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"type\" : \"date\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String BROKER_INDEX = "ks_kafka_broker_metric"; + public final static String BROKER_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_broker_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"NetworkProcessorAvgIdle\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"UnderReplicatedPartitions\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"RequestHandlerAvgIdle\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"connectionsCount\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_15\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"BytesOut_min_5\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalRequestQueueSize\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"MessagesIn\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalProduceRequests\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"TotalResponseQueueSize\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String PARTITION_INDEX = "ks_kafka_partition_metric"; + public final static String PARTITION_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_partition_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"partitionId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"LogStartOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"LogEndOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String GROUP_INDEX = "ks_kafka_group_metric"; + public final static String GROUP_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_group_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"group\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"partitionId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"HealthScore\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Lag\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"OffsetConsumed\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckTotal\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"HealthCheckPassed\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"groupMetric\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + + public final static String REPLICATION_INDEX = "ks_kafka_replication_metric"; + public final static String REPLICATION_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_partition_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"brokerId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"partitionId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"topic\" : {\n" + + " \"type\" : \"keyword\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"LogStartOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"Messages\" : {\n" + + " \"type\" : \"float\"\n" + + " },\n" + + " \"LogEndOffset\" : {\n" + + " \"type\" : \"float\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }[root@10-255-0-23 template]# cat ks_kafka_replication_metric\n" + + "PUT _template/ks_kafka_replication_metric\n" + + "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_replication_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"index\" : true,\n" + + " \"type\" : \"date\",\n" + + " \"doc_values\" : true\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java deleted file mode 100644 index 25535864..00000000 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/metric/KafkaMetricIndexEnum.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.xiaojukeji.know.streaming.km.common.enums.metric; - -/** - * @author: D10865 - * @description: - * @date: Create on 2019/3/11 下午2:19 - * @modified By D10865 - * - * 不同维度的es监控数据 - */ -public enum KafkaMetricIndexEnum { - - /** - * topic 维度 - */ - TOPIC_INFO("ks_kafka_topic_metric"), - - /** - * 集群 维度 - */ - CLUSTER_INFO("ks_kafka_cluster_metric"), - - /** - * broker 维度 - */ - BROKER_INFO("ks_kafka_broker_metric"), - - /** - * partition 维度 - */ - PARTITION_INFO("ks_kafka_partition_metric"), - - /** - * group 维度 - */ - GROUP_INFO("ks_kafka_group_metric"), - - /** - * replication 维度 - */ - REPLICATION_INFO("ks_kafka_replication_metric"), - - ; - - private String index; - - KafkaMetricIndexEnum(String index) { - this.index = index; - } - - public String getIndex() { - return index; - } -} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java index d3357ab4..53b98479 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java @@ -64,11 +64,13 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric { public static final String CLUSTER_METRIC_BYTES_OUT = "BytesOut"; public static final String CLUSTER_METRIC_BYTES_OUT_5_MIN = "BytesOut_min_5"; public static final String CLUSTER_METRIC_BYTES_OUT_15_MIN = "BytesOut_min_15"; + public static final String CLUSTER_METRIC_GROUP = "Groups"; public static final String CLUSTER_METRIC_GROUP_ACTIVES = "GroupActives"; public static final String CLUSTER_METRIC_GROUP_EMPTYS = "GroupEmptys"; public static final String CLUSTER_METRIC_GROUP_REBALANCES = "GroupRebalances"; public static final String CLUSTER_METRIC_GROUP_DEADS = "GroupDeads"; + public static final String CLUSTER_METRIC_ALIVE = "Alive"; public static final String CLUSTER_METRIC_ACL_ENABLE = "AclEnable"; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java index dff96236..62bc6a57 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/BaseESDAO.java @@ -8,7 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired; /** * 直接操作es集群的dao */ -public class BaseESDAO { +public abstract class BaseESDAO { protected static final ILog LOGGER = LogFactory.getLog("ES_LOGGER"); /** diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index c611c538..1200699a 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -11,7 +11,11 @@ import com.didiglobal.logi.elasticsearch.client.request.batch.ESBatchRequest; import com.didiglobal.logi.elasticsearch.client.request.query.query.ESQueryRequest; import com.didiglobal.logi.elasticsearch.client.response.batch.ESBatchResponse; import com.didiglobal.logi.elasticsearch.client.response.batch.IndexResultItemNode; +import com.didiglobal.logi.elasticsearch.client.response.indices.gettemplate.ESIndicesGetTemplateResponse; +import com.didiglobal.logi.elasticsearch.client.response.indices.putindex.ESIndicesPutIndexResponse; +import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESIndicesPutTemplateResponse; import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse; +import com.didiglobal.logi.elasticsearch.client.response.setting.template.TemplateConfig; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.google.common.collect.Lists; @@ -340,7 +344,94 @@ public class ESOpClient { return false; } + /** + * 根据表达式判断索引是否已存在 + */ + public boolean indexExist(String indexName) { + ESClient esClient = null; + try { + esClient = this.getESClientFromPool(); + if (esClient == null) { + return false; + } + + // 检查索引是否存在 + return esClient.admin().indices().prepareExists(indexName).execute().actionGet(30, TimeUnit.SECONDS).isExists(); + } catch (Exception e){ + LOGGER.warn("class=ESOpClient||method=indexExist||indexName={}||msg=exception!", indexName, e); + } finally { + if (esClient != null) { + returnESClientToPool(esClient); + } + } + + return false; + } + + /** + * 创建索引 + */ + public boolean createIndex(String indexName) { + if (indexExist(indexName)) { + return true; + } + + ESClient client = getESClientFromPool(); + if (client != null) { + try { + ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute() + .actionGet(30, TimeUnit.SECONDS); + return response.getAcknowledged(); + } catch (Exception e){ + LOGGER.warn( "msg=create index fail||indexName={}", indexName, e); + } finally { + returnESClientToPool(client); + } + } + + return false; + } + + /** + * 创建索引模板 + */ + public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) { + ESClient esClient = null; + + try { + esClient = this.getESClientFromPool(); + + // 获取es中原来index template的配置 + ESIndicesGetTemplateResponse getTemplateResponse = + esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( 30, TimeUnit.SECONDS ); + + TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig(); + + if (null != templateConfig) { + return true; + } + + // 创建新的模板 + ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName ) + .setTemplateConfig( config ).execute().actionGet( 30, TimeUnit.SECONDS ); + + return response.getAcknowledged(); + } catch (Exception e) { + LOGGER.warn( + "class=ESOpClient||method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!", + indexTemplateName, config, e + ); + } finally { + if (esClient != null) { + this.returnESClientToPool(esClient); + } + } + + return false; + } + /**************************************************** private method ****************************************************/ + /** * 执行查询 * @param request diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java index 8a0f96a9..fe04e4d1 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java @@ -8,11 +8,12 @@ import com.google.common.collect.Maps; import com.xiaojukeji.know.streaming.km.common.bean.entity.search.*; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BaseMetricESPO; -import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.utils.IndexNameUtils; import com.xiaojukeji.know.streaming.km.persistence.es.BaseESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; import lombok.NoArgsConstructor; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.util.CollectionUtils; import java.util.*; @@ -25,7 +26,8 @@ public class BaseMetricESDAO extends BaseESDAO { /** * 操作的索引名称 */ - protected String indexName; + protected String indexName; + protected String indexTemplate; protected static final Long ONE_MIN = 60 * 1000L; protected static final Long FIVE_MIN = 5 * ONE_MIN; @@ -35,10 +37,24 @@ public class BaseMetricESDAO extends BaseESDAO { /** * 不同维度 kafka 监控数据 */ - private static Map ariusStatsEsDaoMap = Maps + private static Map ariusStatsEsDaoMap = Maps .newConcurrentMap(); - public static BaseMetricESDAO getByStatsType(KafkaMetricIndexEnum statsType) { + /** + * 检查 es 索引是否存在,不存在则创建索引 + */ + @Scheduled(cron = "0 3/5 * * * ?") + public void checkCurrentDayIndexExist(){ + String realIndex = IndexNameUtils.genCurrentDailyIndexName(indexName); + + if(esOpClient.indexExist(realIndex)){return;} + + if(esOpClient.createIndexTemplateIfNotExist(indexName, indexTemplate)){ + esOpClient.createIndex(realIndex); + } + } + + public static BaseMetricESDAO getByStatsType(String statsType) { return ariusStatsEsDaoMap.get(statsType); } @@ -48,7 +64,7 @@ public class BaseMetricESDAO extends BaseESDAO { * @param statsType * @param baseAriusStatsEsDao */ - public static void register(KafkaMetricIndexEnum statsType, BaseMetricESDAO baseAriusStatsEsDao) { + public static void register(String statsType, BaseMetricESDAO baseAriusStatsEsDao) { ariusStatsEsDaoMap.put(statsType, baseAriusStatsEsDao); } @@ -358,7 +374,50 @@ public class BaseMetricESDAO extends BaseESDAO { String dsl = dslLoaderUtil.getFormatDslByFileName(DslsConstant.GET_LATEST_METRIC_TIME, startTime, endTime, appendQueryDsl); String realIndexName = IndexNameUtils.genDailyIndexName(indexName, startTime, endTime); - return esOpClient.performRequest(realIndexName, dsl, s -> s.getHits().getHits().isEmpty() - ? System.currentTimeMillis() : ((Map)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP), 3); + return esOpClient.performRequest( + realIndexName, + dsl, + s -> s == null || s.getHits().getHits().isEmpty() ? System.currentTimeMillis() : ((Map)s.getHits().getHits().get(0).getSource()).get(TIME_STAMP), + 3 + ); + } + + /** + * 对 metricPointVOS 进行缺点优化 + */ + protected List optimizeMetricPoints(List metricPointVOS){ + if(CollectionUtils.isEmpty(metricPointVOS)){return metricPointVOS;} + + int size = metricPointVOS.size(); + if(size < 2){return metricPointVOS;} + + Collections.sort(metricPointVOS); + + List rets = new ArrayList<>(); + for(int first = 0, second = first + 1; second < size; first++, second++){ + MetricPointVO firstPoint = metricPointVOS.get(first); + MetricPointVO secondPoint = metricPointVOS.get(second); + + if(null != firstPoint && null != secondPoint){ + rets.add(firstPoint); + + //说明有空点,那就增加一个点 + if(secondPoint.getTimeStamp() - firstPoint.getTimeStamp() > ONE_MIN){ + MetricPointVO addPoint = new MetricPointVO(); + addPoint.setName(firstPoint.getName()); + addPoint.setAggType(firstPoint.getAggType()); + addPoint.setValue(firstPoint.getValue()); + addPoint.setTimeStamp(firstPoint.getTimeStamp() + ONE_MIN); + + rets.add(addPoint); + } + + if(second == size - 1){ + rets.add(secondPoint); + } + } + } + + return rets; } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java index b80c1ca0..edc186f4 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java @@ -18,14 +18,16 @@ import java.util.*; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.BROKER_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class BrokerMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = BROKER_INFO.getIndex(); - BaseMetricESDAO.register(BROKER_INFO, this); + super.indexName = BROKER_INDEX; + super.indexTemplate = BROKER_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("BrokerMetricESDAO", 4,8, 500); @@ -258,7 +260,7 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java index 63a9f3f1..82a86253 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ClusterMetricESDAO.java @@ -23,15 +23,17 @@ import java.util.List; import java.util.Map; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.CLUSTER_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class ClusterMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = CLUSTER_INFO.getIndex(); - BaseMetricESDAO.register(CLUSTER_INFO, this); + super.indexName = CLUSTER_INDEX; + super.indexTemplate = CLUSTER_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("ClusterMetricESDAO", 4,8, 500); @@ -207,7 +209,7 @@ public class ClusterMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java index 42ae0ace..cf65e6ef 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/GroupMetricESDAO.java @@ -23,16 +23,17 @@ import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.constant.Constant.ZERO; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.KEY; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.GROUP_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class GroupMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = GROUP_INFO.getIndex(); - BaseMetricESDAO.register(GROUP_INFO, this); + super.indexName = GROUP_INDEX; + super.indexTemplate = GROUP_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("GroupMetricESDAO", 4,8, 500); @@ -206,7 +207,7 @@ public class GroupMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java index 85dc55df..4f86852b 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/PartitionMetricESDAO.java @@ -8,7 +8,7 @@ import javax.annotation.PostConstruct; import java.util.List; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.PARTITION_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; /** * @author didi @@ -18,8 +18,10 @@ public class PartitionMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = PARTITION_INFO.getIndex(); - BaseMetricESDAO.register(PARTITION_INFO, this); + super.indexName = PARTITION_INDEX; + super.indexTemplate = PARTITION_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } public PartitionMetricPO getPartitionLatestMetrics(Long clusterPhyId, String topic, diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java index e5f9f164..1f604cc0 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java @@ -14,7 +14,7 @@ import java.util.List; import java.util.Map; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.REPLICATION_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; /** * @author didi @@ -24,8 +24,10 @@ public class ReplicationMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = REPLICATION_INFO.getIndex(); - BaseMetricESDAO.register(REPLICATION_INFO, this); + super.indexName = REPLICATION_INDEX; + super.indexTemplate = REPLICATION_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } /** diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java index 402333ee..e9089c17 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java @@ -22,15 +22,17 @@ import java.util.*; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; -import static com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum.TOPIC_INFO; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; @Component public class TopicMetricESDAO extends BaseMetricESDAO { @PostConstruct public void init() { - super.indexName = TOPIC_INFO.getIndex(); - BaseMetricESDAO.register(TOPIC_INFO, this); + super.indexName = TOPIC_INDEX; + super.indexTemplate = TOPIC_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); } protected FutureWaitUtil queryFuture = FutureWaitUtil.init("TopicMetricESDAO", 4,8, 500); @@ -352,7 +354,7 @@ public class TopicMetricESDAO extends BaseMetricESDAO { } } ); - metricMap.put(metric, metricPoints); + metricMap.put(metric, optimizeMetricPoints(metricPoints)); } return metricMap; diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java index 2cdb895e..c69f7129 100644 --- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java +++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java @@ -20,8 +20,8 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest { @Test public void listClusterMetricsByClusterIdsTest(){ - List metrics = Arrays.asList("BytesIn_min_1", "BytesOut_min_1"); - List clusterIds = Arrays.asList(123L); + List metrics = Arrays.asList("MessagesIn"); + List clusterIds = Arrays.asList(293L); Long endTime = System.currentTimeMillis(); Long startTime = endTime - 4 * 60 * 60 * 1000;