From 78b2b8a45e80a65fc1c4fd426566a8b4a39fe5b0 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 16:53:12 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]MM2=E7=AE=A1=E7=90=86-MM2=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E7=9B=B8=E5=85=B3=E4=B8=9A=E5=8A=A1=E7=B1=BB(#894)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../connect/connector/ConnectorManager.java | 1 + .../connector/impl/ConnectorManagerImpl.java | 30 +- .../biz/connect/mm2/MirrorMakerManager.java | 43 ++ .../mm2/impl/MirrorMakerManagerImpl.java | 632 ++++++++++++++++++ 4 files changed, 702 insertions(+), 4 deletions(-) create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/MirrorMakerManager.java create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java index 0247a7d3..3752504d 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java @@ -10,6 +10,7 @@ public interface ConnectorManager { Result updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator); Result createConnector(ConnectorCreateDTO dto, String operator); + Result createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator); Result getConnectorStateVO(Long connectClusterId, String connectorName); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java index c28f310f..5800b26f 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java @@ -1,7 +1,5 @@ package com.xiaojukeji.know.streaming.km.biz.connect.connector.impl; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector; @@ -12,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO; +import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; @@ -25,8 +24,6 @@ import java.util.stream.Collectors; @Service public class ConnectorManagerImpl implements ConnectorManager { - private static final ILog LOGGER = LogFactory.getLog(ConnectorManagerImpl.class); - @Autowired private PluginService pluginService; @@ -52,6 +49,8 @@ public class ConnectorManagerImpl implements ConnectorManager { @Override public Result createConnector(ConnectorCreateDTO dto, String operator) { + dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); + Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator); if (createResult.failed()) { return Result.buildFromIgnoreData(createResult); @@ -66,6 +65,29 @@ public class ConnectorManagerImpl implements ConnectorManager { return Result.buildSuc(); } + @Override + public Result createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) { + dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); + + Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator); + if (createResult.failed()) { + return Result.buildFromIgnoreData(createResult); + } + + Result ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName()); + if (ksConnectorResult.failed()) { + return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟"); + } + + KSConnector connector = ksConnectorResult.getData(); + connector.setCheckpointConnectorName(checkpointName); + connector.setHeartbeatConnectorName(heartbeatName); + + connectorService.addNewToDB(connector); + return Result.buildSuc(); + } + + @Override public Result getConnectorStateVO(Long connectClusterId, String connectorName) { ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, connectorName); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/MirrorMakerManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/MirrorMakerManager.java new file mode 100644 index 00000000..6851ca5e --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/MirrorMakerManager.java @@ -0,0 +1,43 @@ +package com.xiaojukeji.know.streaming.km.biz.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterMirrorMakersOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMakerCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.ClusterMirrorMakerOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBaseStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * @author wyb + * @date 2022/12/26 + */ +public interface MirrorMakerManager { + Result createMirrorMaker(MirrorMakerCreateDTO dto, String operator); + + Result deleteMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator); + + Result modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String operator); + + Result restartMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator); + Result stopMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator); + Result resumeMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator); + + Result getMirrorMakerStateVO(Long clusterPhyId); + + PaginationResult getClusterMirrorMakersOverview(Long clusterPhyId, ClusterMirrorMakersOverviewDTO dto); + + + Result getMirrorMakerState(Long connectId, String connectName); + + Result>> getTaskOverview(Long connectClusterId, String connectorName); + Result> getMM2Configs(Long connectClusterId, String connectorName); + + Result> validateConnectors(MirrorMakerCreateDTO dto); +} diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java new file mode 100644 index 00000000..7d7351f2 --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java @@ -0,0 +1,632 @@ +package com.xiaojukeji.know.streaming.km.biz.connect.mm2.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager; +import com.xiaojukeji.know.streaming.km.biz.connect.mm2.MirrorMakerManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterMirrorMakersOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMakerCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2.MetricsMirrorMakersDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.ClusterMirrorMakerOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBaseStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO; +import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.utils.*; +import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.MirrorMakerUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; +import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.runtime.AbstractStatus.State.RUNNING; +import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.*; + + +/** + * @author wyb + * @date 2022/12/26 + */ +@Service +public class MirrorMakerManagerImpl implements MirrorMakerManager { + private static final ILog LOGGER = LogFactory.getLog(MirrorMakerManagerImpl.class); + + @Autowired + private ConnectorService connectorService; + + @Autowired + private WorkerConnectorService workerConnectorService; + + @Autowired + private WorkerService workerService; + + @Autowired + private ConnectorManager connectorManager; + + @Autowired + private ClusterPhyService clusterPhyService; + + @Autowired + private MirrorMakerMetricService mirrorMakerMetricService; + + @Autowired + private ConnectClusterService connectClusterService; + + @Autowired + private PluginService pluginService; + + @Override + public Result createMirrorMaker(MirrorMakerCreateDTO dto, String operator) { + // 检查基本参数 + Result rv = this.checkCreateMirrorMakerParamAndUnifyData(dto); + if (rv.failed()) { + return rv; + } + + // 创建MirrorSourceConnector + Result sourceConnectResult = connectorManager.createConnector( + dto, + dto.getCheckpointConnectorConfigs() != null? MirrorMakerUtil.genCheckpointName(dto.getConnectorName()): "", + dto.getHeartbeatConnectorConfigs() != null? MirrorMakerUtil.genHeartbeatName(dto.getConnectorName()): "", + operator + ); + if (sourceConnectResult.failed()) { + // 创建失败, 直接返回 + return Result.buildFromIgnoreData(sourceConnectResult); + } + + // 创建 checkpoint 任务 + Result checkpointResult = Result.buildSuc(); + if (dto.getCheckpointConnectorConfigs() != null) { + checkpointResult = connectorManager.createConnector( + new ConnectorCreateDTO(dto.getConnectClusterId(), MirrorMakerUtil.genCheckpointName(dto.getConnectorName()), dto.getCheckpointConnectorConfigs()), + operator + ); + } + + // 创建 heartbeat 任务 + Result heartbeatResult = Result.buildSuc(); + if (dto.getHeartbeatConnectorConfigs() != null) { + heartbeatResult = connectorManager.createConnector( + new ConnectorCreateDTO(dto.getConnectClusterId(), MirrorMakerUtil.genHeartbeatName(dto.getConnectorName()), dto.getHeartbeatConnectorConfigs()), + operator + ); + } + + // 全都成功 + if (checkpointResult.successful() && checkpointResult.successful()) { + return Result.buildSuc(); + } else if (checkpointResult.failed() && checkpointResult.failed()) { + return Result.buildFromRSAndMsg( + ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED, + String.format("创建 checkpoint & heartbeat 失败.\n失败信息分别为:%s\n\n%s", checkpointResult.getMessage(), heartbeatResult.getMessage()) + ); + } else if (checkpointResult.failed()) { + return Result.buildFromRSAndMsg( + ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED, + String.format("创建 checkpoint 失败.\n失败信息分别为:%s", checkpointResult.getMessage()) + ); + } else{ + return Result.buildFromRSAndMsg( + ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED, + String.format("创建 heartbeat 失败.\n失败信息分别为:%s", heartbeatResult.getMessage()) + ); + } + } + + @Override + public Result deleteMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, sourceConnectorName); + if (connectorPO == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectorNotExist(connectClusterId, sourceConnectorName)); + } + + Result rv = Result.buildSuc(); + if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { + rv = connectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { + rv = connectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + return connectorService.deleteConnector(connectClusterId, sourceConnectorName, operator); + } + + @Override + public Result modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String operator) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(dto.getConnectClusterId(), dto.getConnectorName()); + if (connectorPO == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectorNotExist(dto.getConnectClusterId(), dto.getConnectorName())); + } + + Result rv = Result.buildSuc(); + if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName()) && dto.getCheckpointConnectorConfigs() != null) { + rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator); + } + if (rv.failed()) { + return rv; + } + + if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && dto.getHeartbeatConnectorConfigs() != null) { + rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator); + } + if (rv.failed()) { + return rv; + } + + return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator); + } + + @Override + public Result restartMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, sourceConnectorName); + if (connectorPO == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectorNotExist(connectClusterId, sourceConnectorName)); + } + + Result rv = Result.buildSuc(); + if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { + rv = connectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { + rv = connectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + return connectorService.restartConnector(connectClusterId, sourceConnectorName, operator); + } + + @Override + public Result stopMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, sourceConnectorName); + if (connectorPO == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectorNotExist(connectClusterId, sourceConnectorName)); + } + + Result rv = Result.buildSuc(); + if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { + rv = connectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { + rv = connectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + return connectorService.stopConnector(connectClusterId, sourceConnectorName, operator); + } + + @Override + public Result resumeMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, sourceConnectorName); + if (connectorPO == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getConnectorNotExist(connectClusterId, sourceConnectorName)); + } + + Result rv = Result.buildSuc(); + if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { + rv = connectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { + rv = connectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator); + } + if (rv.failed()) { + return rv; + } + + return connectorService.resumeConnector(connectClusterId, sourceConnectorName, operator); + } + + @Override + public Result getMirrorMakerStateVO(Long clusterPhyId) { + List connectorPOList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId); + List workerConnectorList = workerConnectorService.listByKafkaClusterIdFromDB(clusterPhyId); + List workerList = workerService.listByKafkaClusterIdFromDB(clusterPhyId); + + return Result.buildSuc(convert2MirrorMakerStateVO(connectorPOList, workerConnectorList, workerList)); + } + + @Override + public PaginationResult getClusterMirrorMakersOverview(Long clusterPhyId, ClusterMirrorMakersOverviewDTO dto) { + List mirrorMakerList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId).stream().filter(elem -> elem.getConnectorClassName().equals(MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList()); + List connectClusterList = connectClusterService.listByKafkaCluster(clusterPhyId); + + + Result> latestMetricsResult = mirrorMakerMetricService.getLatestMetricsFromES(clusterPhyId, + mirrorMakerList.stream().map(elem -> new Tuple<>(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()), + dto.getLatestMetricNames()); + + if (latestMetricsResult.failed()) { + LOGGER.error("method=getClusterMirrorMakersOverview||clusterPhyId={}||result={}||errMsg=get latest metric failed", clusterPhyId, latestMetricsResult); + return PaginationResult.buildFailure(latestMetricsResult, dto); + } + + List mirrorMakerOverviewVOList = this.convert2ClusterMirrorMakerOverviewVO(mirrorMakerList, connectClusterList, latestMetricsResult.getData()); + + PaginationResult voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerOverviewVOList, dto); + + if (voPaginationResult.failed()) { + LOGGER.error("method=ClusterMirrorMakerOverviewVO||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult); + + return PaginationResult.buildFailure(voPaginationResult, dto); + } + + //这里再补充源集群和目的集群信息,减少网络请求。 + this.completeClusterInfo(voPaginationResult.getData().getBizData()); + + + // 查询历史指标 + Result> lineMetricsResult = mirrorMakerMetricService.listMirrorMakerClusterMetricsFromES( + clusterPhyId, + this.buildMetricsConnectorsDTO( + voPaginationResult.getData().getBizData().stream().map(elem -> new ClusterConnectorDTO(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()), + dto.getMetricLines() + )); + + return PaginationResult.buildSuc( + this.supplyData2ClusterMirrorMakerOverviewVOList( + voPaginationResult.getData().getBizData(), + lineMetricsResult.getData() + ), + voPaginationResult + ); + } + + @Override + public Result getMirrorMakerState(Long connectClusterId, String connectName) { + //mm2任务 + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, connectName); + if (connectorPO == null){ + return Result.buildFrom(ResultStatus.NOT_EXIST); + } + + List workerConnectorList = workerConnectorService.listFromDB(connectClusterId).stream() + .filter(workerConnector -> workerConnector.getConnectorName().equals(connectorPO.getConnectorName()) + || (!StringUtils.isBlank(connectorPO.getCheckpointConnectorName()) && workerConnector.getConnectorName().equals(connectorPO.getCheckpointConnectorName())) + || (!StringUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && workerConnector.getConnectorName().equals(connectorPO.getHeartbeatConnectorName()))) + .collect(Collectors.toList()); + + MirrorMakerBaseStateVO mirrorMakerBaseStateVO = new MirrorMakerBaseStateVO(); + mirrorMakerBaseStateVO.setTotalTaskCount(workerConnectorList.size()); + mirrorMakerBaseStateVO.setAliveTaskCount(workerConnectorList.stream().filter(elem -> elem.getState().equals(RUNNING.name())).collect(Collectors.toList()).size()); + mirrorMakerBaseStateVO.setWorkerCount(workerConnectorList.stream().collect(Collectors.groupingBy(WorkerConnector::getWorkerId)).size()); + return Result.buildSuc(mirrorMakerBaseStateVO); + } + + @Override + public Result>> getTaskOverview(Long connectClusterId, String connectorName) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, connectorName); + if (connectorPO == null){ + return Result.buildFrom(ResultStatus.NOT_EXIST); + } + + Map> listMap = new HashMap<>(); + List workerConnectorList = workerConnectorService.listFromDB(connectClusterId); + if (workerConnectorList.isEmpty()){ + return Result.buildSuc(listMap); + } + workerConnectorList.forEach(workerConnector -> { + if (workerConnector.getConnectorName().equals(connectorPO.getConnectorName())){ + listMap.putIfAbsent(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE, new ArrayList<>()); + listMap.get(MIRROR_MAKER_SOURCE_CONNECTOR_TYPE).add(ConvertUtil.obj2Obj(workerConnector, KCTaskOverviewVO.class)); + } else if (workerConnector.getConnectorName().equals(connectorPO.getCheckpointConnectorName())) { + listMap.putIfAbsent(KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE, new ArrayList<>()); + listMap.get(MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE).add(ConvertUtil.obj2Obj(workerConnector, KCTaskOverviewVO.class)); + } else if (workerConnector.getConnectorName().equals(connectorPO.getHeartbeatConnectorName())) { + listMap.putIfAbsent(KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE, new ArrayList<>()); + listMap.get(MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE).add(ConvertUtil.obj2Obj(workerConnector, KCTaskOverviewVO.class)); + } + + }); + + return Result.buildSuc(listMap); + } + + @Override + public Result> getMM2Configs(Long connectClusterId, String connectorName) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, connectorName); + if (connectorPO == null){ + return Result.buildFrom(ResultStatus.NOT_EXIST); + } + + List propList = new ArrayList<>(); + + // source + Result connectorResult = connectorService.getConnectorInfoFromCluster(connectClusterId, connectorPO.getConnectorName()); + if (connectorResult.failed()) { + return Result.buildFromIgnoreData(connectorResult); + } + + Properties props = new Properties(); + props.putAll(connectorResult.getData().getConfig()); + propList.add(props); + + // checkpoint + if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) { + connectorResult = connectorService.getConnectorInfoFromCluster(connectClusterId, connectorPO.getCheckpointConnectorName()); + if (connectorResult.failed()) { + return Result.buildFromIgnoreData(connectorResult); + } + + props = new Properties(); + props.putAll(connectorResult.getData().getConfig()); + propList.add(props); + } + + + // heartbeat + if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) { + connectorResult = connectorService.getConnectorInfoFromCluster(connectClusterId, connectorPO.getHeartbeatConnectorName()); + if (connectorResult.failed()) { + return Result.buildFromIgnoreData(connectorResult); + } + + props = new Properties(); + props.putAll(connectorResult.getData().getConfig()); + propList.add(props); + } + + return Result.buildSuc(propList); + } + + @Override + public Result> validateConnectors(MirrorMakerCreateDTO dto) { + List voList = new ArrayList<>(); + + Result infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs()); + if (infoResult.failed()) { + return Result.buildFromIgnoreData(infoResult); + } + + voList.add(ConvertUtil.obj2Obj(infoResult.getData(), ConnectConfigInfosVO.class)); + + if (dto.getHeartbeatConnectorConfigs() != null) { + infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getHeartbeatConnectorConfigs()); + if (infoResult.failed()) { + return Result.buildFromIgnoreData(infoResult); + } + + voList.add(ConvertUtil.obj2Obj(infoResult.getData(), ConnectConfigInfosVO.class)); + } + + if (dto.getCheckpointConnectorConfigs() != null) { + infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getCheckpointConnectorConfigs()); + if (infoResult.failed()) { + return Result.buildFromIgnoreData(infoResult); + } + + voList.add(ConvertUtil.obj2Obj(infoResult.getData(), ConnectConfigInfosVO.class)); + } + + return Result.buildSuc(voList); + } + + + /**************************************************** private method ****************************************************/ + + private MetricsMirrorMakersDTO buildMetricsConnectorsDTO(List connectorDTOList, MetricDTO metricDTO) { + MetricsMirrorMakersDTO dto = ConvertUtil.obj2Obj(metricDTO, MetricsMirrorMakersDTO.class); + dto.setConnectorNameList(connectorDTOList == null? new ArrayList<>(): connectorDTOList); + + return dto; + } + + public Result checkCreateMirrorMakerParamAndUnifyData(MirrorMakerCreateDTO dto) { + ClusterPhy sourceClusterPhy = clusterPhyService.getClusterByCluster(dto.getSourceKafkaClusterId()); + if (sourceClusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(dto.getSourceKafkaClusterId())); + } + + ConnectCluster connectCluster = connectClusterService.getById(dto.getConnectClusterId()); + if (connectCluster == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getConnectClusterNotExist(dto.getConnectClusterId())); + } + + ClusterPhy targetClusterPhy = clusterPhyService.getClusterByCluster(connectCluster.getKafkaClusterPhyId()); + if (targetClusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(connectCluster.getKafkaClusterPhyId())); + } + + if (!dto.getConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector缺少connector.class"); + } + + if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector的connector.class类型错误"); + } + + if (dto.getCheckpointConnectorConfigs() != null) { + if (!dto.getCheckpointConnectorConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "CheckpointConnector缺少connector.class"); + } + + if (!MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE.equals(dto.getCheckpointConnectorConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Checkpoint的connector.class类型错误"); + } + } + + if (dto.getHeartbeatConnectorConfigs() != null) { + if (!dto.getHeartbeatConnectorConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "HeartbeatConnector缺少connector.class"); + } + + if (!MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE.equals(dto.getHeartbeatConnectorConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Heartbeat的connector.class类型错误"); + } + } + + dto.unifyData( + sourceClusterPhy.getId(), sourceClusterPhy.getBootstrapServers(), ConvertUtil.str2ObjByJson(sourceClusterPhy.getClientProperties(), Properties.class), + targetClusterPhy.getId(), targetClusterPhy.getBootstrapServers(), ConvertUtil.str2ObjByJson(targetClusterPhy.getClientProperties(), Properties.class) + ); + + return Result.buildSuc(); + } + + private MirrorMakerStateVO convert2MirrorMakerStateVO(List connectorPOList,List workerConnectorList,List workerList){ + MirrorMakerStateVO mirrorMakerStateVO = new MirrorMakerStateVO(); + + List sourceSet = connectorPOList.stream().filter(elem -> elem.getConnectorClassName().equals(MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList()); + mirrorMakerStateVO.setMirrorMakerCount(sourceSet.size()); + + Set connectClusterIdSet = sourceSet.stream().map(ConnectorPO::getConnectClusterId).collect(Collectors.toSet()); + mirrorMakerStateVO.setWorkerCount(workerList.stream().filter(elem -> connectClusterIdSet.contains(elem.getConnectClusterId())).collect(Collectors.toList()).size()); + + List mirrorMakerConnectorList = new ArrayList<>(); + mirrorMakerConnectorList.addAll(sourceSet); + mirrorMakerConnectorList.addAll(connectorPOList.stream().filter(elem -> elem.getConnectorClassName().equals(MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE)).collect(Collectors.toList())); + mirrorMakerConnectorList.addAll(connectorPOList.stream().filter(elem -> elem.getConnectorClassName().equals(MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE)).collect(Collectors.toList())); + mirrorMakerStateVO.setTotalConnectorCount(mirrorMakerConnectorList.size()); + mirrorMakerStateVO.setAliveConnectorCount(mirrorMakerConnectorList.stream().filter(elem -> elem.getState().equals(RUNNING.name())).collect(Collectors.toList()).size()); + + Set connectorNameSet = mirrorMakerConnectorList.stream().map(elem -> elem.getConnectorName()).collect(Collectors.toSet()); + List taskList = workerConnectorList.stream().filter(elem -> connectorNameSet.contains(elem.getConnectorName())).collect(Collectors.toList()); + mirrorMakerStateVO.setTotalTaskCount(taskList.size()); + mirrorMakerStateVO.setAliveTaskCount(taskList.stream().filter(elem -> elem.getState().equals(RUNNING.name())).collect(Collectors.toList()).size()); + + return mirrorMakerStateVO; + } + + private List convert2ClusterMirrorMakerOverviewVO(List mirrorMakerList, List connectClusterList, List latestMetric) { + List clusterMirrorMakerOverviewVOList = new ArrayList<>(); + Map metricsMap = latestMetric.stream().collect(Collectors.toMap(elem -> elem.getConnectClusterId() + "@" + elem.getConnectorName(), Function.identity())); + Map connectClusterMap = connectClusterList.stream().collect(Collectors.toMap(elem -> elem.getId(), Function.identity())); + + for (ConnectorPO mirrorMaker : mirrorMakerList) { + ClusterMirrorMakerOverviewVO clusterMirrorMakerOverviewVO = new ClusterMirrorMakerOverviewVO(); + clusterMirrorMakerOverviewVO.setConnectClusterId(mirrorMaker.getConnectClusterId()); + clusterMirrorMakerOverviewVO.setConnectClusterName(connectClusterMap.get(mirrorMaker.getConnectClusterId()).getName()); + clusterMirrorMakerOverviewVO.setConnectorName(mirrorMaker.getConnectorName()); + clusterMirrorMakerOverviewVO.setState(mirrorMaker.getState()); + clusterMirrorMakerOverviewVO.setCheckpointConnector(mirrorMaker.getCheckpointConnectorName()); + clusterMirrorMakerOverviewVO.setTaskCount(mirrorMaker.getTaskCount()); + clusterMirrorMakerOverviewVO.setHeartbeatConnector(mirrorMaker.getHeartbeatConnectorName()); + clusterMirrorMakerOverviewVO.setLatestMetrics(metricsMap.getOrDefault(mirrorMaker.getConnectClusterId() + "@" + mirrorMaker.getConnectorName(), new MirrorMakerMetrics(mirrorMaker.getConnectClusterId(), mirrorMaker.getConnectorName()))); + clusterMirrorMakerOverviewVOList.add(clusterMirrorMakerOverviewVO); + } + return clusterMirrorMakerOverviewVOList; + } + + PaginationResult pagingMirrorMakerInLocal(List mirrorMakerOverviewVOList, ClusterMirrorMakersOverviewDTO dto) { + List mirrorMakerVOList = PaginationUtil.pageByFuzzyFilter(mirrorMakerOverviewVOList, dto.getSearchKeywords(), Arrays.asList("connectClusterName")); + + //排序 + if (!dto.getLatestMetricNames().isEmpty()) { + PaginationMetricsUtil.sortMetrics(mirrorMakerVOList, "latestMetrics", dto.getSortMetricNameList(), "connectClusterName", dto.getSortType()); + } else { + PaginationUtil.pageBySort(mirrorMakerVOList, dto.getSortField(), dto.getSortType(), "connectClusterName", dto.getSortType()); + } + + //分页 + return PaginationUtil.pageBySubData(mirrorMakerVOList, dto); + } + + public static List supplyData2ClusterMirrorMakerOverviewVOList(List voList, + List metricLineVOList) { + Map> metricLineMap = new HashMap<>(); + if (metricLineVOList != null) { + for (MetricMultiLinesVO metricMultiLinesVO : metricLineVOList) { + metricMultiLinesVO.getMetricLines() + .forEach(metricLineVO -> { + String key = metricLineVO.getName(); + List metricLineVOS = metricLineMap.getOrDefault(key, new ArrayList<>()); + metricLineVOS.add(metricLineVO); + metricLineMap.put(key, metricLineVOS); + }); + } + } + + voList.forEach(elem -> { + elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName())); + }); + + return voList; + } + + private void completeClusterInfo(List mirrorMakerVOList) { + + for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) { + Result connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()); + if (!connectorInfoRet.hasData()) { + continue; + } + KSConnectorInfo connectorInfo = connectorInfoRet.getData(); + + String sourceClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME); + String targetClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME); + //先默认设置为集群别名 + mirrorMakerVO.setSourceKafkaClusterName(sourceClusterAlias); + mirrorMakerVO.setDestKafkaClusterName(targetClusterAlias); + + if (!ValidateUtils.isBlank(sourceClusterAlias) && CommonUtils.isNumeric(sourceClusterAlias)) { + ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(Long.valueOf(sourceClusterAlias)); + if (clusterPhy != null) { + mirrorMakerVO.setSourceKafkaClusterId(clusterPhy.getId()); + mirrorMakerVO.setSourceKafkaClusterName(clusterPhy.getName()); + } + } + + if (!ValidateUtils.isBlank(targetClusterAlias) && CommonUtils.isNumeric(targetClusterAlias)) { + ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(Long.valueOf(targetClusterAlias)); + if (clusterPhy != null) { + mirrorMakerVO.setDestKafkaClusterId(clusterPhy.getId()); + mirrorMakerVO.setDestKafkaClusterName(clusterPhy.getName()); + } + } + + } + } +}