mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Optimize] 优化ZK指标的获取,减少重复采集的出现 (#709)
1、避免不同集群,相同的ZK地址时,指标重复获取的情况; 2、避免集群某个ZK地址获取指标失败时,下一个周期还会继续尝试从该地址获取指标;
This commit is contained in:
46
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/ZookeeperLocalCache.java
vendored
Normal file
46
km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/ZookeeperLocalCache.java
vendored
Normal file
@@ -0,0 +1,46 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.cache;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.BaseFourLetterWordCmdData;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ZookeeperLocalCache {
|
||||
private static final Cache<String, String> fourLetterCmdFailedServerCache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(10, TimeUnit.MINUTES)
|
||||
.maximumSize(10000)
|
||||
.build();
|
||||
|
||||
private static final Cache<String, BaseFourLetterWordCmdData> fourLetterCmdDataCache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(60, TimeUnit.SECONDS)
|
||||
.maximumSize(10000)
|
||||
.build();
|
||||
|
||||
public static boolean canUse(String host, int port, String cmd) {
|
||||
String data = fourLetterCmdFailedServerCache.getIfPresent(gen4lwFailedKey(host, port, cmd));
|
||||
|
||||
return data == null;
|
||||
}
|
||||
|
||||
public static void setFailed(String host, int port, String cmd) {
|
||||
fourLetterCmdFailedServerCache.put(gen4lwFailedKey(host, port, cmd), "");
|
||||
}
|
||||
|
||||
public static BaseFourLetterWordCmdData getData(String host, int port, String cmd) {
|
||||
return fourLetterCmdDataCache.getIfPresent(gen4lwFailedKey(host, port, cmd));
|
||||
}
|
||||
|
||||
public static void putData(String host, int port, String cmd, BaseFourLetterWordCmdData cmdData) {
|
||||
fourLetterCmdDataCache.put(gen4lwFailedKey(host, port, cmd), cmdData);
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private static String gen4lwFailedKey(String host, int port, String cmd) {
|
||||
return host + "@" + port + "@" + cmd;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.Zookeepe
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.BaseFourLetterWordCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.MonitorCmdDataParser;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser;
|
||||
@@ -26,8 +27,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterw
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||
import com.xiaojukeji.know.streaming.km.core.cache.ZookeeperLocalCache;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
@@ -53,6 +54,7 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD = "getMetricFromMonitorCmd";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD = "getMetricFromServerCmd";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX = "getMetricFromKafkaByJMX";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_HEALTH_SERVICE = "getMetricFromHealthService";
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
@@ -66,9 +68,6 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
|
||||
@Autowired
|
||||
private KafkaJMXClient kafkaJMXClient;
|
||||
|
||||
@Autowired
|
||||
private KafkaControllerService kafkaControllerService;
|
||||
|
||||
@Override
|
||||
protected VersionItemTypeEnum getVersionItemType() {
|
||||
return VersionItemTypeEnum.METRIC_ZOOKEEPER;
|
||||
@@ -171,24 +170,37 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
|
||||
|
||||
Result<ZookeeperMetrics> rz = null;
|
||||
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
|
||||
Result<ServerCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new ServerCmdDataParser()
|
||||
);
|
||||
ServerCmdData cmdData = null;
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
BaseFourLetterWordCmdData baseCmdData = ZookeeperLocalCache.getData(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.ServerCmd);
|
||||
if (baseCmdData != null) {
|
||||
cmdData = (ServerCmdData) baseCmdData;
|
||||
} else if (ZookeeperLocalCache.canUse(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.ServerCmd)) {
|
||||
Result<ServerCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new ServerCmdDataParser()
|
||||
);
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
ZookeeperLocalCache.setFailed(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.ServerCmd);
|
||||
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
continue;
|
||||
}
|
||||
|
||||
cmdData = cmdDataResult.getData();
|
||||
ZookeeperLocalCache.putData(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.ServerCmd, cmdData);
|
||||
} else {
|
||||
// baseCmdData为空 且 当前地址不可使用
|
||||
continue;
|
||||
}
|
||||
|
||||
ServerCmdData cmdData = cmdDataResult.getData();
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
|
||||
@@ -208,24 +220,36 @@ public class ZookeeperMetricServiceImpl extends BaseMetricService implements Zoo
|
||||
|
||||
Result<ZookeeperMetrics> rz = null;
|
||||
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
|
||||
Result<MonitorCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new MonitorCmdDataParser()
|
||||
);
|
||||
MonitorCmdData cmdData = null;
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
BaseFourLetterWordCmdData baseCmdData = ZookeeperLocalCache.getData(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.MonitorCmd);
|
||||
if (baseCmdData != null) {
|
||||
cmdData = (MonitorCmdData) baseCmdData;
|
||||
} else if (ZookeeperLocalCache.canUse(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.MonitorCmd)) {
|
||||
Result<MonitorCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new MonitorCmdDataParser()
|
||||
);
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
ZookeeperLocalCache.setFailed(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.MonitorCmd);
|
||||
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
continue;
|
||||
}
|
||||
|
||||
cmdData = cmdDataResult.getData();
|
||||
ZookeeperLocalCache.putData(hostPort.getV1(), hostPort.getV2(), FourLetterWordUtil.MonitorCmd, cmdData);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
||||
MonitorCmdData cmdData = cmdDataResult.getData();
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
|
||||
|
||||
Reference in New Issue
Block a user