diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java index d87e1bcc..740974d7 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java @@ -34,6 +34,7 @@ import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafk import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*; import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ZookeeperMetricVersionItems.*; @Service @@ -114,6 +115,14 @@ public class VersionControlManagerImpl implements VersionControlManager { defaultMetrics.add(new UserMetricConfig(METRIC_ZOOKEEPER.getCode(), ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC, true)); defaultMetrics.add(new UserMetricConfig(METRIC_ZOOKEEPER.getCode(), ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC, true)); defaultMetrics.add(new UserMetricConfig(METRIC_ZOOKEEPER.getCode(), ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH, true)); + + // mm2 + defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_BYTE_COUNT, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_BYTE_RATE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_AGE_MS_MAX, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_COUNT, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_RATE, true)); + defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX, true)); } @Autowired diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java index 6597b8ec..030b78ad 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java @@ -69,8 +69,8 @@ public class ConnectClusterServiceImpl implements ConnectClusterService { if (ValidateUtils.isBlank(oldPO.getVersion())) { oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION); } - if (ValidateUtils.isBlank(oldPO.getClusterUrl())) { - oldPO.setClusterUrl(metadata.getMemberLeaderUrl()); + if (!ValidateUtils.isBlank(clusterUrl)) { + oldPO.setClusterUrl(clusterUrl); } connectClusterDAO.updateById(oldPO); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java index 076f5c11..220e4e89 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java @@ -1,5 +1,6 @@ package com.xiaojukeji.know.streaming.km.core.service.connect.connector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo; @@ -56,4 +57,6 @@ public interface ConnectorService { ConnectorPO getConnectorFromDB(Long connectClusterId, String connectorName); ConnectorTypeEnum getConnectorType(Long connectClusterId, String connectorName); + + void completeMirrorMakerInfo(ConnectCluster connectCluster, List connectorList); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java index 9d2136a9..c042276d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java @@ -13,6 +13,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.component.RestTool; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter; import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; @@ -33,7 +34,10 @@ import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import java.util.*; +import java.util.stream.Collectors; +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME; +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_CONNECT_CONNECTOR; @Service @@ -79,7 +83,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C // 构造参数 Properties props = new Properties(); - props.put("name", connectorName); + props.put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, connectorName); props.put("config", configs); ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent( @@ -477,6 +481,45 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C return connectorType; } + @Override + public void completeMirrorMakerInfo(ConnectCluster connectCluster, List connectorList) { + List sourceConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList()); + if (sourceConnectorList.isEmpty()) { + return; + } + + List heartBeatConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE)).collect(Collectors.toList()); + List checkpointConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE)).collect(Collectors.toList()); + + Map heartbeatMap = this.buildMirrorMakerMap(connectCluster, heartBeatConnectorList); + Map checkpointMap = this.buildMirrorMakerMap(connectCluster, checkpointConnectorList); + + for (KSConnector sourceConnector : sourceConnectorList) { + Result ret = this.getConnectorInfoFromCluster(connectCluster, sourceConnector.getConnectorName()); + + if (!ret.hasData()) { + LOGGER.error( + "method=completeMirrorMakerInfo||connectClusterId={}||connectorName={}||get connectorInfo fail!", + connectCluster.getId(), sourceConnector.getConnectorName() + ); + continue; + } + KSConnectorInfo ksConnectorInfo = ret.getData(); + String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); + String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); + + if (ValidateUtils.anyBlank(targetServers, sourceServers)) { + continue; + } + + String[] targetBrokerList = getBrokerList(targetServers); + String[] sourceBrokerList = getBrokerList(sourceServers); + sourceConnector.setHeartbeatConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, heartbeatMap)); + sourceConnector.setCheckpointConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, checkpointMap)); + } + + } + /**************************************************** private method ****************************************************/ private int deleteConnectorInDB(Long connectClusterId, String connectorName) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); @@ -578,4 +621,63 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C ); } } + + private Map buildMirrorMakerMap(ConnectCluster connectCluster, List ksConnectorList) { + Map bindMap = new HashMap<>(); + + for (KSConnector ksConnector : ksConnectorList) { + Result ret = this.getConnectorInfoFromCluster(connectCluster, ksConnector.getConnectorName()); + + if (!ret.hasData()) { + LOGGER.error( + "method=buildMirrorMakerMap||connectClusterId={}||connectorName={}||get connectorInfo fail!", + connectCluster.getId(), ksConnector.getConnectorName() + ); + continue; + } + + KSConnectorInfo ksConnectorInfo = ret.getData(); + String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); + String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); + + if (ValidateUtils.anyBlank(targetServers, sourceServers)) { + continue; + } + + String[] targetBrokerList = getBrokerList(targetServers); + String[] sourceBrokerList = getBrokerList(sourceServers); + for (String targetBroker : targetBrokerList) { + for (String sourceBroker : sourceBrokerList) { + bindMap.put(targetBroker + "@" + sourceBroker, ksConnector.getConnectorName()); + } + } + + } + return bindMap; + } + + private String findBindConnector(String[] targetBrokerList, String[] sourceBrokerList, Map connectorBindMap) { + for (String targetBroker : targetBrokerList) { + for (String sourceBroker : sourceBrokerList) { + String connectorName = connectorBindMap.get(targetBroker + "@" + sourceBroker); + if (connectorName != null) { + return connectorName; + } + } + } + return ""; + } + + private String[] getBrokerList(String str) { + if (ValidateUtils.isBlank(str)) { + return new String[0]; + } + if (str.contains(";")) { + return str.split(";"); + } + if (str.contains(",")) { + return str.split(","); + } + return new String[]{str}; + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorMetricService.java index 5efc4438..febfdcf4 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorMetricService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/BaseConnectorMetricService.java @@ -1,17 +1,8 @@ package com.xiaojukeji.know.streaming.km.core.service.version; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchQuery; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import org.springframework.util.CollectionUtils; - import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; /** @@ -19,10 +10,7 @@ import java.util.stream.Collectors; * @date 2022/11/9 */ public abstract class BaseConnectorMetricService extends BaseConnectorVersionControlService{ - private static final ILog LOGGER = LogFactory.getLog(BaseMetricService.class); - private List metricNames = new ArrayList<>(); - private List metricFields = new ArrayList<>(); @PostConstruct public void init(){ @@ -32,7 +20,6 @@ public abstract class BaseConnectorMetricService extends BaseConnectorVersionCon protected void initMetricFieldAndNameList(){ metricNames = listVersionControlItems().stream().map(v -> v.getName()).collect(Collectors.toList()); - metricFields = listMetricPOFields(); } protected abstract List listMetricPOFields(); @@ -46,29 +33,4 @@ public abstract class BaseConnectorMetricService extends BaseConnectorVersionCon protected boolean isMetricName(String str){ return metricNames.contains(str); } - - /** - * 检查 str 是不是一个 fieldName - * @param str - */ - protected boolean isMetricField(String str){ - return metricFields.contains(str); - } - - protected void setQueryMetricFlag(SearchQuery query){ - if(null == query){return;} - - String fieldName = query.getQueryName(); - - query.setMetric(isMetricName(fieldName)); - query.setField(isMetricField(fieldName)); - } - - protected void setQueryMetricFlag(List matches){ - if(CollectionUtils.isEmpty(matches)){return;} - - for (SearchQuery match : matches){ - setQueryMetricFlag(match); - } - } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/ClusterMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/ClusterMetricVersionItems.java index 5a72b38c..cefe8930 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/ClusterMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/ClusterMetricVersionItems.java @@ -62,6 +62,15 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric { public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CONNECTOR = "HealthCheckPassed_Connector"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CONNECTOR = "HealthCheckTotal_Connector"; + /** + * mm2健康指标 + */ + public static final String CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER = "HealthState_MirrorMaker"; + public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER = "HealthCheckPassed_MirrorMaker"; + public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER = "HealthCheckTotal_MirrorMaker"; + + + public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize"; public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize"; public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";