From cafd665a2d3a492a3855af336e347dbaa95005bc Mon Sep 17 00:00:00 2001
From: zengqiao
Date: Fri, 21 Oct 2022 11:47:28 +0800
Subject: [PATCH] [Optimize] Remove the Replica metric collection task
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. When a cluster holds a large number of replicas, metric collection performance degrades severely.
2. Replica metrics are essentially only needed when they are fetched in real time, so the Replica
   metric collection task is disabled for now; whether to re-enable it will be revisited later based
   on product needs.
---
 .../metric/ReplicaMetricCollector.java        |  2 +-
 .../cache/CollectedMetricsLocalCache.java     | 16 -----
 .../broker/impl/BrokerMetricServiceImpl.java  | 39 +++++++++--
 .../service/replica/ReplicaMetricService.java |  4 +-
 .../impl/ReplicaMetricServiceImpl.java        | 55 ++++++++--------
 .../v3/replica/ReplicaMetricsController.java  |  3 +-
 .../metrics/ReplicaMetricCollectorTask.java   | 64 +++++++++----------
 7 files changed, 100 insertions(+), 83 deletions(-)
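The slow path being replaced read roughly one JMX value per partition; on Kafka 1.0.0+ the new
getLogSizeFromClient handler gets every replica's size on a broker from a single DescribeLogDirs
round trip. A minimal, self-contained sketch of that idea against the plain Kafka AdminClient
(assuming kafka-clients 2.7+ for LogDirDescription; the bootstrap address and broker id below are
illustrative, not taken from this patch):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.LogDirDescription;
    import org.apache.kafka.clients.admin.ReplicaInfo;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    public class BrokerLogSizeViaAdminClient {
        public static void main(String[] args) throws Exception {
            int brokerId = 1;                                                         // illustrative broker id
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative address
            try (AdminClient adminClient = AdminClient.create(props)) {
                // One DescribeLogDirs request returns every replica hosted on this broker.
                Map<String, LogDirDescription> logDirs = adminClient
                        .describeLogDirs(Collections.singletonList(brokerId))
                        .allDescriptions()
                        .get()
                        .get(brokerId);

                // Sum the on-disk size of every replica across all log dirs.
                long logSizeSum = 0L;
                for (LogDirDescription dir : logDirs.values()) {
                    for (ReplicaInfo replica : dir.replicaInfos().values()) {
                        logSizeSum += replica.size();
                    }
                }
                System.out.println("Broker " + brokerId + " log size: " + logSizeSum + " bytes");
            }
        }
    }

The patch itself goes through brokerService.getBrokerLogDirDescFromKafka so the admin client is
shared with the rest of km-core, but the aggregation over LogDirDescription.replicaInfos() is the
same as in getLogSizeFromClient below.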
diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ReplicaMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ReplicaMetricCollector.java
index 5f712f93..3f9e0035 100644
--- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ReplicaMetricCollector.java
+++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ReplicaMetricCollector.java
@@ -91,7 +91,7 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationMetrics>
-                Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
+                Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafka(
                         clusterPhyId,
                         metrics.getTopic(),
                         metrics.getBrokerId(),
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
index bc5b1c34..2fc0a4ff 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
@@ -24,11 +24,6 @@ public class CollectedMetricsLocalCache {
             .maximumSize(10000)
             .build();
 
-    private static final Cache<String, Float> replicaMetricsValueCache = Caffeine.newBuilder()
-            .expireAfterWrite(90, TimeUnit.SECONDS)
-            .maximumSize(20000)
-            .build();
-
     public static Float getBrokerMetrics(String brokerMetricKey) {
         return brokerMetricsCache.getIfPresent(brokerMetricKey);
     }
@@ -64,17 +59,6 @@ public class CollectedMetricsLocalCache {
         partitionMetricsCache.put(partitionMetricsKey, metricsList);
     }
 
-    public static Float getReplicaMetrics(String replicaMetricsKey) {
-        return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
-    }
-
-    public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
-        if (value == null) {
-            return;
-        }
-        replicaMetricsValueCache.put(replicaMetricsKey, value);
-    }
-
     public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
         return clusterPhyId + "@" + brokerId + "@" + metricName;
     }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
index 93c343ff..e82882e1 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
@@ -37,6 +37,8 @@ import com.xiaojukeji.know.streaming.km.core.service.version.metrics.BrokerMetri
 import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ReplicaMetricVersionItems;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.BrokerMetricESDAO;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
@@ -49,6 +51,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
+import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
 
 /**
  * @author didi
  */
@@ -105,7 +108,11 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         registerVCHandler( BROKER_METHOD_GET_HEALTH_SCORE,      this::getMetricHealthScore);
         registerVCHandler( BROKER_METHOD_GET_PARTITIONS_SKEW,   this::getPartitionsSkew);
         registerVCHandler( BROKER_METHOD_GET_LEADERS_SKEW,      this::getLeadersSkew);
-        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE,          this::getLogSize);
+//        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE,          this::getLogSize);
+
+        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_0_10_0_0, V_1_0_0, "getLogSizeFromJmx",    this::getLogSizeFromJmx);
+        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_1_0_0,    V_MAX,   "getLogSizeFromClient", this::getLogSizeFromClient);
+
         registerVCHandler( BROKER_METHOD_IS_BROKER_ALIVE,       this::isBrokerAlive);
     }
@@ -351,7 +358,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         );
     }
 
-    private Result<BrokerMetrics> getLogSize(VersionItemParam metricParam) {
+    private Result<BrokerMetrics> getLogSizeFromJmx(VersionItemParam metricParam) {
         BrokerMetricParam param = (BrokerMetricParam)metricParam;
 
         String metric = param.getMetric();
@@ -360,19 +367,17 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
 
         List<Partition> partitions = partitionService.listPartitionByBroker(clusterId, brokerId);
 
-        JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterId, brokerId);
-        if (ValidateUtils.isNull(jmxConnectorWrap)){return Result.buildFailure(VC_JMX_INIT_ERROR);}
-
         Float logSizeSum = 0f;
         for(Partition p : partitions) {
             try {
-                Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
+                Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafka(
                         clusterId,
                         p.getTopicName(),
                         brokerId,
                         p.getPartitionId(),
                         ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE
                 );
+
                 if(null == metricsResult || metricsResult.failed() || null == metricsResult.getData()) {
                     continue;
                 }
@@ -391,6 +396,28 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, logSizeSum));
     }
 
+    private Result<BrokerMetrics> getLogSizeFromClient(VersionItemParam metricParam) {
+        BrokerMetricParam param = (BrokerMetricParam)metricParam;
+
+        String metric = param.getMetric();
+        Long clusterId = param.getClusterId();
+        Integer brokerId = param.getBrokerId();
+
+        Result<Map<String, LogDirDescription>> descriptionMapResult = brokerService.getBrokerLogDirDescFromKafka(clusterId, brokerId);
+        if(null == descriptionMapResult || descriptionMapResult.failed() || null == descriptionMapResult.getData()) {
+            return Result.buildFromIgnoreData(descriptionMapResult);
+        }
+
+        Float logSizeSum = 0f;
+        for (LogDirDescription logDirDescription: descriptionMapResult.getData().values()) {
+            for (ReplicaInfo replicaInfo: logDirDescription.replicaInfos().values()) {
+                logSizeSum += replicaInfo.size();
+            }
+        }
+
+        return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, logSizeSum));
+    }
+
     private Result<BrokerMetrics> getLeadersSkew(VersionItemParam metricParam) {
         BrokerMetricParam param = (BrokerMetricParam)metricParam;
 
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java
index 987303f8..c0a44586 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/ReplicaMetricService.java
@@ -13,12 +13,14 @@ public interface ReplicaMetricService {
      * 从kafka中采集指标
      */
     Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topic, Integer partitionId, Integer brokerId, String metric);
-    Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic, Integer brokerId, Integer partitionId, String metric);
+    Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList);
 
     /**
      * 从ES中获取指标
      */
+    @Deprecated
     Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto);
 
+    @Deprecated
     Result<ReplicationMetrics> getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List<String> metricNames);
 }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java
index 460e6520..848c8601 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java
@@ -17,7 +17,6 @@ import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
 import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
 import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
 import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
@@ -77,32 +76,36 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
     }
 
     @Override
-    public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
-                                                                              String topic,
-                                                                              Integer brokerId,
-                                                                              Integer partitionId,
-                                                                              String metric) {
-        String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
+    public Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList) {
+        ReplicationMetrics metrics = new ReplicationMetrics(clusterId, topicName, brokerId, partitionId);
+        for (String metricName: metricNameList) {
+            try {
+                if (metrics.getMetrics().containsKey(metricName)) {
+                    continue;
+                }
 
-        Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
-        if(null != keyValue){
-            ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
-            replicationMetrics.putMetric(metric, keyValue);
-            return Result.buildSuc(replicationMetrics);
+                Result<ReplicationMetrics> ret = this.collectReplicaMetricsFromKafka(
+                        clusterId,
+                        metrics.getTopic(),
+                        metrics.getBrokerId(),
+                        metrics.getPartitionId(),
+                        metricName
+                );
+
+                if (null == ret || ret.failed() || null == ret.getData()) {
+                    continue;
+                }
+
+                metrics.putMetric(ret.getData().getMetrics());
+            } catch (Exception e) {
+                LOGGER.error(
+                        "method=collectReplicaMetricsFromKafka||clusterPhyId={}||topicName={}||partition={}||brokerId={}||metricName={}||errMsg=exception!",
+                        clusterId, topicName, partitionId, brokerId, e
+                );
+            }
         }
 
-        Result<ReplicationMetrics> ret = collectReplicaMetricsFromKafka(clusterPhyId, topic, partitionId, brokerId, metric);
-        if(null == ret || ret.failed() || null == ret.getData()){return ret;}
-
-        // 更新cache
-        ret.getData().getMetrics().entrySet().stream().forEach(
-                metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
-                        replicaMetricsKey,
-                        metricNameAndValueEntry.getValue()
-                )
-        );
-
-        return ret;
+        return Result.buildSuc(metrics);
     }
 
     @Override
@@ -167,8 +170,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
         Integer brokerId = metricParam.getBrokerId();
         Integer partitionId = metricParam.getPartitionId();
 
-        Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
-        Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
+        Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
+        Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
 
         ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterId, topic, brokerId, partitionId);
         if(null != endRet && endRet.successful() && null != startRet && startRet.successful()){
diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java
index 13e2c855..7e276aff 100644
--- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java
+++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/replica/ReplicaMetricsController.java
@@ -26,6 +26,7 @@ public class ReplicaMetricsController {
     @Autowired
     private ReplicaMetricService replicationMetricService;
 
+    @Deprecated
     @ApiOperation(value = "Replica指标-单个Replica")
     @PostMapping(value = "clusters/{clusterPhyId}/brokers/{brokerId}/topics/{topicName}/partitions/{partitionId}/metric-points")
     @ResponseBody
@@ -45,7 +46,7 @@ public class ReplicaMetricsController {
                                                             @PathVariable String topicName,
                                                             @PathVariable Integer partitionId,
                                                             @RequestBody List<String> metricsNames) {
-        Result<ReplicationMetrics> metricsResult = replicationMetricService.getLatestMetricsFromES(clusterPhyId, brokerId, topicName, partitionId, metricsNames);
+        Result<ReplicationMetrics> metricsResult = replicationMetricService.collectReplicaMetricsFromKafka(clusterPhyId, topicName, partitionId, brokerId, metricsNames);
         if (metricsResult.failed()) {
             return Result.buildFromIgnoreData(metricsResult);
         }
diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java
index 6b93e324..7e52c2f4 100644
--- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java
+++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ReplicaMetricCollectorTask.java
@@ -1,32 +1,32 @@
-package com.xiaojukeji.know.streaming.km.task.metrics;
-
-import com.didiglobal.logi.job.annotation.Task;
-import com.didiglobal.logi.job.common.TaskResult;
-import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
-import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
-import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-
-/**
- * @author didi
- */
-@Slf4j
-@Task(name = "ReplicaMetricCollectorTask",
-        description = "Replica指标采集任务",
-        cron = "0 0/1 * * * ? *",
-        autoRegister = true,
-        consensual = ConsensualEnum.BROADCAST,
-        timeout = 2 * 60)
-public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
-
-    @Autowired
-    private ReplicaMetricCollector replicaMetricCollector;
-
-    @Override
-    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
-        replicaMetricCollector.collectMetrics(clusterPhy);
-
-        return TaskResult.SUCCESS;
-    }
-}
+//package com.xiaojukeji.know.streaming.km.task.metrics;
+//
+//import com.didiglobal.logi.job.annotation.Task;
+//import com.didiglobal.logi.job.common.TaskResult;
+//import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
+//import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
+//import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.beans.factory.annotation.Autowired;
+//
+///**
+// * @author didi
+// */
+//@Slf4j
+//@Task(name = "ReplicaMetricCollectorTask",
+//        description = "Replica指标采集任务",
+//        cron = "0 0/1 * * * ? *",
+//        autoRegister = true,
+//        consensual = ConsensualEnum.BROADCAST,
+//        timeout = 2 * 60)
+//public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
+//
+//    @Autowired
+//    private ReplicaMetricCollector replicaMetricCollector;
+//
+//    @Override
+//    public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
+//        replicaMetricCollector.collectMetrics(clusterPhy);
+//
+//        return TaskResult.SUCCESS;
+//    }
+//}
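What made the old per-replica path expensive is visible in getLogSizeFromJmx above: each partition
on the broker costs roughly one remote metric read, because Kafka exposes log size as one
kafka.log:type=Log,name=Size gauge per topic-partition. A minimal sketch of that per-partition JMX
walk (the MBean pattern is standard Kafka; the JMX endpoint below is illustrative and not taken
from this patch):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    import java.util.Set;

    public class BrokerLogSizeViaJmx {
        public static void main(String[] args) throws Exception {
            // Illustrative JMX endpoint of a single broker.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                // One Size gauge exists per topic-partition hosted on the broker,
                // so the number of remote reads grows with the replica count.
                Set<ObjectName> names = conn.queryNames(new ObjectName("kafka.log:type=Log,name=Size,*"), null);
                long totalBytes = 0L;
                for (ObjectName name : names) {
                    totalBytes += ((Number) conn.getAttribute(name, "Value")).longValue();
                }
                System.out.println(names.size() + " partition gauges, total log size " + totalBytes + " bytes");
            }
        }
    }

The loop grows linearly with the number of replicas on the broker, which is why the cron-driven
ReplicaMetricCollectorTask is disabled above and replica metrics are now collected only on demand.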