[Feature]MM2管理-MM2管理相关服务类(#894)

This commit is contained in:
zengqiao
2023-02-09 16:52:27 +08:00
committed by EricZeng
parent 235c0ed30e
commit add2af4f3f
6 changed files with 126 additions and 41 deletions

View File

@@ -34,6 +34,7 @@ import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafk
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ZookeeperMetricVersionItems.*;
@Service
@@ -114,6 +115,14 @@ public class VersionControlManagerImpl implements VersionControlManager {
defaultMetrics.add(new UserMetricConfig(METRIC_ZOOKEEPER.getCode(), ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC, true));
defaultMetrics.add(new UserMetricConfig(METRIC_ZOOKEEPER.getCode(), ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC, true));
defaultMetrics.add(new UserMetricConfig(METRIC_ZOOKEEPER.getCode(), ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH, true));
// mm2
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_BYTE_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_BYTE_RATE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_AGE_MS_MAX, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_RATE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX, true));
}
@Autowired

View File

@@ -69,8 +69,8 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {
if (ValidateUtils.isBlank(oldPO.getVersion())) {
oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION);
}
if (ValidateUtils.isBlank(oldPO.getClusterUrl())) {
oldPO.setClusterUrl(metadata.getMemberLeaderUrl());
if (!ValidateUtils.isBlank(clusterUrl)) {
oldPO.setClusterUrl(clusterUrl);
}
connectClusterDAO.updateById(oldPO);

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.know.streaming.km.core.service.connect.connector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo;
@@ -56,4 +57,6 @@ public interface ConnectorService {
ConnectorPO getConnectorFromDB(Long connectClusterId, String connectorName);
ConnectorTypeEnum getConnectorType(Long connectClusterId, String connectorName);
void completeMirrorMakerInfo(ConnectCluster connectCluster, List<KSConnector> connectorList);
}

View File

@@ -13,6 +13,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.common.component.RestTool;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
@@ -33,7 +34,10 @@ import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_CONNECT_CONNECTOR;
@Service
@@ -79,7 +83,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
// 构造参数
Properties props = new Properties();
props.put("name", connectorName);
props.put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, connectorName);
props.put("config", configs);
ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent(
@@ -477,6 +481,45 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
return connectorType;
}
@Override
public void completeMirrorMakerInfo(ConnectCluster connectCluster, List<KSConnector> connectorList) {
List<KSConnector> sourceConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList());
if (sourceConnectorList.isEmpty()) {
return;
}
List<KSConnector> heartBeatConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE)).collect(Collectors.toList());
List<KSConnector> checkpointConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE)).collect(Collectors.toList());
Map<String, String> heartbeatMap = this.buildMirrorMakerMap(connectCluster, heartBeatConnectorList);
Map<String, String> checkpointMap = this.buildMirrorMakerMap(connectCluster, checkpointConnectorList);
for (KSConnector sourceConnector : sourceConnectorList) {
Result<KSConnectorInfo> ret = this.getConnectorInfoFromCluster(connectCluster, sourceConnector.getConnectorName());
if (!ret.hasData()) {
LOGGER.error(
"method=completeMirrorMakerInfo||connectClusterId={}||connectorName={}||get connectorInfo fail!",
connectCluster.getId(), sourceConnector.getConnectorName()
);
continue;
}
KSConnectorInfo ksConnectorInfo = ret.getData();
String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
if (ValidateUtils.anyBlank(targetServers, sourceServers)) {
continue;
}
String[] targetBrokerList = getBrokerList(targetServers);
String[] sourceBrokerList = getBrokerList(sourceServers);
sourceConnector.setHeartbeatConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, heartbeatMap));
sourceConnector.setCheckpointConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, checkpointMap));
}
}
/**************************************************** private method ****************************************************/
private int deleteConnectorInDB(Long connectClusterId, String connectorName) {
LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
@@ -578,4 +621,63 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
);
}
}
private Map<String, String> buildMirrorMakerMap(ConnectCluster connectCluster, List<KSConnector> ksConnectorList) {
Map<String, String> bindMap = new HashMap<>();
for (KSConnector ksConnector : ksConnectorList) {
Result<KSConnectorInfo> ret = this.getConnectorInfoFromCluster(connectCluster, ksConnector.getConnectorName());
if (!ret.hasData()) {
LOGGER.error(
"method=buildMirrorMakerMap||connectClusterId={}||connectorName={}||get connectorInfo fail!",
connectCluster.getId(), ksConnector.getConnectorName()
);
continue;
}
KSConnectorInfo ksConnectorInfo = ret.getData();
String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
if (ValidateUtils.anyBlank(targetServers, sourceServers)) {
continue;
}
String[] targetBrokerList = getBrokerList(targetServers);
String[] sourceBrokerList = getBrokerList(sourceServers);
for (String targetBroker : targetBrokerList) {
for (String sourceBroker : sourceBrokerList) {
bindMap.put(targetBroker + "@" + sourceBroker, ksConnector.getConnectorName());
}
}
}
return bindMap;
}
private String findBindConnector(String[] targetBrokerList, String[] sourceBrokerList, Map<String, String> connectorBindMap) {
for (String targetBroker : targetBrokerList) {
for (String sourceBroker : sourceBrokerList) {
String connectorName = connectorBindMap.get(targetBroker + "@" + sourceBroker);
if (connectorName != null) {
return connectorName;
}
}
}
return "";
}
private String[] getBrokerList(String str) {
if (ValidateUtils.isBlank(str)) {
return new String[0];
}
if (str.contains(";")) {
return str.split(";");
}
if (str.contains(",")) {
return str.split(",");
}
return new String[]{str};
}
}

View File

@@ -1,17 +1,8 @@
package com.xiaojukeji.know.streaming.km.core.service.version;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchQuery;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -19,10 +10,7 @@ import java.util.stream.Collectors;
* @date 2022/11/9
*/
public abstract class BaseConnectorMetricService extends BaseConnectorVersionControlService{
private static final ILog LOGGER = LogFactory.getLog(BaseMetricService.class);
private List<String> metricNames = new ArrayList<>();
private List<String> metricFields = new ArrayList<>();
@PostConstruct
public void init(){
@@ -32,7 +20,6 @@ public abstract class BaseConnectorMetricService extends BaseConnectorVersionCon
protected void initMetricFieldAndNameList(){
metricNames = listVersionControlItems().stream().map(v -> v.getName()).collect(Collectors.toList());
metricFields = listMetricPOFields();
}
protected abstract List<String> listMetricPOFields();
@@ -46,29 +33,4 @@ public abstract class BaseConnectorMetricService extends BaseConnectorVersionCon
protected boolean isMetricName(String str){
return metricNames.contains(str);
}
/**
* 检查 str 是不是一个 fieldName
* @param str
*/
protected boolean isMetricField(String str){
return metricFields.contains(str);
}
protected void setQueryMetricFlag(SearchQuery query){
if(null == query){return;}
String fieldName = query.getQueryName();
query.setMetric(isMetricName(fieldName));
query.setField(isMetricField(fieldName));
}
protected <T extends SearchQuery> void setQueryMetricFlag(List<T> matches){
if(CollectionUtils.isEmpty(matches)){return;}
for (SearchQuery match : matches){
setQueryMetricFlag(match);
}
}
}

View File

@@ -62,6 +62,15 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CONNECTOR = "HealthCheckPassed_Connector";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CONNECTOR = "HealthCheckTotal_Connector";
/**
* mm2健康指标
*/
public static final String CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER = "HealthState_MirrorMaker";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_MIRROR_MAKER = "HealthCheckPassed_MirrorMaker";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_MIRROR_MAKER = "HealthCheckTotal_MirrorMaker";
public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";