mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-02 18:32:08 +08:00
ZK-指标采集入ES
This commit is contained in:
@@ -343,17 +343,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
|
||||
private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) {
|
||||
try {
|
||||
Object object = jmxDAO.getJmxValue(
|
||||
clusterPhyId,
|
||||
newNode.id(),
|
||||
newNode.host(),
|
||||
null,
|
||||
jmxConfig,
|
||||
new ObjectName("java.lang:type=Runtime"),
|
||||
"StartTime"
|
||||
);
|
||||
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig);
|
||||
|
||||
return Broker.buildFrom(clusterPhyId, newNode, object != null? (Long) object: null);
|
||||
return Broker.buildFrom(clusterPhyId, newNode, startTime);
|
||||
} catch (Exception e) {
|
||||
log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,141 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.version.metrics;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER;
|
||||
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxAttribute.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl.ZookeeperMetricServiceImpl.*;
|
||||
|
||||
@Component
|
||||
public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric {
|
||||
|
||||
/**
|
||||
* 性能
|
||||
*/
|
||||
public static final String ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY = "AvgRequestLatency";
|
||||
public static final String ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY = "MinRequestLatency";
|
||||
public static final String ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY = "MaxRequestLatency";
|
||||
public static final String ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS = "OutstandingRequests";
|
||||
public static final String ZOOKEEPER_METRIC_NODE_COUNT = "NodeCount";
|
||||
public static final String ZOOKEEPER_METRIC_WATCH_COUNT = "WatchCount";
|
||||
public static final String ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS = "NumAliveConnections";
|
||||
public static final String ZOOKEEPER_METRIC_PACKETS_RECEIVED = "PacketsReceived";
|
||||
public static final String ZOOKEEPER_METRIC_PACKETS_SENT = "PacketsSent";
|
||||
public static final String ZOOKEEPER_METRIC_EPHEMERALS_COUNT = "EphemeralsCount";
|
||||
public static final String ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE = "ApproximateDataSize";
|
||||
public static final String ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT = "OpenFileDescriptorCount";
|
||||
public static final String ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT = "MaxFileDescriptorCount";
|
||||
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC = "KafkaZKDisconnectsPerSec";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC = "KafkaZKSyncConnectsPerSec";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH = "KafkaZKRequestLatencyMs_99thPercentile";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX = "KafkaZKRequestLatencyMs_Max";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN = "KafkaZKRequestLatencyMs_Mean";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN = "KafkaZKRequestLatencyMs_Min";
|
||||
|
||||
|
||||
public static final String ZOOKEEPER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME;
|
||||
|
||||
@Override
|
||||
public int versionItemType() {
|
||||
return METRIC_ZOOKEEPER.getCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VersionMetricControlItem> init(){
|
||||
List<VersionMetricControlItem> items = new ArrayList<>();
|
||||
|
||||
// 性能指标
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY).unit("ms").desc("平均响应延迟").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY).unit("ms").desc("最小响应延迟").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY).unit("ms").desc("最大响应延迟").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS).unit("个").desc("堆积请求数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_NODE_COUNT).unit("个").desc("ZNode数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_WATCH_COUNT).unit("个").desc("Watch数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS).unit("个").desc("客户端连接数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_PACKETS_RECEIVED).unit("个").desc("接受包的数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_PACKETS_SENT).unit("个").desc("发送包的数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_EPHEMERALS_COUNT).unit("个").desc("临时节点数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE).unit("byte").desc("文件大小(近似值)").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT).unit("个").desc("已打开的文件描述符数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT).unit("个").desc("允许打开的最大文件描述符数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
// JMX指标
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH).unit("ms").desc("ZK请求99分位延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(PERCENTILE_99)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX).unit("ms").desc("ZK请求最大延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MAX)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN).unit("ms").desc("ZK请求最小延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MIN)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN).unit("ms").desc("ZK请求平均延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MEAN)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC).unit("个").desc("断开连接数").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_DISCONNECTORS_PER_SEC ).jmxAttribute(RATE_MIN_1)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC).unit("个").desc("同步连接数").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_SYNC_CONNECTS_PER_SEC ).jmxAttribute(RATE_MIN_1)));
|
||||
return items;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ZookeeperMetricService {
|
||||
/**
|
||||
* ZK指标获取
|
||||
* @param param 参数,因为ZK 四字命令在使用时,是短连接,所以参数内容会复杂一些,后续可以考虑优化为长连接
|
||||
* @return
|
||||
*/
|
||||
Result<ZookeeperMetrics> collectMetricsFromZookeeper(ZookeeperMetricParam param);
|
||||
Result<ZookeeperMetrics> batchCollectMetricsFromZookeeper(Long clusterPhyId, List<String> metricNameList);
|
||||
|
||||
Result<List<MetricLineVO>> listMetricsFromES(Long clusterPhyId, MetricDTO dto);
|
||||
}
|
||||
@@ -0,0 +1,281 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.MonitorCmdDataParser;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.*;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ZookeeperMetricESDAO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_JMX_CONNECT_ERROR;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems.*;
|
||||
|
||||
|
||||
@Service
|
||||
public class ZookeeperMetricServiceImpl extends BaseMetricService implements ZookeeperMetricService {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricServiceImpl.class);
|
||||
|
||||
public static final String ZOOKEEPER_METHOD_DO_NOTHING = "doNothing";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD = "getMetricFromMonitorCmd";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD = "getMetricFromServerCmd";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX = "getMetricFromKafkaByJMX";
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperService zookeeperService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperMetricESDAO zookeeperMetricESDAO;
|
||||
|
||||
@Autowired
|
||||
private KafkaJMXClient kafkaJMXClient;
|
||||
|
||||
@Autowired
|
||||
private KafkaControllerService kafkaControllerService;
|
||||
|
||||
@Override
|
||||
protected VersionItemTypeEnum getVersionItemType() {
|
||||
return VersionItemTypeEnum.METRIC_ZOOKEEPER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> listMetricPOFields(){
|
||||
return BeanUtil.listBeanFields(ZookeeperMetricPO.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initRegisterVCHandler(){
|
||||
registerVCHandler( ZOOKEEPER_METHOD_DO_NOTHING, this::doNothing);
|
||||
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD, this::getMetricFromMonitorCmd);
|
||||
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD, this::getMetricFromServerCmd);
|
||||
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX, this::getMetricFromKafkaByJMX);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<ZookeeperMetrics> collectMetricsFromZookeeper(ZookeeperMetricParam param) {
|
||||
try {
|
||||
return (Result<ZookeeperMetrics>)doVCHandler(param.getClusterPhyId(), param.getMetricName(), param);
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<ZookeeperMetrics> batchCollectMetricsFromZookeeper(Long clusterPhyId, List<String> metricNameList) {
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
|
||||
if (null == clusterPhy) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||
}
|
||||
|
||||
List<ZookeeperInfo> aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId).stream()
|
||||
.filter(elem -> Constant.ALIVE.equals(elem.getStatus()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (ValidateUtils.isEmptyList(aliveZKList)) {
|
||||
// 没有指标可以获取
|
||||
return Result.buildSuc(new ZookeeperMetrics(clusterPhyId));
|
||||
}
|
||||
|
||||
// 构造参数
|
||||
ZookeeperMetricParam param = new ZookeeperMetricParam(
|
||||
clusterPhyId,
|
||||
aliveZKList.stream().map(elem -> new Tuple<String, Integer>(elem.getHost(), elem.getPort())).collect(Collectors.toList()),
|
||||
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class),
|
||||
null
|
||||
);
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId);
|
||||
for(String metricName : metricNameList) {
|
||||
try {
|
||||
if(metrics.getMetrics().containsKey(metricName)) {
|
||||
continue;
|
||||
}
|
||||
param.setMetricName(metricName);
|
||||
|
||||
Result<ZookeeperMetrics> ret = this.collectMetricsFromZookeeper(param);
|
||||
if(null == ret || ret.failed() || null == ret.getData()){
|
||||
continue;
|
||||
}
|
||||
|
||||
metrics.putMetric(ret.getData().getMetrics());
|
||||
} catch (Exception e){
|
||||
LOGGER.error(
|
||||
"class=ZookeeperMetricServiceImpl||method=collectMetricsFromZookeeper||clusterPhyId={}||metricName={}||errMsg=exception!",
|
||||
clusterPhyId, metricName, e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return Result.buildSuc(metrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<MetricLineVO>> listMetricsFromES(Long clusterPhyId, MetricDTO dto) {
|
||||
Map<String/*metricName*/, List<MetricPointVO>> pointVOMap = zookeeperMetricESDAO.listMetricsByClusterPhyId(
|
||||
clusterPhyId,
|
||||
dto.getMetricsNames(),
|
||||
dto.getAggType(),
|
||||
dto.getStartTime(),
|
||||
dto.getEndTime()
|
||||
);
|
||||
|
||||
// 格式转化
|
||||
List<MetricLineVO> voList = new ArrayList<>();
|
||||
pointVOMap.entrySet().stream().forEach(entry ->
|
||||
voList.add(new MetricLineVO(String.valueOf(clusterPhyId), entry.getKey(), entry.getValue()))
|
||||
);
|
||||
return Result.buildSuc(voList);
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Result<ZookeeperMetrics> getMetricFromServerCmd(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
|
||||
Result<ZookeeperMetrics> rz = null;
|
||||
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
|
||||
Result<ServerCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new ServerCmdDataParser()
|
||||
);
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
continue;
|
||||
}
|
||||
|
||||
ServerCmdData cmdData = cmdDataResult.getData();
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue());
|
||||
|
||||
return Result.buildSuc(metrics);
|
||||
}
|
||||
|
||||
return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
private Result<ZookeeperMetrics> getMetricFromMonitorCmd(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
|
||||
Result<ZookeeperMetrics> rz = null;
|
||||
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
|
||||
Result<MonitorCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new MonitorCmdDataParser()
|
||||
);
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
continue;
|
||||
}
|
||||
|
||||
MonitorCmdData cmdData = cmdDataResult.getData();
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_WATCH_COUNT, cmdData.getZkWatchCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_EPHEMERALS_COUNT, cmdData.getZkEphemeralsCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE, cmdData.getZkApproximateDataSize().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT, cmdData.getZkOpenFileDescriptorCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT, cmdData.getZkMaxFileDescriptorCount().floatValue());
|
||||
|
||||
return Result.buildSuc(metrics);
|
||||
}
|
||||
|
||||
return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
private Result<ZookeeperMetrics> doNothing(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
return Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
private Result<ZookeeperMetrics> getMetricFromKafkaByJMX(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
|
||||
String metricName = param.getMetricName();
|
||||
Long clusterPhyId = param.getClusterPhyId();
|
||||
Integer kafkaControllerId = param.getKafkaControllerId();
|
||||
|
||||
//1、获取jmx的属性信息
|
||||
VersionJmxInfo jmxInfo = getJMXInfo(clusterPhyId, metricName);
|
||||
if(null == jmxInfo) {
|
||||
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
|
||||
}
|
||||
|
||||
//2、获取jmx连接
|
||||
JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterPhyId, kafkaControllerId);
|
||||
if (ValidateUtils.isNull(jmxConnectorWrap)) {
|
||||
return Result.buildFailure(VC_JMX_INIT_ERROR);
|
||||
}
|
||||
|
||||
try {
|
||||
//2、获取jmx指标
|
||||
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString();
|
||||
|
||||
return Result.buildSuc(ZookeeperMetrics.initWithMetric(clusterPhyId, metricName, Float.valueOf(value)));
|
||||
} catch (Exception e) {
|
||||
return Result.buildFailure(VC_JMX_CONNECT_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user