[Feature]MM2管理-采集MM2指标任务(#894)

This commit is contained in:
zengqiao
2023-02-09 14:53:38 +08:00
committed by EricZeng
parent 6ba3dceb84
commit caccf9cef5
5 changed files with 250 additions and 0 deletions

View File

@@ -0,0 +1,117 @@
package com.xiaojukeji.know.streaming.km.collector.metric.connect.mm2;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.metric.connect.AbstractConnectMetricCollector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.mm2.MirrorMakerMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CONNECT_MIRROR_MAKER;
/**
* @author wyb
* @date 2022/12/15
*/
@Component
public class MirrorMakerMetricCollector extends AbstractConnectMetricCollector<MirrorMakerMetrics> {
protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricCollector.class);
@Autowired
private VersionControlService versionControlService;
@Autowired
private MirrorMakerService mirrorMakerService;
@Autowired
private ConnectorService connectorService;
@Autowired
private MirrorMakerMetricService mirrorMakerMetricService;
@Override
public VersionItemTypeEnum collectorType() {
return METRIC_CONNECT_MIRROR_MAKER;
}
@Override
public List<MirrorMakerMetrics> collectConnectMetrics(ConnectCluster connectCluster) {
Long clusterPhyId = connectCluster.getKafkaClusterPhyId();
Long connectClusterId = connectCluster.getId();
List<ConnectorPO> mirrorMakerList = connectorService.listByConnectClusterIdFromDB(connectClusterId).stream().filter(elem -> elem.getConnectorClassName().equals(MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList());
Map<String, MirrorMakerTopic> mirrorMakerTopicMap = mirrorMakerService.getMirrorMakerTopicMap(connectClusterId).getData();
List<VersionControlItem> items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode());
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);
List<MirrorMakerMetrics> metricsList = new ArrayList<>();
for (ConnectorPO mirrorMaker : mirrorMakerList) {
MirrorMakerMetrics metrics = new MirrorMakerMetrics(clusterPhyId, connectClusterId, mirrorMaker.getConnectorName());
metricsList.add(metrics);
List<MirrorMakerTopic> mirrorMakerTopicList = mirrorMakerService.getMirrorMakerTopicList(mirrorMaker, mirrorMakerTopicMap);
future.runnableTask(String.format("class=MirrorMakerMetricCollector||connectClusterId=%d||mirrorMakerName=%s", connectClusterId, mirrorMaker.getConnectorName()),
30000,
() -> collectMetrics(connectClusterId, mirrorMaker.getConnectorName(), metrics, items, mirrorMakerTopicList));
}
future.waitResult(30000);
this.publishMetric(new MirrorMakerMetricEvent(this,metricsList));
return metricsList;
}
/**************************************************** private method ****************************************************/
private void collectMetrics(Long connectClusterId, String mirrorMakerName, MirrorMakerMetrics metrics, List<VersionControlItem> items, List<MirrorMakerTopic> mirrorMakerTopicList) {
long startTime = System.currentTimeMillis();
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);
for (VersionControlItem v : items) {
try {
//已测量指标过滤
if (metrics.getMetrics().get(v.getName()) != null) {
continue;
}
Result<MirrorMakerMetrics> ret = mirrorMakerMetricService.collectMirrorMakerMetricsFromKafka(connectClusterId, mirrorMakerName, mirrorMakerTopicList, v.getName());
if (ret == null || !ret.hasData()) {
continue;
}
metrics.putMetric(ret.getData().getMetrics());
} catch (Exception e) {
LOGGER.error(
"method=collectMetrics||connectClusterId={}||mirrorMakerName={}||metric={}||errMsg=exception!",
connectClusterId, mirrorMakerName, v.getName(), e
);
}
}
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
}
}

View File

@@ -45,4 +45,14 @@ public class KSConnector implements Serializable {
* 状态
*/
private String state;
/**
* 心跳检测connector名称
*/
private String heartbeatConnectorName;
/**
* 进度确认connector名称
*/
private String checkpointConnectorName;
}

View File

@@ -47,4 +47,14 @@ public class ConnectorPO extends BasePO {
* 状态
*/
private String state;
/**
* 心跳检测connector
*/
private String heartbeatConnectorName;
/**
* 进度确认connector
*/
private String checkpointConnectorName;
}

View File

@@ -0,0 +1,18 @@
package com.xiaojukeji.know.streaming.km.core.service.connect.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import java.util.List;
import java.util.Map;
/**
* @author wyb
* @date 2022/12/14
*/
public interface MirrorMakerService {
Result<Map<String, MirrorMakerTopic>> getMirrorMakerTopicMap(Long connectClusterId);
List<MirrorMakerTopic> getMirrorMakerTopicList(ConnectorPO mirrorMaker, Map<String, MirrorMakerTopic> mirrorMakerTopicMap);
}

View File

@@ -0,0 +1,95 @@
package com.xiaojukeji.know.streaming.km.core.service.connect.mm2.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.management.ObjectName;
import java.util.*;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TOPIC_PARTITION_PATTERN;
/**
* @author wyb
* @date 2022/12/14
*/
@Service
public class MirrorMakerServiceImpl implements MirrorMakerService {
private static final ILog LOGGER = LogFactory.getLog(MirrorMakerServiceImpl.class);
@Autowired
private WorkerService workerService;
@Autowired
private ConnectJMXClient connectJMXClient;
@Override
public Result<Map<String, MirrorMakerTopic>> getMirrorMakerTopicMap(Long connectClusterId) {
List<ConnectWorker> connectWorkerList = workerService.listFromDB(connectClusterId);
//Map<TopicName,MirrorMakerTopic>
Map<String, MirrorMakerTopic> topicMap = new HashMap<>();
for (ConnectWorker connectWorker : connectWorkerList) {
JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, connectWorker.getWorkerId());
Set<ObjectName> objectNameSet = new HashSet<>();
try {
objectNameSet = jmxConnectorWrap.queryNames(new ObjectName(MIRROR_MAKER_TOPIC_PARTITION_PATTERN), null);
} catch (Exception e) {
LOGGER.error("method=getMirrorMakerTopic||connectClusterId={}||workerId={}||queryNames failed!",
connectClusterId, connectWorker.getWorkerId());
continue;
}
//解析数据
for (ObjectName objectName : objectNameSet) {
try {
String[] paramList = objectName.getCanonicalName().split(",");
String clusterAlias = paramList[1].split("=")[1];
String topicName = paramList[2].split("=")[1];
Integer partition = Integer.valueOf(paramList[0].split("=")[1]);
MirrorMakerTopic mirrorMakerTopic = topicMap.get(topicName);
if (mirrorMakerTopic == null) {
mirrorMakerTopic = new MirrorMakerTopic(clusterAlias, topicName, new HashMap<>());
topicMap.put(topicName, mirrorMakerTopic);
}
mirrorMakerTopic.getPartitionMap().put(partition, connectWorker.getWorkerId());
} catch (Exception e) {
LOGGER.error("method=getMirrorMakerTopic||connectClusterId={}||workerId={}||canonicalName={}||canonicalName explain error!",
connectClusterId, connectWorker.getWorkerId(), objectName.getCanonicalName());
}
}
}
return Result.buildSuc(topicMap);
}
@Override
public List<MirrorMakerTopic> getMirrorMakerTopicList(ConnectorPO mirrorMaker, Map<String, MirrorMakerTopic> mirrorMakerTopicMap) {
List<MirrorMakerTopic> mirrorMakerTopicList = new ArrayList<>();
List<String> topicList = CommonUtils.string2StrList(mirrorMaker.getTopics());
for (String topicName : topicList) {
MirrorMakerTopic mirrorMakerTopic = mirrorMakerTopicMap.get(topicName);
if (mirrorMakerTopic != null) {
mirrorMakerTopicList.add(mirrorMakerTopic);
}
}
return mirrorMakerTopicList;
}
}