[Optimize] Remove the Replica metrics collection task

1. When a cluster has a large number of replicas, metrics collection performance degrades severely;
2. Replica metrics are essentially only needed when they are fetched in real time, so the Replica metrics collection task is disabled for now; whether to re-enable it will be decided later based on product needs.
zengqiao
2022-10-21 11:47:28 +08:00
committed by EricZeng
parent e8f77a456b
commit cafd665a2d
7 changed files with 100 additions and 83 deletions

View File

@@ -24,11 +24,6 @@ public class CollectedMetricsLocalCache {
             .maximumSize(10000)
             .build();
-    private static final Cache<String, Float> replicaMetricsValueCache = Caffeine.newBuilder()
-            .expireAfterWrite(90, TimeUnit.SECONDS)
-            .maximumSize(20000)
-            .build();
     public static Float getBrokerMetrics(String brokerMetricKey) {
         return brokerMetricsCache.getIfPresent(brokerMetricKey);
     }
@@ -64,17 +59,6 @@ public class CollectedMetricsLocalCache {
         partitionMetricsCache.put(partitionMetricsKey, metricsList);
     }
-    public static Float getReplicaMetrics(String replicaMetricsKey) {
-        return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
-    }
-    public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
-        if (value == null) {
-            return;
-        }
-        replicaMetricsValueCache.put(replicaMetricsKey, value);
-    }
     public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
         return clusterPhyId + "@" + brokerId + "@" + metricName;
     }
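The broker- and partition-level caches keep this same Caffeine pattern; only the replica-level cache and its accessors are removed. For readers unfamiliar with the pattern, a minimal self-contained sketch (class and key names here are illustrative, not the project's):

    import java.util.concurrent.TimeUnit;
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    // Minimal sketch of the cache pattern used above: a bounded,
    // time-expiring map keyed by "clusterId@brokerId@metricName".
    public class MetricsCacheSketch {
        private static final Cache<String, Float> cache = Caffeine.newBuilder()
                .expireAfterWrite(90, TimeUnit.SECONDS) // entries older than ~one collection cycle are dropped
                .maximumSize(10000)                     // hard cap; eviction keeps memory bounded
                .build();

        public static Float get(String key) {
            return cache.getIfPresent(key); // null on miss; caller falls back to a live fetch
        }

        public static void put(String key, Float value) {
            if (value == null) {
                return; // Caffeine rejects null values
            }
            cache.put(key, value);
        }
    }

With per-replica keys, a large cluster can easily exceed a 20,000-entry cap, so entries get evicted before they are read again; that churn is presumably part of why the replica cache was dropped rather than enlarged.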

View File

@@ -37,6 +37,8 @@ import com.xiaojukeji.know.streaming.km.core.service.version.metrics.BrokerMetri
 import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ReplicaMetricVersionItems;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.BrokerMetricESDAO;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
@@ -49,6 +51,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
+import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
 /**
  * @author didi
@@ -105,7 +108,11 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         registerVCHandler( BROKER_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore);
         registerVCHandler( BROKER_METHOD_GET_PARTITIONS_SKEW, this::getPartitionsSkew);
         registerVCHandler( BROKER_METHOD_GET_LEADERS_SKEW, this::getLeadersSkew);
-        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize);
+//        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize);
+        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_0_10_0_0, V_1_0_0, "getLogSizeFromJmx", this::getLogSizeFromJmx);
+        registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_1_0_0, V_MAX, "getLogSizeFromClient", this::getLogSizeFromClient);
         registerVCHandler( BROKER_METHOD_IS_BROKER_ALIVE, this::isBrokerAlive);
     }
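The re-registration above is the interesting part: the same logical method, BROKER_METHOD_GET_LOG_SIZE, now has two implementations gated by Kafka version, JMX for [0.10.0.0, 1.0.0) and the client API for [1.0.0, V_MAX). A simplified sketch of that dispatch idea (the registry class below is hypothetical; KnowStreaming's BaseMetricService differs in detail):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    // Hypothetical sketch: each method name maps to (minVersion inclusive,
    // maxVersion exclusive, handler) entries; dispatch picks the first entry
    // whose version range contains the cluster's version.
    public class VersionedHandlerRegistry<P, R> {
        private record Entry<P, R>(long minVersion, long maxVersion, Function<P, R> handler) {}

        private final Map<String, List<Entry<P, R>>> handlers = new HashMap<>();

        public void register(String method, long minVersion, long maxVersion, Function<P, R> handler) {
            handlers.computeIfAbsent(method, k -> new ArrayList<>())
                    .add(new Entry<>(minVersion, maxVersion, handler));
        }

        public R dispatch(String method, long clusterVersion, P param) {
            for (Entry<P, R> entry : handlers.getOrDefault(method, List.of())) {
                if (clusterVersion >= entry.minVersion() && clusterVersion < entry.maxVersion()) {
                    return entry.handler().apply(param);
                }
            }
            throw new IllegalStateException("no handler for " + method + " at version " + clusterVersion);
        }
    }

The broker-side DescribeLogDirs API only exists from Kafka 1.0 on, which is why the JMX path has to stay around for older clusters.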
@@ -351,7 +358,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         );
     }
-    private Result<BrokerMetrics> getLogSize(VersionItemParam metricParam) {
+    private Result<BrokerMetrics> getLogSizeFromJmx(VersionItemParam metricParam) {
         BrokerMetricParam param = (BrokerMetricParam)metricParam;
         String metric = param.getMetric();
@@ -360,19 +367,17 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         List<Partition> partitions = partitionService.listPartitionByBroker(clusterId, brokerId);
         JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterId, brokerId);
         if (ValidateUtils.isNull(jmxConnectorWrap)){return Result.buildFailure(VC_JMX_INIT_ERROR);}
         Float logSizeSum = 0f;
         for(Partition p : partitions) {
             try {
-                Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
+                Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafka(
                         clusterId,
                         p.getTopicName(),
                         brokerId,
                         p.getPartitionId(),
                         ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE
                 );
                 if(null == metricsResult || metricsResult.failed() || null == metricsResult.getData()) {
                     continue;
                 }
@@ -391,6 +396,28 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, logSizeSum));
     }
+    private Result<BrokerMetrics> getLogSizeFromClient(VersionItemParam metricParam) {
+        BrokerMetricParam param = (BrokerMetricParam)metricParam;
+        String metric = param.getMetric();
+        Long clusterId = param.getClusterId();
+        Integer brokerId = param.getBrokerId();
+        Result<Map<String, LogDirDescription>> descriptionMapResult = brokerService.getBrokerLogDirDescFromKafka(clusterId, brokerId);
+        if(null == descriptionMapResult || descriptionMapResult.failed() || null == descriptionMapResult.getData()) {
+            return Result.buildFromIgnoreData(descriptionMapResult);
+        }
+        Float logSizeSum = 0f;
+        for (LogDirDescription logDirDescription: descriptionMapResult.getData().values()) {
+            for (ReplicaInfo replicaInfo: logDirDescription.replicaInfos().values()) {
+                logSizeSum += replicaInfo.size();
+            }
+        }
+        return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, logSizeSum));
+    }
     private Result<BrokerMetrics> getLeadersSkew(VersionItemParam metricParam) {
         BrokerMetricParam param = (BrokerMetricParam)metricParam;
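getLogSizeFromClient delegates to brokerService.getBrokerLogDirDescFromKafka, which, judging by the types involved, wraps the Kafka AdminClient's describeLogDirs call. A standalone sketch of that underlying call, summing replica sizes the same way; the bootstrap address and broker id are placeholders:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.LogDirDescription;
    import org.apache.kafka.clients.admin.ReplicaInfo;

    public class BrokerLogSizeSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            int brokerId = 1; // placeholder broker id

            try (Admin admin = Admin.create(props)) {
                // One entry per log dir on the broker: path -> LogDirDescription
                Map<String, LogDirDescription> dirs =
                        admin.describeLogDirs(List.of(brokerId)).allDescriptions().get().get(brokerId);

                long logSizeSum = 0L;
                for (LogDirDescription dir : dirs.values()) {
                    for (ReplicaInfo replica : dir.replicaInfos().values()) {
                        logSizeSum += replica.size(); // on-disk bytes of this replica
                    }
                }
                System.out.println("broker " + brokerId + " log size = " + logSizeSum + " bytes");
            }
        }
    }

One detail worth noting: ReplicaInfo.size() returns a long, so accumulating into a long as above avoids the precision loss a Float accumulator can incur on very large brokers.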

View File

@@ -13,12 +13,14 @@ public interface ReplicaMetricService {
      * Collect metrics from Kafka
      */
     Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topic, Integer partitionId, Integer brokerId, String metric);
-    Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic, Integer brokerId, Integer partitionId, String metric);
+    Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList);
     /**
      * Fetch metrics from ES
      */
+    @Deprecated
     Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto);
+    @Deprecated
     Result<ReplicationMetrics> getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List<String> metricNames);
 }
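With the batch overload in place, a call site can fetch several replica metrics in one call instead of one cache lookup per metric. A hedged usage sketch, assuming the project's Result/ReplicationMetrics types and the metric-name constants that appear elsewhere in this diff:

    // Hypothetical call site; replicaMetricService is an injected ReplicaMetricService,
    // and java.util.Arrays is imported.
    Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafka(
            clusterPhyId,
            topicName,
            partitionId,
            brokerId,
            Arrays.asList(
                    ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE,
                    ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_END_OFFSET
            )
    );
    if (ret != null && ret.successful() && ret.getData() != null) {
        // Per the implementation below, metrics that failed to collect are simply absent from the map.
        Float logSize = ret.getData().getMetrics().get(ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE);
    }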

View File

@@ -17,7 +17,6 @@ import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
 import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
 import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
 import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
@@ -77,32 +76,36 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
     }
     @Override
-    public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
-                                                                              String topic,
-                                                                              Integer brokerId,
-                                                                              Integer partitionId,
-                                                                              String metric) {
-        String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
+    public Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList) {
+        ReplicationMetrics metrics = new ReplicationMetrics(clusterId, topicName, brokerId, partitionId);
+        for (String metricName: metricNameList) {
+            try {
+                if (metrics.getMetrics().containsKey(metricName)) {
+                    continue;
+                }
-        Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
-        if(null != keyValue){
-            ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
-            replicationMetrics.putMetric(metric, keyValue);
-            return Result.buildSuc(replicationMetrics);
+                Result<ReplicationMetrics> ret = this.collectReplicaMetricsFromKafka(
+                        clusterId,
+                        metrics.getTopic(),
+                        metrics.getBrokerId(),
+                        metrics.getPartitionId(),
+                        metricName
+                );
+                if (null == ret || ret.failed() || null == ret.getData()) {
+                    continue;
+                }
+                metrics.putMetric(ret.getData().getMetrics());
+            } catch (Exception e) {
+                LOGGER.error(
+                        "method=collectReplicaMetricsFromKafka||clusterPhyId={}||topicName={}||partition={}||brokerId={}||metricName={}||errMsg=exception!",
+                        clusterId, topicName, partitionId, brokerId, metricName, e
+                );
+            }
+        }
-        Result<ReplicationMetrics> ret = collectReplicaMetricsFromKafka(clusterPhyId, topic, partitionId, brokerId, metric);
-        if(null == ret || ret.failed() || null == ret.getData()){return ret;}
-        // update the cache
-        ret.getData().getMetrics().entrySet().stream().forEach(
-                metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
-                        replicaMetricsKey,
-                        metricNameAndValueEntry.getValue()
-                )
-        );
-        return ret;
+        return Result.buildSuc(metrics);
     }
     @Override
@@ -167,8 +170,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
         Integer brokerId = metricParam.getBrokerId();
         Integer partitionId = metricParam.getPartitionId();
-        Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
-        Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
+        Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
+        Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
         ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterId, topic, brokerId, partitionId);
         if(null != endRet && endRet.successful() && null != startRet && startRet.successful()){
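The truncated method above gathers the replica's log end and log start offsets; presumably it then derives the replica's message count as their difference. A minimal sketch of that last step, continuing from endRet/startRet/replicationMetrics as named in the diff (the REPLICATION_METRIC_MESSAGES constant is an assumption, not taken from the source):

    // Assumed derivation: messages in a replica = log end offset - log start offset.
    Float endOffset = endRet.getData().getMetrics().get(REPLICATION_METRIC_LOG_END_OFFSET);
    Float startOffset = startRet.getData().getMetrics().get(REPLICATION_METRIC_LOG_START_OFFSET);
    if (endOffset != null && startOffset != null) {
        replicationMetrics.putMetric(REPLICATION_METRIC_MESSAGES, endOffset - startOffset); // assumed constant name
    }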