diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/mm2/MirrorMakerMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/mm2/MirrorMakerMetricCollector.java new file mode 100644 index 00000000..36436fba --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/mm2/MirrorMakerMetricCollector.java @@ -0,0 +1,117 @@ +package com.xiaojukeji.know.streaming.km.collector.metric.connect.mm2; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.collector.metric.connect.AbstractConnectMetricCollector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.mm2.MirrorMakerMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerService; +import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CONNECT_MIRROR_MAKER; + +/** + * @author wyb + * @date 2022/12/15 + */ +@Component +public class MirrorMakerMetricCollector extends AbstractConnectMetricCollector { + protected static final ILog LOGGER = LogFactory.getLog(MirrorMakerMetricCollector.class); + + @Autowired + private VersionControlService versionControlService; + + @Autowired + private MirrorMakerService mirrorMakerService; + + @Autowired + private ConnectorService connectorService; + + @Autowired + private MirrorMakerMetricService mirrorMakerMetricService; + + @Override + public VersionItemTypeEnum collectorType() { + return METRIC_CONNECT_MIRROR_MAKER; + } + + @Override + public List collectConnectMetrics(ConnectCluster connectCluster) { + Long clusterPhyId = connectCluster.getKafkaClusterPhyId(); + Long connectClusterId = connectCluster.getId(); + + List mirrorMakerList = connectorService.listByConnectClusterIdFromDB(connectClusterId).stream().filter(elem -> elem.getConnectorClassName().equals(MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList()); + Map mirrorMakerTopicMap = mirrorMakerService.getMirrorMakerTopicMap(connectClusterId).getData(); + + List items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode()); + FutureWaitUtil future = this.getFutureUtilByClusterPhyId(clusterPhyId); + + List metricsList = new ArrayList<>(); + + for (ConnectorPO mirrorMaker : mirrorMakerList) { + MirrorMakerMetrics metrics = new MirrorMakerMetrics(clusterPhyId, connectClusterId, mirrorMaker.getConnectorName()); + metricsList.add(metrics); + + List mirrorMakerTopicList = mirrorMakerService.getMirrorMakerTopicList(mirrorMaker, mirrorMakerTopicMap); + future.runnableTask(String.format("class=MirrorMakerMetricCollector||connectClusterId=%d||mirrorMakerName=%s", connectClusterId, mirrorMaker.getConnectorName()), + 30000, + () -> collectMetrics(connectClusterId, mirrorMaker.getConnectorName(), metrics, items, mirrorMakerTopicList)); + } + future.waitResult(30000); + + this.publishMetric(new MirrorMakerMetricEvent(this,metricsList)); + + return metricsList; + } + + /**************************************************** private method ****************************************************/ + private void collectMetrics(Long connectClusterId, String mirrorMakerName, MirrorMakerMetrics metrics, List items, List mirrorMakerTopicList) { + long startTime = System.currentTimeMillis(); + metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME); + + for (VersionControlItem v : items) { + try { + //已测量指标过滤 + if (metrics.getMetrics().get(v.getName()) != null) { + continue; + } + + Result ret = mirrorMakerMetricService.collectMirrorMakerMetricsFromKafka(connectClusterId, mirrorMakerName, mirrorMakerTopicList, v.getName()); + if (ret == null || !ret.hasData()) { + continue; + } + metrics.putMetric(ret.getData().getMetrics()); + + } catch (Exception e) { + LOGGER.error( + "method=collectMetrics||connectClusterId={}||mirrorMakerName={}||metric={}||errMsg=exception!", + connectClusterId, mirrorMakerName, v.getName(), e + ); + + } + } + metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); + + } +} + + + diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/connector/KSConnector.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/connector/KSConnector.java index b8fab0b6..6b3f7b57 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/connector/KSConnector.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/connector/KSConnector.java @@ -45,4 +45,14 @@ public class KSConnector implements Serializable { * 状态 */ private String state; + + /** + * 心跳检测connector名称 + */ + private String heartbeatConnectorName; + + /** + * 进度确认connector名称 + */ + private String checkpointConnectorName; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectorPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectorPO.java index 1853deef..1d82a9e1 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectorPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectorPO.java @@ -47,4 +47,14 @@ public class ConnectorPO extends BasePO { * 状态 */ private String state; + + /** + * 心跳检测connector + */ + private String heartbeatConnectorName; + + /** + * 进度确认connector + */ + private String checkpointConnectorName; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/MirrorMakerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/MirrorMakerService.java new file mode 100644 index 00000000..7c2ff225 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/MirrorMakerService.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.know.streaming.km.core.service.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; + +import java.util.List; +import java.util.Map; + +/** + * @author wyb + * @date 2022/12/14 + */ +public interface MirrorMakerService { + Result> getMirrorMakerTopicMap(Long connectClusterId); + + List getMirrorMakerTopicList(ConnectorPO mirrorMaker, Map mirrorMakerTopicMap); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerServiceImpl.java new file mode 100644 index 00000000..b011cf8f --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/mm2/impl/MirrorMakerServiceImpl.java @@ -0,0 +1,95 @@ +package com.xiaojukeji.know.streaming.km.core.service.connect.mm2.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.mm2.MirrorMakerTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; +import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerService; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; +import com.xiaojukeji.know.streaming.km.persistence.connect.ConnectJMXClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.management.ObjectName; +import java.util.*; + +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TOPIC_PARTITION_PATTERN; + +/** + * @author wyb + * @date 2022/12/14 + */ +@Service +public class MirrorMakerServiceImpl implements MirrorMakerService { + private static final ILog LOGGER = LogFactory.getLog(MirrorMakerServiceImpl.class); + + @Autowired + private WorkerService workerService; + + @Autowired + private ConnectJMXClient connectJMXClient; + + @Override + public Result> getMirrorMakerTopicMap(Long connectClusterId) { + + List connectWorkerList = workerService.listFromDB(connectClusterId); + + //Map + Map topicMap = new HashMap<>(); + + for (ConnectWorker connectWorker : connectWorkerList) { + JmxConnectorWrap jmxConnectorWrap = connectJMXClient.getClientWithCheck(connectClusterId, connectWorker.getWorkerId()); + Set objectNameSet = new HashSet<>(); + try { + objectNameSet = jmxConnectorWrap.queryNames(new ObjectName(MIRROR_MAKER_TOPIC_PARTITION_PATTERN), null); + } catch (Exception e) { + LOGGER.error("method=getMirrorMakerTopic||connectClusterId={}||workerId={}||queryNames failed!", + connectClusterId, connectWorker.getWorkerId()); + continue; + } + + //解析数据 + for (ObjectName objectName : objectNameSet) { + try { + String[] paramList = objectName.getCanonicalName().split(","); + + String clusterAlias = paramList[1].split("=")[1]; + String topicName = paramList[2].split("=")[1]; + Integer partition = Integer.valueOf(paramList[0].split("=")[1]); + + MirrorMakerTopic mirrorMakerTopic = topicMap.get(topicName); + + if (mirrorMakerTopic == null) { + mirrorMakerTopic = new MirrorMakerTopic(clusterAlias, topicName, new HashMap<>()); + topicMap.put(topicName, mirrorMakerTopic); + } + + mirrorMakerTopic.getPartitionMap().put(partition, connectWorker.getWorkerId()); + } catch (Exception e) { + LOGGER.error("method=getMirrorMakerTopic||connectClusterId={}||workerId={}||canonicalName={}||canonicalName explain error!", + connectClusterId, connectWorker.getWorkerId(), objectName.getCanonicalName()); + } + + } + } + return Result.buildSuc(topicMap); + } + + @Override + public List getMirrorMakerTopicList(ConnectorPO mirrorMaker, Map mirrorMakerTopicMap) { + List mirrorMakerTopicList = new ArrayList<>(); + List topicList = CommonUtils.string2StrList(mirrorMaker.getTopics()); + + for (String topicName : topicList) { + MirrorMakerTopic mirrorMakerTopic = mirrorMakerTopicMap.get(topicName); + if (mirrorMakerTopic != null) { + mirrorMakerTopicList.add(mirrorMakerTopic); + } + } + return mirrorMakerTopicList; + } +}