指标采集缓存时间调整

This commit is contained in:
zengqiao
2022-08-31 17:15:49 +08:00
parent f005c6bc44
commit 6e058240b3
8 changed files with 53 additions and 56 deletions

View File

@@ -10,13 +10,13 @@ import java.util.concurrent.TimeUnit;
public class CollectedMetricsLocalCache {
private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
.expireAfterWrite(60, TimeUnit.SECONDS)
.maximumSize(2000)
.expireAfterWrite(90, TimeUnit.SECONDS)
.maximumSize(10000)
.build();
private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
.expireAfterWrite(90, TimeUnit.SECONDS)
.maximumSize(5000)
.maximumSize(10000)
.build();
private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
@@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache {
.maximumSize(20000)
.build();
public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
public static Float getBrokerMetrics(String brokerMetricKey) {
return brokerMetricsCache.getIfPresent(brokerMetricKey);
}
public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
public static void putBrokerMetrics(String brokerMetricKey, Float value) {
if (value == null) {
return;
}
brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
brokerMetricsCache.put(brokerMetricKey, value);
}
public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
public static List<TopicMetrics> getTopicMetrics(String topicMetricKey) {
return topicMetricsCache.getIfPresent(topicMetricKey);
}
public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
public static void putTopicMetrics(String topicMetricKey, List<TopicMetrics> metricsList) {
if (metricsList == null) {
return;
}
topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
topicMetricsCache.put(topicMetricKey, metricsList);
}
public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
public static List<PartitionMetrics> getPartitionMetricsList(String partitionMetricKey) {
return partitionMetricsCache.getIfPresent(partitionMetricKey);
}
public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
public static void putPartitionMetricsList(String partitionMetricsKey, List<PartitionMetrics> metricsList) {
if (metricsList == null) {
return;
}
partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
partitionMetricsCache.put(partitionMetricsKey, metricsList);
}
public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
public static Float getReplicaMetrics(String replicaMetricsKey) {
return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
}
public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
if (value == null) {
return;
}
replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
replicaMetricsValueCache.put(replicaMetricsKey, value);
}
/**************************************************** private method ****************************************************/
private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
return clusterPhyId + "@" + brokerId + "@" + metricName;
}
private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
return clusterPhyId + "@" + topicName + "@" + metricName;
}
private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
}
/**************************************************** private method ****************************************************/
}

View File

@@ -110,9 +110,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
}
@Override
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) {
String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric);
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey);
if(null != keyValue) {
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
brokerMetrics.putMetric(metric, keyValue);
@@ -124,7 +125,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
Map<String, Float> metricsMap = ret.getData().getMetrics();
for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue());
}
return ret;

View File

@@ -126,7 +126,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
private TopicMetricService topicMetricService;
@Autowired
private TopicService topicService;
private TopicService topicService;
@Autowired
private PartitionService partitionService;
@@ -728,13 +728,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
Long clusterId = param.getClusterId();
//1、获取jmx的属性信息
VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric);
if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);}
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
float metricVale = 0f;
for(Broker broker : brokers){
for(Broker broker : brokers) {
Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
if(null == ret || ret.failed() || null == ret.getData()){continue;}

View File

@@ -75,7 +75,9 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
@Override
public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) {
List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName);
String partitionMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(partitionMetricsKey);
if(null != metricsList) {
return Result.buildSuc(metricsList);
}
@@ -88,12 +90,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
// 更新cache
PartitionMetrics metrics = metricsResult.getData().get(0);
metrics.getMetrics().entrySet().forEach(
metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(
clusterPhyId,
metrics.getTopic(),
metricEntry.getKey(),
metricsResult.getData()
)
metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(partitionMetricsKey, metricsResult.getData())
);
return metricsResult;

View File

@@ -77,9 +77,14 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
}
@Override
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic,
Integer brokerId, Integer partitionId, String metric){
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric);
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
String topic,
Integer brokerId,
Integer partitionId,
String metric) {
String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
if(null != keyValue){
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
replicationMetrics.putMetric(metric, keyValue);
@@ -92,11 +97,7 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
// 更新cache
ret.getData().getMetrics().entrySet().stream().forEach(
metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
clusterPhyId,
brokerId,
topic,
partitionId,
metricNameAndValueEntry.getKey(),
replicaMetricsKey,
metricNameAndValueEntry.getValue()
)
);

View File

@@ -120,7 +120,9 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
@Override
public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) {
List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName);
String topicMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(topicMetricsKey);
if(null != metricsList) {
return Result.buildSuc(metricsList);
}
@@ -133,12 +135,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
// 更新cache
TopicMetrics metrics = metricsResult.getData().get(0);
metrics.getMetrics().entrySet().forEach(
metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(
clusterPhyId,
metrics.getTopic(),
metricEntry.getKey(),
metricsResult.getData()
)
metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(topicMetricsKey, metricsResult.getData())
);
return metricsResult;

View File

@@ -35,6 +35,7 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster";
public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";