From 6e56688a3181c6517de68479562fb82373611290 Mon Sep 17 00:00:00 2001
From: EricZeng
Date: Tue, 15 Aug 2023 14:24:23 +0800
Subject: [PATCH] [Optimize] Unify the DB metadata update format - Part 1 (#1125)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. Introduce KafkaMetaService;
2. Perform Connector updates through KafkaMetaService;
3. Simplify the Connect-MirrorMaker association logic;
4. Add a timestamp to the ClientID of the AdminClient created by GroupService, to reduce MBean conflicts;
---
 .../connector/impl/ConnectorManagerImpl.java | 18 +-
 .../mm2/impl/MirrorMakerManagerImpl.java | 34 +-
 .../ConnectConnectorMetricCollector.java | 2 +-
 .../bean/entity/meta/KafkaMetaService.java | 44 ++
 .../km/common/converter/ConnectConverter.java | 65 ++
 .../connect/connector/ConnectorService.java | 29 +-
 .../connect/connector/OpConnectorService.java | 26 +
 .../connector/impl/ConnectorServiceImpl.java | 572 ++++----------------
 .../impl/OpConnectorServiceImpl.java | 352 +++++++++++
 .../service/group/impl/GroupServiceImpl.java | 4 +-
 .../v3/connect/KafkaConnectorController.java | 13 +-
 .../connect/metadata/SyncConnectorTask.java | 43 +-
 12 files changed, 658 insertions(+), 544 deletions(-)
 create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java
 create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/OpConnectorService.java
 create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/OpConnectorServiceImpl.java

diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java index 191afc6b..6e1440ef 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java @@ -12,6 +12,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO; import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; import org.apache.kafka.connect.runtime.AbstractStatus; @@ -30,6 +31,9 @@ public class ConnectorManagerImpl implements ConnectorManager { @Autowired private ConnectorService connectorService; + @Autowired + private OpConnectorService opConnectorService; + @Autowired private WorkerConnectorService workerConnectorService; @@ -44,24 +48,24 @@ public class ConnectorManagerImpl implements ConnectorManager { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误"); } - return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator); + return opConnectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator); } @Override public Result createConnector(ConnectorCreateDTO dto, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); - Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); + Result createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); if (createResult.failed()) { return Result.buildFromIgnoreData(createResult); } - Result ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName()); + Result ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName()); if (ksConnectorResult.failed()) { return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟"); } - connectorService.addNewToDB(ksConnectorResult.getData()); + opConnectorService.addNewToDB(ksConnectorResult.getData()); return Result.buildSuc(); } @@ -69,12 +73,12 @@ public class ConnectorManagerImpl implements ConnectorManager { public Result createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) { dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); - Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); + Result createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); if (createResult.failed()) { return Result.buildFromIgnoreData(createResult); } - Result ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName()); + Result ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName()); if (ksConnectorResult.failed()) { return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟"); } @@ -83,7 +87,7 @@ public class ConnectorManagerImpl implements ConnectorManager { connector.setCheckpointConnectorName(checkpointName); connector.setHeartbeatConnectorName(heartbeatName); - connectorService.addNewToDB(connector); + opConnectorService.addNewToDB(connector); return Result.buildSuc(); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java index de10b0f0..750220a7 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java @@ -37,6 +37,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; @@ -67,6 +68,9 @@ public class 
MirrorMakerManagerImpl implements MirrorMakerManager { @Autowired private ConnectorService connectorService; + @Autowired + private OpConnectorService opConnectorService; + @Autowired private WorkerConnectorService workerConnectorService; @@ -156,20 +160,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { Result rv = Result.buildSuc(); if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { - rv = connectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); } if (rv.failed()) { return rv; } if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { - rv = connectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); } if (rv.failed()) { return rv; } - return connectorService.deleteConnector(connectClusterId, sourceConnectorName, operator); + return opConnectorService.deleteConnector(connectClusterId, sourceConnectorName, operator); } @Override @@ -181,20 +185,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { Result rv = Result.buildSuc(); if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName()) && dto.getCheckpointConnectorConfigs() != null) { - rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator); + rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator); } if (rv.failed()) { return rv; } if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && dto.getHeartbeatConnectorConfigs() != null) { - rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator); + rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator); } if (rv.failed()) { return rv; } - return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); + return opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); } @Override @@ -206,20 +210,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { Result rv = Result.buildSuc(); if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { - rv = connectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); } if (rv.failed()) { return rv; } if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { - rv = connectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); } if (rv.failed()) { return rv; } - return connectorService.restartConnector(connectClusterId, sourceConnectorName, operator); + return opConnectorService.restartConnector(connectClusterId, sourceConnectorName, operator); } 
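// The five MirrorMaker operations in this manager (delete/update/restart/
// stop/resume) all repeat the same chain: operate on the checkpoint
// connector, then the heartbeat connector, then the source connector,
// short-circuiting on the first failed Result. A minimal sketch of that
// pattern, assuming a hypothetical ConnectorOp interface and helper that
// are not part of this patch:
//
//     @FunctionalInterface
//     interface ConnectorOp {
//         Result<Void> apply(Long connectClusterId, String connectorName, String operator);
//     }
//
//     private Result<Void> applyToMirrorMakerTrio(Long connectClusterId,
//                                                 ConnectorPO connectorPO,
//                                                 String sourceConnectorName,
//                                                 String operator,
//                                                 ConnectorOp op) {
//         Result<Void> rv = Result.buildSuc();
//         if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
//             rv = op.apply(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
//         }
//         if (rv.failed()) { return rv; }
//         if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
//             rv = op.apply(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
//         }
//         if (rv.failed()) { return rv; }
//         return op.apply(connectClusterId, sourceConnectorName, operator);
//     }
//
// With such a helper, each operation below would reduce to one call, e.g.
// applyToMirrorMakerTrio(connectClusterId, connectorPO, sourceConnectorName,
//                        operator, opConnectorService::stopConnector);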
@Override @@ -231,20 +235,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { Result rv = Result.buildSuc(); if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { - rv = connectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); } if (rv.failed()) { return rv; } if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { - rv = connectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); } if (rv.failed()) { return rv; } - return connectorService.stopConnector(connectClusterId, sourceConnectorName, operator); + return opConnectorService.stopConnector(connectClusterId, sourceConnectorName, operator); } @Override @@ -256,20 +260,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { Result rv = Result.buildSuc(); if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { - rv = connectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); } if (rv.failed()) { return rv; } if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { - rv = connectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); } if (rv.failed()) { return rv; } - return connectorService.resumeConnector(connectClusterId, sourceConnectorName, operator); + return opConnectorService.resumeConnector(connectClusterId, sourceConnectorName, operator); } @Override diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/ConnectConnectorMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/ConnectConnectorMetricCollector.java index 4da6d8fd..c49e1688 100644 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/ConnectConnectorMetricCollector.java +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/ConnectConnectorMetricCollector.java @@ -44,7 +44,7 @@ public class ConnectConnectorMetricCollector extends AbstractConnectMetricCollec Long connectClusterId = connectCluster.getId(); List items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode()); - Result> connectorList = connectorService.listConnectorsFromCluster(connectClusterId); + Result> connectorList = connectorService.listConnectorsFromCluster(connectCluster); FutureWaitUtil future = this.getFutureUtilByClusterPhyId(connectClusterId); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java new file mode 100644 index 00000000..d0307afc --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java @@ -0,0 +1,44 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.meta; + +import 
com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Kafka元信息服务接口 + */ +public interface KafkaMetaService { + /** + * 从Kafka中获取数据 + * @param connectCluster connect集群 + * @return 全部资源列表, 成功的资源列表 + */ + default Result, List>> getDataFromKafka(ConnectCluster connectCluster) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); } + + /** + * 从Kafka中获取数据 + * @param clusterPhy kafka集群 + * @return 全部资源集合, 成功的资源列表 + */ + default Result, List>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); } + + /** + * 元信息同步至DB中 + * @param clusterId 集群ID + * @param fullNameSet 全部资源列表 + * @param dataList 成功的资源列表 + */ + default void writeToDB(Long clusterId, Set fullNameSet, List dataList) {} + + /** + * 依据kafka集群ID删除数据 + * @param clusterPhyId kafka集群ID + */ + default int deleteInDBByKafkaClusterId(Long clusterPhyId) { return 0; } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java index 6dcc30e4..c20add65 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java @@ -16,6 +16,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiL import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.Triple; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import java.util.ArrayList; import java.util.HashMap; @@ -24,6 +26,9 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME; +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME; + public class ConnectConverter { public static ConnectorBasicCombineExistVO convert2BasicVO(ConnectCluster connectCluster, ConnectorPO connectorPO) { ConnectorBasicCombineExistVO vo = new ConnectorBasicCombineExistVO(); @@ -153,6 +158,66 @@ public class ConnectConverter { return ksConnector; } + public static List convertAndSupplyMirrorMakerInfo(ConnectCluster connectCluster, List, KSConnectorStateInfo>> connectorFullInfoList) { + // + Map sourceMap = new HashMap<>(); + + // + Map heartbeatMap = new HashMap<>(); + Map checkpointMap = new HashMap<>(); + + // 获取每个类型的connector的map信息 + connectorFullInfoList.forEach(connector -> { + Map mm2Map = null; + if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) { + mm2Map = sourceMap; + } else if 
(KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) { + mm2Map = heartbeatMap; + } else if (KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) { + mm2Map = checkpointMap; + } + + String targetBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); + String sourceBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); + + if (ValidateUtils.anyBlank(targetBootstrapServers, sourceBootstrapServers) || mm2Map == null) { + return; + } + + if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) { + // source 类型的格式和 heartbeat & checkpoint 的不一样 + mm2Map.put(connector.v1().getName(), targetBootstrapServers + "@" + sourceBootstrapServers); + } else { + mm2Map.put(targetBootstrapServers + "@" + sourceBootstrapServers, connector.v1().getName()); + } + }); + + + List connectorList = new ArrayList<>(); + connectorFullInfoList.forEach(connector -> { + // 转化并添加到list中 + KSConnector ksConnector = ConnectConverter.convert2KSConnector( + connectCluster.getKafkaClusterPhyId(), + connectCluster.getId(), + connector.v1(), + connector.v3(), + connector.v2() + ); + connectorList.add(ksConnector); + + // 补充mm2信息 + String targetAndSource = sourceMap.get(ksConnector.getConnectorName()); + if (ValidateUtils.isBlank(targetAndSource)) { + return; + } + + ksConnector.setHeartbeatConnectorName(heartbeatMap.getOrDefault(targetAndSource, "")); + ksConnector.setCheckpointConnectorName(checkpointMap.getOrDefault(targetAndSource, "")); + }); + + return connectorList; + } + private static String genConnectorKey(Long connectorId, String connectorName){ return connectorId + "#" + connectorName; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java index 220e4e89..7a85ffd2 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java @@ -4,49 +4,30 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluste import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.meta.KafkaMetaService; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum; import java.util.List; -import java.util.Properties; -import java.util.Set; /** * 查看Connector */ -public interface ConnectorService { - Result createConnector(Long connectClusterId, String connectorName, Properties configs, String operator); - +public interface ConnectorService extends KafkaMetaService { /** * 获取所有的连接器名称列表 */ - Result> listConnectorsFromCluster(Long connectClusterId); + 
Result> listConnectorsFromCluster(ConnectCluster connectCluster); /** * 获取单个连接器信息 */ Result getConnectorInfoFromCluster(Long connectClusterId, String connectorName); - Result> getConnectorTopicsFromCluster(Long connectClusterId, String connectorName); - Result getConnectorStateInfoFromCluster(Long connectClusterId, String connectorName); - Result getAllConnectorInfoFromCluster(Long connectClusterId, String connectorName); - - Result resumeConnector(Long connectClusterId, String connectorName, String operator); - - Result restartConnector(Long connectClusterId, String connectorName, String operator); - - Result stopConnector(Long connectClusterId, String connectorName, String operator); - - Result deleteConnector(Long connectClusterId, String connectorName, String operator); - - Result updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator); - - void batchReplace(Long kafkaClusterPhyId, Long connectClusterId, List connectorList, Set allConnectorNameSet); - - void addNewToDB(KSConnector connector); + Result getConnectorFromKafka(Long connectClusterId, String connectorName); List listByKafkaClusterIdFromDB(Long kafkaClusterPhyId); @@ -57,6 +38,4 @@ public interface ConnectorService { ConnectorPO getConnectorFromDB(Long connectClusterId, String connectorName); ConnectorTypeEnum getConnectorType(Long connectClusterId, String connectorName); - - void completeMirrorMakerInfo(ConnectCluster connectCluster, List connectorList); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/OpConnectorService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/OpConnectorService.java new file mode 100644 index 00000000..f94c7c08 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/OpConnectorService.java @@ -0,0 +1,26 @@ +package com.xiaojukeji.know.streaming.km.core.service.connect.connector; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; + +import java.util.Properties; + +/** + * 操作Connector + */ +public interface OpConnectorService { + Result createConnector(Long connectClusterId, String connectorName, Properties configs, String operator); + + Result resumeConnector(Long connectClusterId, String connectorName, String operator); + + Result restartConnector(Long connectClusterId, String connectorName, String operator); + + Result stopConnector(Long connectClusterId, String connectorName, String operator); + + Result deleteConnector(Long connectClusterId, String connectorName, String operator); + + Result updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator); + + void addNewToDB(KSConnector connector); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java index 133355a8..74c298b5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java @@ -3,7 +3,6 @@ package 
com.xiaojukeji.know.streaming.km.core.service.connect.connector.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; -import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; @@ -13,19 +12,14 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.component.RestTool; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; -import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter; import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum; -import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; -import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; -import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; -import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.Triple; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; -import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; -import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectorDAO; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; @@ -34,14 +28,9 @@ import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import java.util.*; -import java.util.stream.Collectors; - -import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME; -import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME; -import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_CONNECT_CONNECTOR; @Service -public class ConnectorServiceImpl extends BaseVersionControlService implements ConnectorService { +public class ConnectorServiceImpl implements ConnectorService { private static final ILog LOGGER = LogFactory.getLog(ConnectorServiceImpl.class); @Autowired @@ -53,79 +42,14 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C @Autowired private ConnectClusterService connectClusterService; - @Autowired - private OpLogWrapService opLogWrapService; - private static final String LIST_CONNECTORS_URI = "/connectors"; private static final String GET_CONNECTOR_INFO_PREFIX_URI = "/connectors"; private static final String GET_CONNECTOR_TOPICS_URI = 
"/connectors/%s/topics"; private static final String GET_CONNECTOR_STATUS_URI = "/connectors/%s/status"; - private static final String CREATE_CONNECTOR_URI = "/connectors"; - private static final String RESUME_CONNECTOR_URI = "/connectors/%s/resume"; - private static final String RESTART_CONNECTOR_URI = "/connectors/%s/restart"; - private static final String PAUSE_CONNECTOR_URI = "/connectors/%s/pause"; - private static final String DELETE_CONNECTOR_URI = "/connectors/%s"; - private static final String UPDATE_CONNECTOR_CONFIG_URI = "/connectors/%s/config"; - @Override - protected VersionItemTypeEnum getVersionItemType() { - return SERVICE_OP_CONNECT_CONNECTOR; - } - - @Override - public Result createConnector(Long connectClusterId, String connectorName, Properties configs, String operator) { + public Result> listConnectorsFromCluster(ConnectCluster connectCluster) { try { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - - // 构造参数 - Properties props = new Properties(); - props.put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, connectorName); - props.put("config", configs); - - ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent( - connectCluster.getSuitableRequestUrl() + CREATE_CONNECTOR_URI, - props, - ConnectorInfo.class - ); - - opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( - operator, - OperationEnum.ADD.getDesc(), - ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), - MsgConstant.getConnectorBizStr(connectClusterId, connectorName), - ConvertUtil.obj2Json(configs) - )); - - KSConnectorInfo connector = new KSConnectorInfo(); - connector.setConnectClusterId(connectClusterId); - connector.setConfig(connectorInfo.config()); - connector.setName(connectorInfo.name()); - connector.setTasks(connectorInfo.tasks()); - connector.setType(connectorInfo.type()); - - return Result.buildSuc(connector); - } catch (Exception e) { - LOGGER.error( - "method=createConnector||connectClusterId={}||connectorName={}||configs={}||operator={}||errMsg=exception", - connectClusterId, connectorName, configs, operator, e - ); - - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); - } - } - - @Override - public Result> listConnectorsFromCluster(Long connectClusterId) { - try { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - List nameList = restTool.getArrayObjectWithJsonContent( connectCluster.getSuitableRequestUrl() + LIST_CONNECTORS_URI, new HashMap<>(), @@ -135,8 +59,8 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C return Result.buildSuc(nameList); } catch (Exception e) { LOGGER.error( - "method=listConnectorsFromCluster||connectClusterId={}||errMsg=exception", - connectClusterId, e + "method=listConnectorsFromCluster||connectClusterId={}||connectClusterSuitableUrl={}||errMsg=exception", + connectCluster.getId(), connectCluster.getSuitableRequestUrl(), e ); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); @@ -153,16 +77,6 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C return this.getConnectorInfoFromCluster(connectCluster, connectorName); 
} - @Override - public Result> getConnectorTopicsFromCluster(Long connectClusterId, String connectorName) { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - - return this.getConnectorTopicsFromCluster(connectCluster, connectorName); - } - @Override public Result getConnectorStateInfoFromCluster(Long connectClusterId, String connectorName) { ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); @@ -174,270 +88,26 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } @Override - public Result getAllConnectorInfoFromCluster(Long connectClusterId, String connectorName) { + public Result getConnectorFromKafka(Long connectClusterId, String connectorName) { ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); if (ValidateUtils.isNull(connectCluster)) { return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); } - Result connectorResult = this.getConnectorInfoFromCluster(connectCluster, connectorName); - if (connectorResult.failed()) { - LOGGER.error( - "method=getAllConnectorInfoFromCluster||connectClusterId={}||connectorName={}||result={}", - connectClusterId, connectorName, connectorResult - ); - - return Result.buildFromIgnoreData(connectorResult); - } - - Result> topicNameListResult = this.getConnectorTopicsFromCluster(connectCluster, connectorName); - if (topicNameListResult.failed()) { - LOGGER.error( - "method=getAllConnectorInfoFromCluster||connectClusterId={}||connectorName={}||result={}", - connectClusterId, connectorName, connectorResult - ); - } - - Result stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName); - if (stateInfoResult.failed()) { - LOGGER.error( - "method=getAllConnectorInfoFromCluster||connectClusterId={}||connectorName={}||result={}", - connectClusterId, connectorName, connectorResult - ); + Result, KSConnectorStateInfo>> fullInfoResult = this.getConnectorFullInfoFromKafka(connectCluster, connectorName); + if (fullInfoResult.failed()) { + return Result.buildFromIgnoreData(fullInfoResult); } return Result.buildSuc(ConnectConverter.convert2KSConnector( connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), - connectorResult.getData(), - stateInfoResult.getData(), - topicNameListResult.getData() + fullInfoResult.getData().v1(), + fullInfoResult.getData().v3(), + fullInfoResult.getData().v2() )); } - @Override - public Result resumeConnector(Long connectClusterId, String connectorName, String operator) { - try { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - - restTool.putJsonForObject( - connectCluster.getSuitableRequestUrl() + String.format(RESUME_CONNECTOR_URI, connectorName), - new HashMap<>(), - String.class - ); - - this.updateStatus(connectCluster, connectClusterId, connectorName); - - opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( - operator, - OperationEnum.ENABLE.getDesc(), - ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), - MsgConstant.getConnectorBizStr(connectClusterId, connectorName), - "" - )); - - return Result.buildSuc(); - } catch (Exception e) { - LOGGER.error( - 
"class=ConnectorServiceImpl||method=resumeConnector||connectClusterId={}||errMsg=exception", - connectClusterId, e - ); - - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); - } - } - - @Override - public Result restartConnector(Long connectClusterId, String connectorName, String operator) { - try { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - - restTool.postObjectWithJsonContent( - connectCluster.getSuitableRequestUrl() + String.format(RESTART_CONNECTOR_URI, connectorName), - new HashMap<>(), - String.class - ); - - this.updateStatus(connectCluster, connectClusterId, connectorName); - - opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( - operator, - OperationEnum.RESTART.getDesc(), - ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), - MsgConstant.getConnectorBizStr(connectClusterId, connectorName), - "" - )); - - return Result.buildSuc(); - } catch (Exception e) { - LOGGER.error( - "method=restartConnector||connectClusterId={}||errMsg=exception", - connectClusterId, e - ); - - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); - } - } - - @Override - public Result stopConnector(Long connectClusterId, String connectorName, String operator) { - try { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - - restTool.putJsonForObject( - connectCluster.getSuitableRequestUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName), - new HashMap<>(), - String.class - ); - - this.updateStatus(connectCluster, connectClusterId, connectorName); - - opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( - operator, - OperationEnum.DISABLE.getDesc(), - ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), - MsgConstant.getConnectorBizStr(connectClusterId, connectorName), - "" - )); - - return Result.buildSuc(); - } catch (Exception e) { - LOGGER.error( - "method=stopConnector||connectClusterId={}||errMsg=exception", - connectClusterId, e - ); - - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); - } - } - - @Override - public Result deleteConnector(Long connectClusterId, String connectorName, String operator) { - try { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - - restTool.deleteWithParamsAndHeader( - connectCluster.getSuitableRequestUrl() + String.format(DELETE_CONNECTOR_URI, connectorName), - new HashMap<>(), - new HashMap<>(), - String.class - ); - - opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( - operator, - OperationEnum.DELETE.getDesc(), - ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), - MsgConstant.getConnectorBizStr(connectClusterId, connectorName), - "" - )); - - this.deleteConnectorInDB(connectClusterId, connectorName); - - return Result.buildSuc(); - } catch (Exception e) { - LOGGER.error( - "method=deleteConnector||connectClusterId={}||errMsg=exception", - connectClusterId, e - ); - - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, 
e.getMessage()); - } - } - - @Override - public Result updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator) { - try { - ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); - if (ValidateUtils.isNull(connectCluster)) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); - } - - ConnectorInfo connectorInfo = restTool.putJsonForObject( - connectCluster.getSuitableRequestUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName), - configs, - org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo.class - ); - - this.updateStatus(connectCluster, connectClusterId, connectorName); - - opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( - operator, - OperationEnum.EDIT.getDesc(), - ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), - MsgConstant.getConnectorBizStr(connectClusterId, connectorName), - ConvertUtil.obj2Json(configs) - )); - - return Result.buildSuc(); - } catch (Exception e) { - LOGGER.error( - "method=updateConnectorConfig||connectClusterId={}||errMsg=exception", - connectClusterId, e - ); - - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); - } - } - - @Override - public void batchReplace(Long kafkaClusterPhyId, Long connectClusterId, List connectorList, Set allConnectorNameSet) { - List poList = this.listByConnectClusterIdFromDB(connectClusterId); - - Map oldPOMap = new HashMap<>(); - poList.forEach(elem -> oldPOMap.put(elem.getConnectorName(), elem)); - - for (KSConnector connector: connectorList) { - try { - ConnectorPO oldPO = oldPOMap.remove(connector.getConnectorName()); - if (oldPO == null) { - oldPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class); - connectorDAO.insert(oldPO); - } else { - ConnectorPO newPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class); - newPO.setId(oldPO.getId()); - connectorDAO.updateById(newPO); - } - } catch (DuplicateKeyException dke) { - // ignore - } - } - - try { - oldPOMap.values().forEach(elem -> { - if (allConnectorNameSet.contains(elem.getConnectorName())) { - // 当前connector还存在 - return; - } - - // 当前connector不存在了,则进行删除 - connectorDAO.deleteById(elem.getId()); - }); - } catch (Exception e) { - // ignore - } - } - - @Override - public void addNewToDB(KSConnector connector) { - try { - connectorDAO.insert(ConvertUtil.obj2Obj(connector, ConnectorPO.class)); - } catch (DuplicateKeyException dke) { - // ignore - } - } - @Override public List listByKafkaClusterIdFromDB(Long kafkaClusterPhyId) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); @@ -482,53 +152,98 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } @Override - public void completeMirrorMakerInfo(ConnectCluster connectCluster, List connectorList) { - List sourceConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList()); - if (sourceConnectorList.isEmpty()) { - return; + public Result, List>> getDataFromKafka(ConnectCluster connectCluster) { + Result> nameListResult = this.listConnectorsFromCluster(connectCluster); + if (nameListResult.failed()) { + return Result.buildFromIgnoreData(nameListResult); } - List heartBeatConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE)).collect(Collectors.toList()); - List 
checkpointConnectorList = connectorList.stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE)).collect(Collectors.toList()); - - Map heartbeatMap = this.buildMirrorMakerMap(connectCluster, heartBeatConnectorList); - Map checkpointMap = this.buildMirrorMakerMap(connectCluster, checkpointConnectorList); - - for (KSConnector sourceConnector : sourceConnectorList) { - Result ret = this.getConnectorInfoFromCluster(connectCluster, sourceConnector.getConnectorName()); - - if (!ret.hasData()) { - LOGGER.error( - "method=completeMirrorMakerInfo||connectClusterId={}||connectorName={}||get connectorInfo fail!", - connectCluster.getId(), sourceConnector.getConnectorName() - ); - continue; - } - KSConnectorInfo ksConnectorInfo = ret.getData(); - String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); - String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); - - if (ValidateUtils.anyBlank(targetServers, sourceServers)) { + // 逐个获取 + List, KSConnectorStateInfo>> connectorFullInfoList = new ArrayList<>(); + for (String connectorName: nameListResult.getData()) { + Result, KSConnectorStateInfo>> ksConnectorResult = this.getConnectorFullInfoFromKafka(connectCluster, connectorName); + if (ksConnectorResult.failed()) { continue; } - String[] targetBrokerList = getBrokerList(targetServers); - String[] sourceBrokerList = getBrokerList(sourceServers); - sourceConnector.setHeartbeatConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, heartbeatMap)); - sourceConnector.setCheckpointConnectorName(this.findBindConnector(targetBrokerList, sourceBrokerList, checkpointMap)); + connectorFullInfoList.add(ksConnectorResult.getData()); } + // 返回结果 + return Result.buildSuc(new Tuple<>( + new HashSet<>(nameListResult.getData()), + ConnectConverter.convertAndSupplyMirrorMakerInfo(connectCluster, connectorFullInfoList)) // 转换并补充mm2相关信息 + ); } - /**************************************************** private method ****************************************************/ - private int deleteConnectorInDB(Long connectClusterId, String connectorName) { + @Override + public void writeToDB(Long connectClusterId, Set fullNameSet, List dataList) { + List poList = this.listByConnectClusterIdFromDB(connectClusterId); + + Map oldPOMap = new HashMap<>(); + poList.forEach(elem -> oldPOMap.put(elem.getConnectorName(), elem)); + + for (KSConnector connector: dataList) { + try { + ConnectorPO oldPO = oldPOMap.remove(connector.getConnectorName()); + if (oldPO == null) { + oldPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class); + connectorDAO.insert(oldPO); + continue; + } + + ConnectorPO newPO = ConvertUtil.obj2Obj(connector, ConnectorPO.class); + newPO.setId(oldPO.getId()); + if (!ValidateUtils.isBlank(oldPO.getCheckpointConnectorName()) + && ValidateUtils.isBlank(newPO.getCheckpointConnectorName()) + && fullNameSet.contains(oldPO.getCheckpointConnectorName())) { + // 新的po里面没有checkpoint的信息,但是db中的数据显示有,且集群中有该connector,则保留该checkpoint数据 + newPO.setCheckpointConnectorName(oldPO.getCheckpointConnectorName()); + } + + if (!ValidateUtils.isBlank(oldPO.getHeartbeatConnectorName()) + && ValidateUtils.isBlank(newPO.getHeartbeatConnectorName()) + && fullNameSet.contains(oldPO.getHeartbeatConnectorName())) { + // 新的po里面没有heartbeat的信息,但是db中的数据显示有,且集群中有该connector,则保留该heartbeat数据 + newPO.setHeartbeatConnectorName(oldPO.getHeartbeatConnectorName()); + } + + 
connectorDAO.updateById(newPO); + } catch (DuplicateKeyException dke) { + // ignore + } catch (Exception e) { + LOGGER.error( + "method=writeToDB||connectClusterId={}||connectorName={}||errMsg=exception", + connector.getConnectClusterId(), connector.getConnectorName(), e + ); + } + } + + try { + oldPOMap.values().forEach(elem -> { + if (fullNameSet.contains(elem.getConnectorName())) { + // 当前connector还存在 + return; + } + + // 当前connector不存在了,则进行删除 + connectorDAO.deleteById(elem.getId()); + }); + } catch (Exception e) { + // ignore + } + } + + @Override + public int deleteInDBByKafkaClusterId(Long clusterPhyId) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId); - lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName); + lambdaQueryWrapper.eq(ConnectorPO::getKafkaClusterPhyId, clusterPhyId); return connectorDAO.delete(lambdaQueryWrapper); } + /**************************************************** private method ****************************************************/ + private Result getConnectorInfoFromCluster(ConnectCluster connectCluster, String connectorName) { try { ConnectorInfo connectorInfo = restTool.getForObject( @@ -594,90 +309,37 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } } - private void updateStatus(ConnectCluster connectCluster, Long connectClusterId, String connectorName) { - try { - // 延迟3秒 - BackoffUtils.backoff(2000); - - Result stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName); - if (stateInfoResult.failed()) { - return; - } - - ConnectorPO po = new ConnectorPO(); - po.setConnectClusterId(connectClusterId); - po.setConnectorName(connectorName); - po.setState(stateInfoResult.getData().getConnector().getState()); - - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId); - lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName); - - connectorDAO.update(po, lambdaQueryWrapper); - } catch (Exception e) { + private Result, KSConnectorStateInfo>> getConnectorFullInfoFromKafka(ConnectCluster connectCluster, String connectorName) { + Result connectorResult = this.getConnectorInfoFromCluster(connectCluster, connectorName); + if (connectorResult.failed()) { LOGGER.error( - "method=updateStatus||connectClusterId={}||connectorName={}||errMsg=exception", - connectClusterId, connectorName, e + "method=getConnectorAllInfoFromKafka||connectClusterId={}||connectClusterSuitableUrl={}||result={}||errMsg=get connectors info from cluster failed", + connectCluster.getId(), connectCluster.getSuitableRequestUrl(), connectorResult + ); + + return Result.buildFromIgnoreData(connectorResult); + } + + Result> topicNameListResult = this.getConnectorTopicsFromCluster(connectCluster, connectorName); + if (topicNameListResult.failed()) { + LOGGER.error( + "method=getConnectorAllInfoFromKafka||connectClusterId={}||connectClusterSuitableUrl={}||result={}||errMsg=get connectors topics from cluster failed", + connectCluster.getId(), connectCluster.getSuitableRequestUrl(), topicNameListResult ); } - } - - private Map buildMirrorMakerMap(ConnectCluster connectCluster, List ksConnectorList) { - Map bindMap = new HashMap<>(); - - for (KSConnector ksConnector : ksConnectorList) { - Result ret = this.getConnectorInfoFromCluster(connectCluster, ksConnector.getConnectorName()); - - if (!ret.hasData()) { - LOGGER.error( - 
"method=buildMirrorMakerMap||connectClusterId={}||connectorName={}||get connectorInfo fail!", - connectCluster.getId(), ksConnector.getConnectorName() - ); - continue; - } - - KSConnectorInfo ksConnectorInfo = ret.getData(); - String targetServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); - String sourceServers = ksConnectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME); - - if (ValidateUtils.anyBlank(targetServers, sourceServers)) { - continue; - } - - String[] targetBrokerList = getBrokerList(targetServers); - String[] sourceBrokerList = getBrokerList(sourceServers); - for (String targetBroker : targetBrokerList) { - for (String sourceBroker : sourceBrokerList) { - bindMap.put(targetBroker + "@" + sourceBroker, ksConnector.getConnectorName()); - } - } + Result stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName); + if (stateInfoResult.failed()) { + LOGGER.error( + "method=getConnectorAllInfoFromKafka||connectClusterId={}||connectClusterSuitableUrl={}||result={}||errMsg=get connectors state from cluster failed", + connectCluster.getId(), connectCluster.getSuitableRequestUrl(), stateInfoResult + ); } - return bindMap; - } - private String findBindConnector(String[] targetBrokerList, String[] sourceBrokerList, Map connectorBindMap) { - for (String targetBroker : targetBrokerList) { - for (String sourceBroker : sourceBrokerList) { - String connectorName = connectorBindMap.get(targetBroker + "@" + sourceBroker); - if (connectorName != null) { - return connectorName; - } - } - } - return ""; - } - - private String[] getBrokerList(String str) { - if (ValidateUtils.isBlank(str)) { - return new String[0]; - } - if (str.contains(";")) { - return str.split(";"); - } - if (str.contains(",")) { - return str.split(","); - } - return new String[]{str}; + return Result.buildSuc(new Triple<>( + connectorResult.getData(), + topicNameListResult.getData(), + stateInfoResult.getData() + )); } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/OpConnectorServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/OpConnectorServiceImpl.java new file mode 100644 index 00000000..df0e9633 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/OpConnectorServiceImpl.java @@ -0,0 +1,352 @@ +package com.xiaojukeji.know.streaming.km.core.service.connect.connector.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.component.RestTool; +import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import 
com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; +import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; +import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; +import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectorDAO; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Service; + +import java.util.*; + +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_CONNECT_CONNECTOR; + +@Service +public class OpConnectorServiceImpl extends BaseVersionControlService implements OpConnectorService { + private static final ILog LOGGER = LogFactory.getLog(OpConnectorServiceImpl.class); + + @Autowired + private RestTool restTool; + + @Autowired + private ConnectorDAO connectorDAO; + + @Autowired + private ConnectClusterService connectClusterService; + + @Autowired + private OpLogWrapService opLogWrapService; + + private static final String GET_CONNECTOR_STATUS_URI = "/connectors/%s/status"; + + private static final String CREATE_CONNECTOR_URI = "/connectors"; + private static final String RESUME_CONNECTOR_URI = "/connectors/%s/resume"; + private static final String RESTART_CONNECTOR_URI = "/connectors/%s/restart"; + private static final String PAUSE_CONNECTOR_URI = "/connectors/%s/pause"; + private static final String DELETE_CONNECTOR_URI = "/connectors/%s"; + private static final String UPDATE_CONNECTOR_CONFIG_URI = "/connectors/%s/config"; + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return SERVICE_OP_CONNECT_CONNECTOR; + } + + @Override + public Result createConnector(Long connectClusterId, String connectorName, Properties configs, String operator) { + try { + ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); + if (ValidateUtils.isNull(connectCluster)) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); + } + + // 构造参数 + Properties props = new Properties(); + props.put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, connectorName); + props.put("config", configs); + + ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent( + connectCluster.getSuitableRequestUrl() + CREATE_CONNECTOR_URI, + props, + ConnectorInfo.class + ); + + opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( + operator, + OperationEnum.ADD.getDesc(), + ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), + MsgConstant.getConnectorBizStr(connectClusterId, connectorName), + ConvertUtil.obj2Json(configs) + )); + + KSConnectorInfo connector = new KSConnectorInfo(); + connector.setConnectClusterId(connectClusterId); + 
connector.setConfig(connectorInfo.config()); + connector.setName(connectorInfo.name()); + connector.setTasks(connectorInfo.tasks()); + connector.setType(connectorInfo.type()); + + return Result.buildSuc(connector); + } catch (Exception e) { + LOGGER.error( + "method=createConnector||connectClusterId={}||connectorName={}||configs={}||operator={}||errMsg=exception", + connectClusterId, connectorName, configs, operator, e + ); + + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); + } + } + + @Override + public Result resumeConnector(Long connectClusterId, String connectorName, String operator) { + try { + ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); + if (ValidateUtils.isNull(connectCluster)) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); + } + + restTool.putJsonForObject( + connectCluster.getSuitableRequestUrl() + String.format(RESUME_CONNECTOR_URI, connectorName), + new HashMap<>(), + String.class + ); + + this.updateStatus(connectCluster, connectClusterId, connectorName); + + opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( + operator, + OperationEnum.ENABLE.getDesc(), + ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), + MsgConstant.getConnectorBizStr(connectClusterId, connectorName), + "" + )); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpConnectorServiceImpl||method=resumeConnector||connectClusterId={}||errMsg=exception", + connectClusterId, e + ); + + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); + } + } + + @Override + public Result restartConnector(Long connectClusterId, String connectorName, String operator) { + try { + ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); + if (ValidateUtils.isNull(connectCluster)) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); + } + + restTool.postObjectWithJsonContent( + connectCluster.getSuitableRequestUrl() + String.format(RESTART_CONNECTOR_URI, connectorName), + new HashMap<>(), + String.class + ); + + this.updateStatus(connectCluster, connectClusterId, connectorName); + + opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( + operator, + OperationEnum.RESTART.getDesc(), + ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), + MsgConstant.getConnectorBizStr(connectClusterId, connectorName), + "" + )); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "method=restartConnector||connectClusterId={}||errMsg=exception", + connectClusterId, e + ); + + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage()); + } + } + + @Override + public Result stopConnector(Long connectClusterId, String connectorName, String operator) { + try { + ConnectCluster connectCluster = connectClusterService.getById(connectClusterId); + if (ValidateUtils.isNull(connectCluster)) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId)); + } + + restTool.putJsonForObject( + connectCluster.getSuitableRequestUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName), + new HashMap<>(), + String.class + ); + + this.updateStatus(connectCluster, connectClusterId, connectorName); + + opLogWrapService.saveOplogAndIgnoreException(new OplogDTO( + operator, + OperationEnum.DISABLE.getDesc(), + ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(), + 
+                    MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
+                    ""
+            ));
+
+            return Result.buildSuc();
+        } catch (Exception e) {
+            LOGGER.error(
+                    "method=stopConnector||connectClusterId={}||errMsg=exception",
+                    connectClusterId, e
+            );
+
+            return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
+        }
+    }
+
+    @Override
+    public Result<Void> deleteConnector(Long connectClusterId, String connectorName, String operator) {
+        try {
+            ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
+            if (ValidateUtils.isNull(connectCluster)) {
+                return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
+            }
+
+            restTool.deleteWithParamsAndHeader(
+                    connectCluster.getSuitableRequestUrl() + String.format(DELETE_CONNECTOR_URI, connectorName),
+                    new HashMap<>(),
+                    new HashMap<>(),
+                    String.class
+            );
+
+            opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
+                    operator,
+                    OperationEnum.DELETE.getDesc(),
+                    ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
+                    MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
+                    ""
+            ));
+
+            this.deleteConnectorInDB(connectClusterId, connectorName);
+
+            return Result.buildSuc();
+        } catch (Exception e) {
+            LOGGER.error(
+                    "method=deleteConnector||connectClusterId={}||errMsg=exception",
+                    connectClusterId, e
+            );
+
+            return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
+        }
+    }
+
+    @Override
+    public Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator) {
+        try {
+            ConnectCluster connectCluster = connectClusterService.getById(connectClusterId);
+            if (ValidateUtils.isNull(connectCluster)) {
+                return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectClusterNotExist(connectClusterId));
+            }
+
+            ConnectorInfo connectorInfo = restTool.putJsonForObject(
+                    connectCluster.getSuitableRequestUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName),
+                    configs,
+                    ConnectorInfo.class
+            );
+
+            this.updateStatus(connectCluster, connectClusterId, connectorName);
+
+            opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
+                    operator,
+                    OperationEnum.EDIT.getDesc(),
+                    ModuleEnum.KAFKA_CONNECT_CONNECTOR.getDesc(),
+                    MsgConstant.getConnectorBizStr(connectClusterId, connectorName),
+                    ConvertUtil.obj2Json(configs)
+            ));
+
+            return Result.buildSuc();
+        } catch (Exception e) {
+            LOGGER.error(
+                    "method=updateConnectorConfig||connectClusterId={}||errMsg=exception",
+                    connectClusterId, e
+            );
+
+            return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
+        }
+    }
+
+    @Override
+    public void addNewToDB(KSConnector connector) {
+        try {
+            connectorDAO.insert(ConvertUtil.obj2Obj(connector, ConnectorPO.class));
+        } catch (DuplicateKeyException dke) {
+            // the row already exists (e.g. inserted by the sync task); safe to ignore
+        }
+    }
+
+    /**************************************************** private method ****************************************************/
+    private int deleteConnectorInDB(Long connectClusterId, String connectorName) {
+        LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
+        lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId);
+        lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName);
+
+        return connectorDAO.delete(lambdaQueryWrapper);
+    }
+
+    private Result<KSConnectorStateInfo> getConnectorStateInfoFromCluster(ConnectCluster connectCluster, String connectorName) {
+        try {
+            KSConnectorStateInfo connectorStateInfo = restTool.getForObject(
+                    connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName),
+                    new HashMap<>(),
+                    KSConnectorStateInfo.class
+            );
+
+            return Result.buildSuc(connectorStateInfo);
+        } catch (Exception e) {
+            LOGGER.error(
+                    "method=getConnectorStateInfoFromCluster||connectClusterId={}||connectorName={}||errMsg=exception",
+                    connectCluster.getId(), connectorName, e
+            );
+
+            return Result.buildFromRSAndMsg(ResultStatus.KAFKA_CONNECTOR_READ_FAILED, e.getMessage());
+        }
+    }
+
+    private void updateStatus(ConnectCluster connectCluster, Long connectClusterId, String connectorName) {
+        try {
+            // Wait 2 seconds so the Connect cluster can apply the change before the status is read back
+            BackoffUtils.backoff(2000);
+
+            Result<KSConnectorStateInfo> stateInfoResult = this.getConnectorStateInfoFromCluster(connectCluster, connectorName);
+            if (stateInfoResult.failed()) {
+                return;
+            }
+
+            ConnectorPO po = new ConnectorPO();
+            po.setConnectClusterId(connectClusterId);
+            po.setConnectorName(connectorName);
+            po.setState(stateInfoResult.getData().getConnector().getState());
+
+            LambdaQueryWrapper<ConnectorPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
+            lambdaQueryWrapper.eq(ConnectorPO::getConnectClusterId, connectClusterId);
+            lambdaQueryWrapper.eq(ConnectorPO::getConnectorName, connectorName);
+
+            connectorDAO.update(po, lambdaQueryWrapper);
+        } catch (Exception e) {
+            LOGGER.error(
+                    "method=updateStatus||connectClusterId={}||connectorName={}||errMsg=exception",
+                    connectClusterId, connectorName, e
+            );
+        }
+    }
+}
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java
index b0a7d7b5..5bfb85ba 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java
@@ -78,7 +78,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
         }
 
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers());
-        props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId()));
+        props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d||timestamp=%d", clusterPhy.getId(), System.currentTimeMillis()));
 
         adminClient = KSPartialKafkaAdminClient.create(props);
         KSListGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups(
@@ -179,7 +179,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements
         }
 
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers());
-        props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId()));
+        props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d||timestamp=%d", clusterPhy.getId(), System.currentTimeMillis()));
 
         adminClient = KSPartialKafkaAdminClient.create(props);
 
diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java
index b03ca7cc..32d76be3 100644
--- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java
+++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java
@@ -15,7 +15,7 @@ import com.xiaojukeji.know.streaming.km.common.constant.Constant;
 import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum;
 import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
+import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
 import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -31,9 +31,9 @@ import org.springframework.web.bind.annotation.*;
 @RestController
 @RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX)
 public class KafkaConnectorController {
-    @Autowired
-    private ConnectorService connectorService;
+    @Autowired
+    private OpConnectorService opConnectorService;
 
     @Autowired
     private ConnectorManager connectorManager;
 
@@ -56,7 +56,7 @@ public class KafkaConnectorController {
     @DeleteMapping(value ="connectors")
     @ResponseBody
     public Result<Void> deleteConnectors(@Validated @RequestBody ConnectorDeleteDTO dto) {
-        return connectorService.deleteConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
+        return opConnectorService.deleteConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
     }
 
     @ApiOperation(value = "操作Connector", notes = "")
@@ -64,11 +64,11 @@ public class KafkaConnectorController {
     @PostMapping(value = "connectors/action")
     @ResponseBody
     public Result<Void> operateConnectors(@Validated @RequestBody ConnectorActionDTO dto) {
         if (ConnectActionEnum.RESTART.getValue().equals(dto.getAction())) {
-            return connectorService.restartConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
+            return opConnectorService.restartConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
         } else if (ConnectActionEnum.STOP.getValue().equals(dto.getAction())) {
-            return connectorService.stopConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
+            return opConnectorService.stopConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
         } else if (ConnectActionEnum.RESUME.getValue().equals(dto.getAction())) {
-            return connectorService.resumeConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
+            return opConnectorService.resumeConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
         }
         return Result.buildFailure(ResultStatus.PARAM_ILLEGAL);
diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java
index 00e58425..799d7223 100644
--- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java
+++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java
@@ -3,17 +3,15 @@ package com.xiaojukeji.know.streaming.km.task.connect.metadata;
 import com.didiglobal.logi.job.annotation.Task;
 import com.didiglobal.logi.job.common.TaskResult;
 import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
-import com.didiglobal.logi.log.ILog;
-import com.didiglobal.logi.log.LogFactory;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
 import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 
 @Task(name = "SyncConnectorTask",
@@ -23,40 +21,21 @@ import java.util.List;
         consensual = ConsensualEnum.BROADCAST,
         timeout = 2 * 60)
 public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask {
-    private static final ILog LOGGER = LogFactory.getLog(SyncConnectorTask.class);
-
     @Autowired
     private ConnectorService connectorService;
 
+
     @Override
     public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) {
-        Result<List<String>> nameListResult = connectorService.listConnectorsFromCluster(connectCluster.getId());
-        if (nameListResult.failed()) {
-            return TaskResult.FAIL;
+        // Fetch the full connector snapshot (names + details) from the Connect cluster
+        Result<Tuple<Set<String>, List<KSConnector>>> dataResult = connectorService.getDataFromKafka(connectCluster);
+        if (dataResult.failed()) {
+            return new TaskResult(TaskResult.FAIL_CODE, dataResult.getMessage());
         }
 
-        boolean allSuccess = true;
+        // Persist the snapshot to the DB
+        connectorService.writeToDB(connectCluster.getId(), dataResult.getData().v1(), dataResult.getData().v2());
 
-        List<KSConnector> connectorList = new ArrayList<>();
-        for (String connectorName: nameListResult.getData()) {
-            Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(connectCluster.getId(), connectorName);
-            if (ksConnectorResult.failed()) {
-                LOGGER.error(
-                        "method=processClusterTask||connectClusterId={}||connectorName={}||result={}",
-                        connectCluster.getId(), connectorName, ksConnectorResult
-                );
-
-                allSuccess = false;
-                continue;
-            }
-
-            connectorList.add(ksConnectorResult.getData());
-        }
-
-        //mm2相关信息的添加
-        connectorService.completeMirrorMakerInfo(connectCluster, connectorList);
-
-        connectorService.batchReplace(connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), connectorList, new HashSet<>(nameListResult.getData()));
-
-        return allSuccess? TaskResult.SUCCESS: TaskResult.FAIL;
+        // Succeed only if a detail record was fetched for every connector name
+        return dataResult.getData().v1().size() == dataResult.getData().v2().size() ? TaskResult.SUCCESS : TaskResult.FAIL;
     }
 }
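
The rewritten processClusterTask above follows the fetch-then-persist contract introduced by this patch series (getDataFromKafka, then writeToDB, presumably defined by the new KafkaMetaService). The sketch below shows that flow in isolation, for readers who want to run it outside the project. It is a minimal illustration only: MetaSyncSketch, MetaService, and the toy Result/Tuple stand-ins are hypothetical names, and the project's real classes are richer than the shapes mimicked here.

import java.util.*;

public class MetaSyncSketch {
    // Stand-in for the project's Tuple utility; only v1()/v2() accessors are mimicked.
    static final class Tuple<A, B> {
        private final A a; private final B b;
        Tuple(A a, B b) { this.a = a; this.b = b; }
        A v1() { return a; }
        B v2() { return b; }
    }

    // Stand-in for the project's Result wrapper; only the calls used above are mimicked.
    static final class Result<T> {
        private final T data; private final String message;
        private Result(T data, String message) { this.data = data; this.message = message; }
        static <T> Result<T> buildSuc(T data) { return new Result<>(data, null); }
        static <T> Result<T> buildFailure(String msg) { return new Result<>(null, msg); }
        boolean failed() { return message != null; }
        T getData() { return data; }
        String getMessage() { return message; }
    }

    // The assumed contract: fetch a full snapshot (name set + detail rows), then replace DB state.
    interface MetaService<CLUSTER, DATA> {
        Result<Tuple<Set<String>, List<DATA>>> getDataFromKafka(CLUSTER cluster);
        void writeToDB(Long clusterId, Set<String> fullNameSet, List<DATA> dataList);
    }

    public static void main(String[] args) {
        Map<Long, List<String>> fakeDb = new HashMap<>();

        MetaService<String, String> svc = new MetaService<String, String>() {
            public Result<Tuple<Set<String>, List<String>>> getDataFromKafka(String cluster) {
                // In the real task this is a REST call against the Connect cluster.
                Set<String> names = new HashSet<>(Arrays.asList("mm2-source", "mm2-checkpoint"));
                return Result.buildSuc(new Tuple<>(names, new ArrayList<>(names)));
            }
            public void writeToDB(Long clusterId, Set<String> fullNameSet, List<String> rows) {
                // The real implementation upserts rows and prunes names absent from fullNameSet.
                fakeDb.put(clusterId, rows);
            }
        };

        Result<Tuple<Set<String>, List<String>>> r = svc.getDataFromKafka("demo-connect-cluster");
        if (r.failed()) {
            System.err.println("fetch failed: " + r.getMessage());
            return;
        }
        svc.writeToDB(1L, r.getData().v1(), r.getData().v2());
        // Mirrors the task's success check: every named connector has a detail row.
        System.out.println(r.getData().v1().size() == r.getData().v2().size());
    }
}

The size comparison at the end mirrors the task's final line: when some connector's detail fetch fails, the name set is larger than the detail list, the task reports FAIL, and the scheduler retries on the next run.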