[Feature]MM2管理-采集MM2指标(#894)

This commit is contained in:
zengqiao
2023-02-09 14:47:08 +08:00
committed by EricZeng
parent 9b7c41e804
commit 6ba3dceb84
12 changed files with 666 additions and 2 deletions

View File

@@ -0,0 +1,33 @@
package com.xiaojukeji.know.streaming.km.collector.sink.mm2;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.sink.AbstractMetricESSender;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.mm2.MirrorMakerMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.mm2.MirrorMakerMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.persistence.es.template.TemplateConstant.CONNECT_MM2_INDEX;
/**
* @author zengqiao
* @date 2022/12/20
*/
@Component
public class MirrorMakerMetricESSender extends AbstractMetricESSender implements ApplicationListener<MirrorMakerMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricESSender.class);
@PostConstruct
public void init(){
LOGGER.info("method=init||msg=init finished");
}
@Override
public void onApplicationEvent(MirrorMakerMetricEvent event) {
send2es(CONNECT_MM2_INDEX, ConvertUtil.list2List(event.getMetricsList(), MirrorMakerMetricPO.class));
}
}

View File

@@ -0,0 +1,33 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* @author wyb
* @date 2022/12/14
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MirrorMakerTopic {
/**
* mm2集群别名
*/
private String clusterAlias;
/**
* topic名称
*/
private String topicName;
/**
* partition在connect上的分布 Map<PartitionId,WorkerId>
*/
private Map<Integer,String> partitionMap;
}

View File

@@ -0,0 +1,38 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author wyb
* @date 2022/12/16
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MirrorMakerTopicPartitionMetrics extends BaseMetrics {
private Long connectClusterId;
private String mirrorMakerName;
private String clusterAlias;
private String topicName;
private Integer partitionId;
private String workerId;
@Override
public String unique() {
return "KCOR@" + connectClusterId + "@" + mirrorMakerName + "@" + clusterAlias + "@" + workerId + "@" + topicName + "@" + partitionId;
}
public static MirrorMakerTopicPartitionMetrics initWithMetric(Long connectClusterId, String mirrorMakerName, String clusterAlias, String topicName, Integer partitionId, String workerId, String metricName, Float value) {
MirrorMakerTopicPartitionMetrics metrics = new MirrorMakerTopicPartitionMetrics(connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId);
metrics.putMetric(metricName, value);
return metrics;
}
}

View File

@@ -0,0 +1,26 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.connect.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.MetricParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author wyb
* @date 2022/12/15
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MirrorMakerMetricParam extends MetricParam {
private Long connectClusterId;
private String mirrorMakerName;
private List<MirrorMakerTopic> mirrorMakerTopicList;
private String metric;
}

View File

@@ -0,0 +1,21 @@
package com.xiaojukeji.know.streaming.km.common.bean.event.metric.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BaseMetricEvent;
import lombok.Getter;
import java.util.List;
/**
* @author zengqiao
* @date 2022/12/20
*/
@Getter
public class MirrorMakerMetricEvent extends BaseMetricEvent {
private final List<MirrorMakerMetrics> metricsList;
public MirrorMakerMetricEvent(Object source, List<MirrorMakerMetrics> metricsList) {
super(source);
this.metricsList = metricsList;
}
}

View File

@@ -144,6 +144,32 @@ public class JmxAttribute {
public static final String TOTAL_RETRIES = "total-retries";
/*********************************************************** mm2 ***********************************************************/
public static final String BYTE_COUNT = "byte-count";
public static final String BYTE_RATE = "byte-rate";
public static final String RECORD_AGE_MS = "record-age-ms";
public static final String RECORD_AGE_MS_AVG = "record-age-ms-avg";
public static final String RECORD_AGE_MS_MAX = "record-age-ms-max";
public static final String RECORD_AGE_MS_MIN = "record-age-ms-min";
public static final String RECORD_COUNT = "record-count";
public static final String RECORD_RATE = "record-rate";
public static final String REPLICATION_LATENCY_MS = "replication-latency-ms";
public static final String REPLICATION_LATENCY_MS_AVG = "replication-latency-ms-avg";
public static final String REPLICATION_LATENCY_MS_MAX = "replication-latency-ms-max";
public static final String REPLICATION_LATENCY_MS_MIN = "replication-latency-ms-min";
private JmxAttribute() {
}
}

View File

@@ -41,6 +41,8 @@ public class JmxName {
public static final String JMX_SERVER_APP_INFO ="kafka.server:type=app-info";
public static final String JMX_SERVER_TOPIC_MIRROR ="kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=*,topic=%s,partition=*";
/*********************************************************** controller ***********************************************************/
public static final String JMX_CONTROLLER_ACTIVE_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";
@@ -82,6 +84,10 @@ public class JmxName {
public static final String JMX_CONNECTOR_TASK_ERROR_METRICS = "kafka.connect:type=task-error-metrics,connector=%s,task=%s";
/*********************************************************** mm2 ***********************************************************/
public static final String JMX_MIRROR_MAKER_SOURCE = "kafka.connect.mirror:type=MirrorSourceConnector,target=%s,topic=%s,partition=%s";
private JmxName() {
}

View File

@@ -34,7 +34,7 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerServic
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.ConnectorMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.connector.ConnectorMetricESDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

View File

@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.core.service.connect.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2.MetricsMirrorMakersDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import java.util.List;
/**
* @author wyb
* @date 2022/12/15
*/
public interface MirrorMakerMetricService {
Result<MirrorMakerMetrics> collectMirrorMakerMetricsFromKafka(Long connectClusterPhyId, String mirrorMakerName, List<MirrorMakerTopic> mirrorMakerTopicList, String metricName);
/**
* 从ES中获取一段时间内聚合计算之后的指标线
*/
Result<List<MetricMultiLinesVO>> listMirrorMakerClusterMetricsFromES(Long clusterPhyId, MetricsMirrorMakersDTO dto);
Result<List<MirrorMakerMetrics>> getLatestMetricsFromES(Long clusterPhyId, List<Tuple<Long, String>> mirrorMakerList, List<String> metricNameList);
Result<MirrorMakerMetrics> getLatestMetricsFromES(Long connectClusterId, String connectorName, List<String> metricsNames);
}

View File

@@ -0,0 +1,324 @@
package com.xiaojukeji.know.streaming.km.core.service.connect.mm2.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.google.common.collect.Table;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2.MetricsMirrorMakersDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerTopicPartitionMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.connect.mm2.MirrorMakerMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.connect.ConnectorMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.mm2.MirrorMakerMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionConnectJmxInfo;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseConnectorMetricService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import org.springframework.beans.factory.annotation.Autowired;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.connect.mm2.MirrorMakerMetricESDAO;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CONNECT_MIRROR_MAKER;
/**
* @author wyb
* @date 2022/12/15
*/
@Service
public class MirrorMakerMetricServiceImpl extends BaseConnectorMetricService implements MirrorMakerMetricService {
protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricServiceImpl.class);
public static final String MIRROR_MAKER_METHOD_DO_NOTHING = "doNothing";
public static final String MIRROR_MAKER_METHOD_GET_HEALTH_SCORE = "getMetricHealthScore";
public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM = "getTopicPartitionMetricListSum";
public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG = "getTopicPartitionMetricListAvg";
public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN = "getTopicPartitionMetricListMin";
public static final String MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX = "getTopicPartitionMetricListMax";
@Autowired
private ConnectJMXClient connectJMXClient;
@Autowired
private MirrorMakerMetricESDAO mirrorMakerMetricESDAO;
@Autowired
private ConnectorService connectorService;
@Autowired
private HealthStateService healthStateService;
@Override
protected List<String> listMetricPOFields() {
return BeanUtil.listBeanFields(MirrorMakerMetricPO.class);
}
@Override
protected void initRegisterVCHandler() {
registerVCHandler(MIRROR_MAKER_METHOD_DO_NOTHING, this::doNothing);
registerVCHandler(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore);
registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM, this::getTopicPartitionMetricListSum);
registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG, this::getTopicPartitionMetricListAvg);
registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX, this::getTopicPartitionMetricListMax);
registerVCHandler(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN, this::getTopicPartitionMetricListMin);
}
@Override
protected VersionItemTypeEnum getVersionItemType() {
return METRIC_CONNECT_MIRROR_MAKER;
}
@Override
public Result<MirrorMakerMetrics> collectMirrorMakerMetricsFromKafka(Long connectClusterPhyId, String mirrorMakerName, List<MirrorMakerTopic> mirrorMakerTopicList, String metricName) {
try {
MirrorMakerMetricParam metricParam = new MirrorMakerMetricParam(connectClusterPhyId, mirrorMakerName, mirrorMakerTopicList, metricName);
return (Result<MirrorMakerMetrics>) doVCHandler(connectClusterPhyId, metricName, metricParam);
} catch (Exception e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}
@Override
public Result<List<MetricMultiLinesVO>> listMirrorMakerClusterMetricsFromES(Long clusterPhyId, MetricsMirrorMakersDTO dto) {
Long startTime = dto.getStartTime();
Long endTime = dto.getEndTime();
Integer topN = dto.getTopNu();
String aggType = dto.getAggType();
List<String> metricNameList = dto.getMetricsNames();
List<Tuple<Long, String>> connectorList = new ArrayList<>();
if(!CollectionUtils.isEmpty(dto.getConnectorNameList())){
connectorList = dto.getConnectorNameList().stream()
.map(c -> new Tuple<>(c.getConnectClusterId(), c.getConnectorName()))
.collect(Collectors.toList());
}
Table<String/*metric*/, Tuple<Long, String>, List<MetricPointVO>> retTable;
if(ValidateUtils.isEmptyList(connectorList)) {
// 按照TopN的方式去获取
List<Tuple<Long, String>> defaultConnectorList = this.listTopNMirrorMakerList(clusterPhyId, topN);
retTable = mirrorMakerMetricESDAO.listMetricsByTopN(clusterPhyId, defaultConnectorList, metricNameList, aggType, topN, startTime, endTime);
} else {
// 制定集群ID去获取
retTable = mirrorMakerMetricESDAO.listMetricsByConnectors(clusterPhyId, metricNameList, aggType, connectorList, startTime, endTime);
}
return Result.buildSuc(this.metricMap2VO(clusterPhyId, retTable.rowMap()));
}
@Override
public Result<List<MirrorMakerMetrics>> getLatestMetricsFromES(Long clusterPhyId, List<Tuple<Long, String>> mirrorMakerList, List<String> metricNameList) {
List<ConnectorMetricPO> connectorLatestMetricList = mirrorMakerMetricESDAO.getConnectorLatestMetric(clusterPhyId, mirrorMakerList, metricNameList);
return Result.buildSuc(ConvertUtil.list2List(connectorLatestMetricList, MirrorMakerMetrics.class));
}
@Override
public Result<MirrorMakerMetrics> getLatestMetricsFromES(Long connectClusterId, String connectorName, List<String> metricsNames) {
ConnectorMetricPO connectorLatestMetric = mirrorMakerMetricESDAO.getConnectorLatestMetric(null, connectClusterId, connectorName, metricsNames);
MirrorMakerMetrics mirrorMakerMetrics = ConvertUtil.obj2Obj(connectorLatestMetric, MirrorMakerMetrics.class);
return Result.buildSuc(mirrorMakerMetrics);
}
private List<Tuple<Long, String>> listTopNMirrorMakerList(Long clusterPhyId, Integer topN) {
List<ConnectorPO> poList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId);
if (CollectionUtils.isEmpty(poList)) {
return new ArrayList<>();
}
return poList.subList(0, Math.min(topN, poList.size()))
.stream()
.map( c -> new Tuple<>(c.getId(), c.getConnectorName()) )
.collect(Collectors.toList());
}
protected List<MetricMultiLinesVO> metricMap2VO(Long connectClusterId,
Map<String/*metric*/, Map<Tuple<Long, String>, List<MetricPointVO>>> map){
List<MetricMultiLinesVO> multiLinesVOS = new ArrayList<>();
if (map == null || map.isEmpty()) {
// 如果为空,则直接返回
return multiLinesVOS;
}
for(String metric : map.keySet()){
try {
MetricMultiLinesVO multiLinesVO = new MetricMultiLinesVO();
multiLinesVO.setMetricName(metric);
List<MetricLineVO> metricLines = new ArrayList<>();
Map<Tuple<Long, String>, List<MetricPointVO>> metricPointMap = map.get(metric);
if(null == metricPointMap || metricPointMap.isEmpty()){continue;}
for(Map.Entry<Tuple<Long, String>, List<MetricPointVO>> entry : metricPointMap.entrySet()){
MetricLineVO metricLineVO = new MetricLineVO();
metricLineVO.setName(entry.getKey().getV1() + "#" + entry.getKey().getV2());
metricLineVO.setMetricName(metric);
metricLineVO.setMetricPoints(entry.getValue());
metricLines.add(metricLineVO);
}
multiLinesVO.setMetricLines(metricLines);
multiLinesVOS.add(multiLinesVO);
}catch (Exception e){
LOGGER.error("method=metricMap2VO||connectClusterId={}||msg=exception!", connectClusterId, e);
}
}
return multiLinesVOS;
}
private Result<MirrorMakerMetrics> doNothing(VersionItemParam metricParam) {
MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
String mirrorMakerName = param.getMirrorMakerName();
return Result.buildSuc(new MirrorMakerMetrics(connectClusterId,mirrorMakerName));
}
private Result<MirrorMakerMetrics> getMetricHealthScore(VersionItemParam metricParam) {
MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
String mirrorMakerName = param.getMirrorMakerName();
MirrorMakerMetrics metrics = healthStateService.calMirrorMakerHealthMetrics(connectClusterId, mirrorMakerName);
return Result.buildSuc(metrics);
}
private Result<MirrorMakerMetrics> getTopicPartitionMetricListSum(VersionItemParam metricParam) {
MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
String mirrorMakerName = param.getMirrorMakerName();
List<MirrorMakerTopic> mirrorMakerTopicList = param.getMirrorMakerTopicList();
String metric = param.getMetric();
Result<List<MirrorMakerTopicPartitionMetrics>> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric);
if (!ret.hasData() || ret.getData().isEmpty()) {
return Result.buildFailure(NOT_EXIST);
}
Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get();
return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, sum));
}
private Result<MirrorMakerMetrics> getTopicPartitionMetricListAvg(VersionItemParam metricParam) {
MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
String mirrorMakerName = param.getMirrorMakerName();
List<MirrorMakerTopic> mirrorMakerTopicList = param.getMirrorMakerTopicList();
String metric = param.getMetric();
Result<List<MirrorMakerTopicPartitionMetrics>> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric);
if (!ret.hasData() || ret.getData().isEmpty()) {
return Result.buildFailure(NOT_EXIST);
}
Float sum = ret.getData().stream().map(elem -> elem.getMetric(metric)).reduce(Float::sum).get();
return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, sum / ret.getData().size()));
}
private Result<MirrorMakerMetrics> getTopicPartitionMetricListMax(VersionItemParam metricParam) {
MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
String mirrorMakerName = param.getMirrorMakerName();
List<MirrorMakerTopic> mirrorMakerTopicList = param.getMirrorMakerTopicList();
String metric = param.getMetric();
Result<List<MirrorMakerTopicPartitionMetrics>> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric);
if (!ret.hasData() || ret.getData().isEmpty()) {
return Result.buildFailure(NOT_EXIST);
}
Float max = ret.getData().stream().max((a, b) -> a.getMetric(metric).compareTo(b.getMetric(metric))).get().getMetric(metric);
return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, max));
}
private Result<MirrorMakerMetrics> getTopicPartitionMetricListMin(VersionItemParam metricParam) {
MirrorMakerMetricParam param = (MirrorMakerMetricParam) metricParam;
Long connectClusterId = param.getConnectClusterId();
String mirrorMakerName = param.getMirrorMakerName();
List<MirrorMakerTopic> mirrorMakerTopicList = param.getMirrorMakerTopicList();
String metric = param.getMetric();
Result<List<MirrorMakerTopicPartitionMetrics>> ret = this.getTopicPartitionMetricList(connectClusterId, mirrorMakerName, mirrorMakerTopicList, metric);
if (!ret.hasData() || ret.getData().isEmpty()) {
return Result.buildFailure(NOT_EXIST);
}
Float min = ret.getData().stream().max((a, b) -> b.getMetric(metric).compareTo(a.getMetric(metric))).get().getMetric(metric);
return Result.buildSuc(MirrorMakerMetrics.initWithMetric(connectClusterId, mirrorMakerName, metric, min));
}
private Result<List<MirrorMakerTopicPartitionMetrics>> getTopicPartitionMetricList(Long connectClusterId, String mirrorMakerName, List<MirrorMakerTopic> mirrorMakerTopicList, String metric) {
List<MirrorMakerTopicPartitionMetrics> topicPartitionMetricsList = new ArrayList<>();
for (MirrorMakerTopic mirrorMakerTopic : mirrorMakerTopicList) {
for (Map.Entry<Integer, String> entry : mirrorMakerTopic.getPartitionMap().entrySet()) {
Result<MirrorMakerTopicPartitionMetrics> ret = this.getMirrorMakerTopicPartitionMetric(connectClusterId, mirrorMakerName, mirrorMakerTopic.getClusterAlias(), mirrorMakerTopic.getTopicName(), entry.getKey(), entry.getValue(), metric);
if (!ret.hasData() || ret.getData().getMetric(metric) == null) {
continue;
}
topicPartitionMetricsList.add(ret.getData());
}
}
return Result.buildSuc(topicPartitionMetricsList);
}
private Result<MirrorMakerTopicPartitionMetrics> getMirrorMakerTopicPartitionMetric(Long connectClusterId, String mirrorMakerName, String clusterAlias, String topicName, Integer partitionId, String workerId, String metric) {
VersionConnectJmxInfo jmxInfo = getJMXInfo(connectClusterId, metric);
if (null == jmxInfo) {
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
}
String jmxObjectName = String.format(jmxInfo.getJmxObjectName(), clusterAlias, topicName, partitionId);
JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, workerId);
if (ValidateUtils.isNull(jmxConnectorWrap)) {
return Result.buildFailure(VC_JMX_INIT_ERROR);
}
try {
//2、获取jmx指标
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxObjectName), jmxInfo.getJmxAttribute()).toString();
MirrorMakerTopicPartitionMetrics metrics = MirrorMakerTopicPartitionMetrics.initWithMetric(connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId, metric, Float.valueOf(value));
return Result.buildSuc(metrics);
} catch (InstanceNotFoundException e) {
// 忽略该错误该错误出现的原因是该指标在JMX中不存在
return Result.buildSuc(new MirrorMakerTopicPartitionMetrics(connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId));
} catch (Exception e) {
LOGGER.error("method=getMirrorMakerTopicPartitionMetric||connectClusterId={}||mirrorMakerName={}||clusterAlias={}||topicName={}||partitionId={}||workerId={}||metrics={}||jmx={}||msg={}",
connectClusterId, mirrorMakerName, clusterAlias, topicName, partitionId, workerId, metric, jmxObjectName, e.getClass().getName());
return Result.buildFailure(VC_JMX_CONNECT_ERROR);
}
}
}

View File

@@ -1,26 +1,124 @@
package com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.BaseMetricVersionMetric;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CONNECT_MIRROR_MAKER;
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxAttribute.*;
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.JMX_MIRROR_MAKER_SOURCE;
import static com.xiaojukeji.know.streaming.km.core.service.connect.mm2.impl.MirrorMakerMetricServiceImpl.*;
@Component
public class MirrorMakerMetricVersionItems extends BaseMetricVersionMetric {
public static final String MIRROR_MAKER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME;
public static final String MIRROR_MAKER_METRIC_HEALTH_STATE = "HealthState";
public static final String MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED = "HealthCheckPassed";
public static final String MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL = "HealthCheckTotal";
public static final String MIRROR_MAKER_METRIC_BYTE_COUNT = "ByteCount";
public static final String MIRROR_MAKER_METRIC_BYTE_RATE = "ByteRate";
public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS = "RecordAgeMs";
public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS_AVG = "RecordAgeMsAvg";
public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS_MAX = "RecordAgeMsMax";
public static final String MIRROR_MAKER_METRIC_RECORD_AGE_MS_MIN = "RecordAgeMsMin";
public static final String MIRROR_MAKER_METRIC_RECORD_COUNT = "RecordCount";
public static final String MIRROR_MAKER_METRIC_RECORD_RATE = "RecordRate";
public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS = "ReplicationLatencyMs";
public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_AVG = "ReplicationLatencyMsAvg";
public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX = "ReplicationLatencyMsMax";
public static final String MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MIN = "ReplicationLatencyMsMin";
@Override
public int versionItemType() {
return METRIC_CONNECT_MIRROR_MAKER.getCode();
}
@Override
public List<VersionMetricControlItem> init(){
public List<VersionMetricControlItem> init() {
List<VersionMetricControlItem> items = new ArrayList<>();
// HealthScore 指标
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_HEALTH_STATE).unit("0:好 1:中 2:差 3:宕机").desc("健康状态(0:好 1:中 2:差 3:宕机)").category(CATEGORY_HEALTH)
.extendMethod(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_HEALTH_CHECK_PASSED).unit("").desc("健康项检查通过数").category(CATEGORY_HEALTH)
.extendMethod(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_HEALTH_CHECK_TOTAL).unit("").desc("健康项检查总数").category(CATEGORY_HEALTH)
.extendMethod(MIRROR_MAKER_METHOD_GET_HEALTH_SCORE));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_COLLECT_COST_TIME).unit("").desc("采集mirrorMaker指标的耗时").category(CATEGORY_PERFORMANCE)
.extendMethod(MIRROR_MAKER_METHOD_DO_NOTHING));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_BYTE_COUNT).unit("byte").desc("消息复制流量大小").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(BYTE_COUNT)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_BYTE_RATE).unit(BYTE_PER_SEC).desc("复制流量速率").category(CATEGORY_FLOW)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(BYTE_RATE)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_RECORD_AGE_MS).unit("ms").desc("消息获取时年龄").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_RECORD_AGE_MS_AVG).unit("ms").desc("消息获取时平均年龄").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS_AVG)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_RECORD_AGE_MS_MAX).unit("ms").desc("消息获取时最大年龄").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS_MAX)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_RECORD_AGE_MS_MIN).unit("ms").desc("消息获取时最小年龄").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_AGE_MS_MIN)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_RECORD_COUNT).unit("").desc("消息复制条数").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_COUNT)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_RECORD_RATE).unit("条/s").desc("消息复制速率").category(CATEGORY_FLOW)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_SUM)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(RECORD_RATE)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS).unit("ms").desc("消息复制延迟时间").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_AVG).unit("ms").desc("消息复制平均延迟时间").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_AVG)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS_AVG)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX).unit("ms").desc("消息复制最大延迟时间").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MAX)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS_MAX)));
items.add(buildAllVersionsItem()
.name(MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MIN).unit("ms").desc("消息复制最小延迟时间").category(CATEGORY_PERFORMANCE)
.extend(buildConnectJMXMethodExtend(MIRROR_MAKER_METHOD_GET_TOPIC_PARTITION_METRIC_LIST_MIN)
.jmxObjectName(JMX_MIRROR_MAKER_SOURCE).jmxAttribute(REPLICATION_LATENCY_MS_MIN)));
return items;
}
}

View File

@@ -0,0 +1,31 @@
package com.xiaojukeji.know.streaming.km.task.connect.mm2.metrics;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.xiaojukeji.know.streaming.km.collector.metric.connect.mm2.MirrorMakerMetricCollector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.task.connect.metrics.AbstractAsyncMetricsDispatchTask;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author wyb
* @date 2022/12/21
*/
@Task(name = "MirrorMakerCollectorTask",
description = "MirrorMaker指标采集任务",
cron = "0 0/1 * * * ? *",
autoRegister = true,
consensual = ConsensualEnum.BROADCAST,
timeout = 2 * 60)
public class MirrorMakerCollectorTask extends AbstractAsyncMetricsDispatchTask {
@Autowired
private MirrorMakerMetricCollector mirrorMakerMetricCollector;
@Override
public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) throws Exception {
mirrorMakerMetricCollector.collectConnectMetrics(connectCluster);
return TaskResult.SUCCESS;
}
}