From f4a219ceefb61501328b66d17482d102b5848781 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 9 Jan 2023 14:47:18 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=E5=8E=BB=E9=99=A4Replica=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E4=BB=8EES=E8=AF=BB=E5=86=99=E7=9A=84=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E4=BB=A3=E7=A0=81(#862)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../metric/kafka/ReplicaMetricCollector.java | 114 ------------------ .../sink/kafka/ReplicaMetricESSender.java | 29 ----- .../bean/event/metric/ReplicaMetricEvent.java | 20 --- .../service/replica/ReplicaMetricService.java | 11 -- .../impl/ReplicaMetricServiceImpl.java | 24 ---- .../init/template/ks_kafka_replication_metric | 65 ---------- .../component/AbstractMonitorSinkService.java | 3 - .../es/dao/ReplicationMetricESDAO.java | 95 --------------- .../km/persistence/es/dsls/DslConstant.java | 5 - .../es/template/TemplateConstant.java | 1 - .../getAggSingleReplicationMetrics | 48 -------- .../getReplicationLatestMetrics | 52 -------- .../es/template/ks_kafka_replication_metric | 65 ---------- .../v3/replica/ReplicaMetricsController.java | 2 +- .../es/ReplicationMetricESDAOTest.java | 48 -------- 15 files changed, 1 insertion(+), 581 deletions(-) delete mode 100644 km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java delete mode 100644 km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/kafka/ReplicaMetricESSender.java delete mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ReplicaMetricEvent.java delete mode 100644 km-dist/init/template/ks_kafka_replication_metric delete mode 100644 km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java delete mode 100644 km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getAggSingleReplicationMetrics delete mode 100644 km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getReplicationLatestMetrics delete mode 100644 km-persistence/src/main/resources/es/template/ks_kafka_replication_metric delete mode 100644 km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java deleted file mode 100644 index e6c5efcd..00000000 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java +++ /dev/null @@ -1,114 +0,0 @@ -package com.xiaojukeji.know.streaming.km.collector.metric.kafka; - -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics; -import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; -import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; -import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; -import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; -import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; -import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; -import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService; -import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.List; - -import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_REPLICATION; - -/** - * @author didi - */ -@Component -public class ReplicaMetricCollector extends AbstractKafkaMetricCollector { - protected static final ILog LOGGER = LogFactory.getLog(ReplicaMetricCollector.class); - - @Autowired - private VersionControlService versionControlService; - - @Autowired - private ReplicaMetricService replicaMetricService; - - @Autowired - private PartitionService partitionService; - - @Override - public List collectKafkaMetrics(ClusterPhy clusterPhy) { - Long clusterPhyId = clusterPhy.getId(); - List partitions = partitionService.listPartitionFromCacheFirst(clusterPhyId); - List items = versionControlService.listVersionControlItem(this.getClusterVersion(clusterPhy), collectorType().getCode()); - - FutureWaitUtil future = this.getFutureUtilByClusterPhyId(clusterPhyId); - - List metricsList = new ArrayList<>(); - for(Partition partition : partitions) { - for (Integer brokerId: partition.getAssignReplicaList()) { - ReplicationMetrics metrics = new ReplicationMetrics(clusterPhyId, partition.getTopicName(), brokerId, partition.getPartitionId()); - metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); - metricsList.add(metrics); - - future.runnableTask( - String.format("class=ReplicaMetricCollector||clusterPhyId=%d||brokerId=%d||topicName=%s||partitionId=%d", - clusterPhyId, brokerId, partition.getTopicName(), partition.getPartitionId()), - 30000, - () -> collectMetrics(clusterPhyId, metrics, items) - ); - } - } - - future.waitExecute(30000); - - publishMetric(new ReplicaMetricEvent(this, metricsList)); - - return metricsList; - } - - @Override - public VersionItemTypeEnum collectorType() { - return METRIC_REPLICATION; - } - - /**************************************************** private method ****************************************************/ - - private ReplicationMetrics collectMetrics(Long clusterPhyId, ReplicationMetrics metrics, List items) { - long startTime = System.currentTimeMillis(); - - for(VersionControlItem v : items) { - try { - if (metrics.getMetrics().containsKey(v.getName())) { - continue; - } - - Result ret = replicaMetricService.collectReplicaMetricsFromKafka( - clusterPhyId, - metrics.getTopic(), - metrics.getBrokerId(), - metrics.getPartitionId(), - v.getName() - ); - - if (null == ret || ret.failed() || null == ret.getData()) { - continue; - } - - metrics.putMetric(ret.getData().getMetrics()); - } catch (Exception e) { - LOGGER.error( - "method=collectMetrics||clusterPhyId={}||topicName={}||partition={}||metricName={}||errMsg=exception!", - clusterPhyId, metrics.getTopic(), metrics.getPartitionId(), v.getName(), e - ); - } - } - - // 记录采集性能 - metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); - - return metrics; - } -} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/kafka/ReplicaMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/kafka/ReplicaMetricESSender.java deleted file mode 100644 index d7b74905..00000000 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/kafka/ReplicaMetricESSender.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.xiaojukeji.know.streaming.km.collector.sink.kafka; - -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.collector.sink.AbstractMetricESSender; -import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent; -import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO; -import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.REPLICATION_INDEX; - -@Component -public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener { - private static final ILog LOGGER = LogFactory.getLog(ReplicaMetricESSender.class); - - @PostConstruct - public void init(){ - LOGGER.info("method=init||msg=init finished"); - } - - @Override - public void onApplicationEvent(ReplicaMetricEvent event) { - send2es(REPLICATION_INDEX, ConvertUtil.list2List(event.getReplicationMetrics(), ReplicationMetricPO.class)); - } -} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ReplicaMetricEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ReplicaMetricEvent.java deleted file mode 100644 index 9b71a69b..00000000 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ReplicaMetricEvent.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.xiaojukeji.know.streaming.km.common.bean.event.metric; - -import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics; -import lombok.Getter; - -import java.util.List; - -/** - * @author didi - */ -@Getter -public class ReplicaMetricEvent extends BaseMetricEvent{ - - private final List replicationMetrics; - - public ReplicaMetricEvent(Object source, List replicationMetrics) { - super( source ); - this.replicationMetrics = replicationMetrics; - } -} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java index c0a44586..1223df25 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java @@ -1,9 +1,7 @@ package com.xiaojukeji.know.streaming.km.core.service.replica; -import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import java.util.List; @@ -14,13 +12,4 @@ public interface ReplicaMetricService { */ Result collectReplicaMetricsFromKafka(Long clusterId, String topic, Integer partitionId, Integer brokerId, String metric); Result collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List metricNameList); - - /** - * 从ES中获取指标 - */ - @Deprecated - Result> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto); - - @Deprecated - Result getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List metricNames); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java index c15914a6..92986d4b 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java @@ -2,7 +2,6 @@ package com.xiaojukeji.know.streaming.km.core.service.replica.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ReplicationMetricParam; @@ -10,26 +9,21 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo; import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil; -import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; -import com.xiaojukeji.know.streaming.km.persistence.es.dao.ReplicationMetricESDAO; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.management.InstanceNotFoundException; import javax.management.ObjectName; -import java.util.ArrayList; import java.util.List; -import java.util.Map; import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_REPLICATION; @@ -54,9 +48,6 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli @Autowired private PartitionService partitionService; - @Autowired - private ReplicationMetricESDAO replicationMetricESDAO; - @Override protected List listMetricPOFields(){ return BeanUtil.listBeanFields(ReplicationMetricPO.class); @@ -118,21 +109,6 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli } } - @Override - public Result> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto) { - Map metricPointMap = replicationMetricESDAO.getReplicationMetricsPoint(clusterPhyId, topicName, brokerId, partitionId, - dto.getMetricsNames(), dto.getAggType(), dto.getStartTime(), dto.getEndTime()); - - List metricPoints = new ArrayList<>(metricPointMap.values()); - return Result.buildSuc(metricPoints); - } - - @Override - public Result getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List metricNames) { - ReplicationMetricPO metricPO = replicationMetricESDAO.getReplicationLatestMetrics(clusterPhyId, brokerId, topicName, partitionId, metricNames); - return Result.buildSuc(ConvertUtil.obj2Obj(metricPO, ReplicationMetrics.class)); - } - /**************************************************** private method ****************************************************/ private Result doNothing(VersionItemParam param) { ReplicationMetricParam metricParam = (ReplicationMetricParam)param; diff --git a/km-dist/init/template/ks_kafka_replication_metric b/km-dist/init/template/ks_kafka_replication_metric deleted file mode 100644 index a8ff4b53..00000000 --- a/km-dist/init/template/ks_kafka_replication_metric +++ /dev/null @@ -1,65 +0,0 @@ -{ - "order" : 10, - "index_patterns" : [ - "ks_kafka_replication_metric*" - ], - "settings" : { - "index" : { - "number_of_shards" : "10" - } - }, - "mappings" : { - "properties" : { - "brokerId" : { - "type" : "long" - }, - "partitionId" : { - "type" : "long" - }, - "routingValue" : { - "type" : "text", - "fields" : { - "keyword" : { - "ignore_above" : 256, - "type" : "keyword" - } - } - }, - "clusterPhyId" : { - "type" : "long" - }, - "topic" : { - "type" : "keyword" - }, - "metrics" : { - "properties" : { - "LogStartOffset" : { - "type" : "float" - }, - "Messages" : { - "type" : "float" - }, - "LogEndOffset" : { - "type" : "float" - } - } - }, - "key" : { - "type" : "text", - "fields" : { - "keyword" : { - "ignore_above" : 256, - "type" : "keyword" - } - } - }, - "timestamp" : { - "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis", - "index" : true, - "type" : "date", - "doc_values" : true - } - } - }, - "aliases" : { } - } \ No newline at end of file diff --git a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java index 1fccaf90..8bac5ac6 100644 --- a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java +++ b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java @@ -56,9 +56,6 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event; sinkMetrics(groupMetric2SinkPoint(groupMetricEvent.getGroupMetrics())); - } else if(event instanceof ReplicaMetricEvent) { - ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event; - sinkMetrics(replicationMetric2SinkPoint(replicaMetricEvent.getReplicationMetrics())); } else if(event instanceof ZookeeperMetricEvent) { ZookeeperMetricEvent zookeeperMetricEvent = (ZookeeperMetricEvent)event; sinkMetrics(zookeeperMetric2SinkPoint(zookeeperMetricEvent.getZookeeperMetrics())); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java deleted file mode 100644 index 6f1c7561..00000000 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ReplicationMetricESDAO.java +++ /dev/null @@ -1,95 +0,0 @@ -package com.xiaojukeji.know.streaming.km.persistence.es.dao; - -import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse; -import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr; -import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslConstant; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.VALUE; -import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.REPLICATION_INDEX; - -/** - * @author didi - */ -@Component -public class ReplicationMetricESDAO extends BaseMetricESDAO { - - @PostConstruct - public void init() { - super.indexName = REPLICATION_INDEX; - checkCurrentDayIndexExist(); - register(this); - } - - /** - * 获取集群 clusterId 中 brokerId 最新的统计指标 - */ - public ReplicationMetricPO getReplicationLatestMetrics(Long clusterPhyId, Integer brokerId, String topic, - Integer partitionId, List metricNames){ - Long endTime = getLatestMetricTime(); - Long startTime = endTime - FIVE_MIN; - - String dsl = dslLoaderUtil.getFormatDslByFileName( - DslConstant.GET_REPLICATION_LATEST_METRICS, clusterPhyId, brokerId, topic, partitionId, startTime, endTime); - - ReplicationMetricPO replicationMetricPO = esOpClient.performRequestAndTakeFirst( - realIndex(startTime, endTime), dsl, ReplicationMetricPO.class); - - return (null == replicationMetricPO) ? new ReplicationMetricPO(clusterPhyId, topic, brokerId, partitionId) - : filterMetrics(replicationMetricPO, metricNames); - } - - /** - * 获取集群 clusterPhyId 中每个 metric 的指定 partitionId 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值 - */ - public Map getReplicationMetricsPoint(Long clusterPhyId, String topic, - Integer brokerId, Integer partitionId, List metrics, - String aggType, Long startTime, Long endTime){ - //1、获取需要查下的索引 - String realIndex = realIndex(startTime, endTime); - - //2、构造agg查询条件 - String aggDsl = buildAggsDSL(metrics, aggType); - - String dsl = dslLoaderUtil.getFormatDslByFileName( - DslConstant.GET_REPLICATION_AGG_SINGLE_METRICS, clusterPhyId, brokerId,topic, partitionId, startTime, endTime, aggDsl); - - return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl, - s -> handleSingleESQueryResponse(s, metrics, aggType), 3); - } - - /**************************************************** private method ****************************************************/ - private Map handleSingleESQueryResponse(ESQueryResponse response, List metrics, String aggType){ - Map metricMap = new HashMap<>(); - - if(null == response || null == response.getAggs()){ - return metricMap; - } - - Map esAggrMap = response.getAggs().getEsAggrMap(); - if (null == esAggrMap) { - return metricMap; - } - - for(String metric : metrics){ - String value = esAggrMap.get(metric).getUnusedMap().get(VALUE).toString(); - - MetricPointVO metricPoint = new MetricPointVO(); - metricPoint.setAggType(aggType); - metricPoint.setValue(value); - metricPoint.setName(metric); - - metricMap.put(metric, metricPoint); - } - - return metricMap; - } -} diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java index 9bd8062a..3e01e19f 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslConstant.java @@ -62,11 +62,6 @@ public class DslConstant { public static final String LIST_PARTITION_LATEST_METRICS_BY_TOPIC = "PartitionMetricESDAO/listPartitionLatestMetricsByTopic"; - /**************************************************** REPLICATION ****************************************************/ - public static final String GET_REPLICATION_AGG_SINGLE_METRICS = "ReplicationMetricESDAO/getAggSingleReplicationMetrics"; - - public static final String GET_REPLICATION_LATEST_METRICS = "ReplicationMetricESDAO/getReplicationLatestMetrics"; - /**************************************************** Group ****************************************************/ public static final String GET_GROUP_TOPIC_PARTITION = "GroupMetricESDAO/getTopicPartitionOfGroup"; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java index 2fc61f38..f74f79c6 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/template/TemplateConstant.java @@ -9,7 +9,6 @@ public class TemplateConstant { public static final String BROKER_INDEX = "ks_kafka_broker_metric"; public static final String PARTITION_INDEX = "ks_kafka_partition_metric"; public static final String GROUP_INDEX = "ks_kafka_group_metric"; - public static final String REPLICATION_INDEX = "ks_kafka_replication_metric"; public static final String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric"; public static final String CONNECT_CLUSTER_INDEX = "ks_kafka_connect_cluster_metric"; public static final String CONNECT_CONNECTOR_INDEX = "ks_kafka_connect_connector_metric"; diff --git a/km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getAggSingleReplicationMetrics b/km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getAggSingleReplicationMetrics deleted file mode 100644 index 59e4abca..00000000 --- a/km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getAggSingleReplicationMetrics +++ /dev/null @@ -1,48 +0,0 @@ -{ - "size":0, - "query":{ - "bool":{ - "must":[ - { - "term":{ - "clusterPhyId":{ - "value":%d - } - } - }, - { - "term":{ - "brokerId":{ - "value":%d - } - } - }, - { - "term":{ - "topic":{ - "value":"%s" - } - } - }, - { - "term":{ - "partitionId":{ - "value":%d - } - } - }, - { - "range":{ - "timestamp":{ - "gte":%d, - "lte":%d - } - } - } - ] - } - }, - "aggs":{ - %s - } -} \ No newline at end of file diff --git a/km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getReplicationLatestMetrics b/km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getReplicationLatestMetrics deleted file mode 100644 index c4650a5c..00000000 --- a/km-persistence/src/main/resources/es/dsl/ReplicationMetricESDAO/getReplicationLatestMetrics +++ /dev/null @@ -1,52 +0,0 @@ -{ - "size": 1, - "query": { - "bool": { - "must": [ - { - "term": { - "clusterPhyId": { - "value": %d - } - } - }, - { - "term": { - "brokerId": { - "value": %d - } - } - }, - { - "term": { - "topic": { - "value": "%s" - } - } - }, - { - "term": { - "partitionId": { - "value": %d - } - } - }, - { - "range": { - "timestamp": { - "gte": %d, - "lte": %d - } - } - } - ] - } - }, - "sort": [ - { - "timestamp": { - "order": "desc" - } - } - ] -} \ No newline at end of file diff --git a/km-persistence/src/main/resources/es/template/ks_kafka_replication_metric b/km-persistence/src/main/resources/es/template/ks_kafka_replication_metric deleted file mode 100644 index a8ff4b53..00000000 --- a/km-persistence/src/main/resources/es/template/ks_kafka_replication_metric +++ /dev/null @@ -1,65 +0,0 @@ -{ - "order" : 10, - "index_patterns" : [ - "ks_kafka_replication_metric*" - ], - "settings" : { - "index" : { - "number_of_shards" : "10" - } - }, - "mappings" : { - "properties" : { - "brokerId" : { - "type" : "long" - }, - "partitionId" : { - "type" : "long" - }, - "routingValue" : { - "type" : "text", - "fields" : { - "keyword" : { - "ignore_above" : 256, - "type" : "keyword" - } - } - }, - "clusterPhyId" : { - "type" : "long" - }, - "topic" : { - "type" : "keyword" - }, - "metrics" : { - "properties" : { - "LogStartOffset" : { - "type" : "float" - }, - "Messages" : { - "type" : "float" - }, - "LogEndOffset" : { - "type" : "float" - } - } - }, - "key" : { - "type" : "text", - "fields" : { - "keyword" : { - "ignore_above" : 256, - "type" : "keyword" - } - } - }, - "timestamp" : { - "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis", - "index" : true, - "type" : "date", - "doc_values" : true - } - } - }, - "aliases" : { } - } \ No newline at end of file diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java index 7e276aff..2b1878ea 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java @@ -35,7 +35,7 @@ public class ReplicaMetricsController { @PathVariable String topicName, @PathVariable Integer partitionId, @RequestBody MetricDTO dto) { - return replicationMetricService.getMetricPointsFromES(clusterPhyId, brokerId, topicName, partitionId, dto); + return Result.buildSuc(); } @ApiOperation(value = "Replica指标-单个Replica") diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java deleted file mode 100644 index 98224a3d..00000000 --- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ReplicationMetricESDAOTest.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.xiaojukeji.know.streaming.km.persistence.es; - -import com.xiaojukeji.know.streaming.km.KnowStreamApplicationTest; -import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.persistence.es.dao.ReplicationMetricESDAO; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Map; - -public class ReplicationMetricESDAOTest extends KnowStreamApplicationTest { - - @Autowired - private ReplicationMetricESDAO replicationMetricESDAO; - - @Test - public void getReplicationLatestMetricsTest(){ - Long clusterPhyId = 2L; - Integer brokerId = 1; - String topic = "know-streaming-test-251"; - Integer partitionId = 1; - ReplicationMetricPO replicationMetricPO = replicationMetricESDAO.getReplicationLatestMetrics( - clusterPhyId, brokerId, topic, partitionId, new ArrayList<>()); - - assert null != replicationMetricPO; - } - - /** - * 测试 - * 获取集群 clusterPhyId 中每个 metric 的指定 partitionId 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值 - */ - @Test - public void getReplicationMetricsPointTest(){ - Long clusterPhyId = 2L; - Integer brokerId = 1; - String topic = "know-streaming-test-251"; - Integer partitionId = 1; - Long endTime = System.currentTimeMillis(); - Long startTime = endTime - 4 * 60 * 60 * 1000; - Map metricPointVOMap = replicationMetricESDAO.getReplicationMetricsPoint( - clusterPhyId, topic, brokerId, partitionId, Collections.emptyList(), "avg", startTime, endTime); - - assert null != metricPointVOMap; - } -}