From 7a0db7161ba05999811a94328ebb0051632cea70 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 6 Dec 2022 19:43:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Connect=20=E4=B8=9A=E5=8A=A1?= =?UTF-8?q?=E5=B1=82=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../biz/cluster/ClusterConnectorsManager.java | 15 ++ .../impl/ClusterConnectorsManagerImpl.java | 152 ++++++++++++++++++ .../connect/connector/ConnectorManager.java | 15 ++ .../connector/WorkerConnectorManager.java | 16 ++ .../connector/impl/ConnectorManagerImpl.java | 93 +++++++++++ .../impl/WorkerConnectorManageImpl.java | 37 +++++ 6 files changed, 328 insertions(+) create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterConnectorsManager.java create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterConnectorsManagerImpl.java create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/WorkerConnectorManager.java create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java create mode 100644 km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/WorkerConnectorManageImpl.java diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterConnectorsManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterConnectorsManager.java new file mode 100644 index 00000000..c20c5c77 --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterConnectorsManager.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.know.streaming.km.biz.cluster; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO; + +/** + * Kafka集群Connector概览 + */ +public interface ClusterConnectorsManager { + PaginationResult getClusterConnectorsOverview(Long clusterPhyId, ClusterConnectorsOverviewDTO dto); + + ConnectStateVO getClusterConnectorsState(Long clusterPhyId); +} diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterConnectorsManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterConnectorsManagerImpl.java new file mode 100644 index 00000000..46d34378 --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterConnectorsManagerImpl.java @@ -0,0 +1,152 @@ +package com.xiaojukeji.know.streaming.km.biz.cluster.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterConnectorsManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.connect.MetricsConnectorsDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; +import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; +import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorMetricService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; +import org.apache.kafka.connect.runtime.AbstractStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + + +@Service +public class ClusterConnectorsManagerImpl implements ClusterConnectorsManager { + private static final ILog LOGGER = LogFactory.getLog(ClusterConnectorsManagerImpl.class); + + @Autowired + private ConnectorService connectorService; + + @Autowired + private ConnectClusterService connectClusterService; + + @Autowired + private ConnectorMetricService connectorMetricService; + + @Autowired + private WorkerService workerService; + + @Autowired + private WorkerConnectorService workerConnectorService; + + @Override + public PaginationResult getClusterConnectorsOverview(Long clusterPhyId, ClusterConnectorsOverviewDTO dto) { + List clusterList = connectClusterService.listByKafkaCluster(clusterPhyId); + + List poList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId); + + // 查询实时指标 + Result> latestMetricsResult = connectorMetricService.getLatestMetricsFromES( + clusterPhyId, + poList.stream().map(elem -> new ClusterConnectorDTO(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()), + dto.getLatestMetricNames() + ); + + if (latestMetricsResult.failed()) { + LOGGER.error("method=getClusterConnectorsOverview||clusterPhyId={}||result={}||errMsg=get latest metric failed", clusterPhyId, latestMetricsResult); + return PaginationResult.buildFailure(latestMetricsResult, dto); + } + + // 转换成vo + List voList = ConnectConverter.convert2ClusterConnectorOverviewVOList(clusterList, poList,latestMetricsResult.getData()); + + // 请求分页信息 + PaginationResult voPaginationResult = this.pagingConnectorInLocal(voList, dto); + if (voPaginationResult.failed()) { + LOGGER.error("method=getClusterConnectorsOverview||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult); + + return PaginationResult.buildFailure(voPaginationResult, dto); + } + + // 查询历史指标 + Result> lineMetricsResult = connectorMetricService.listConnectClusterMetricsFromES( + clusterPhyId, + this.buildMetricsConnectorsDTO( + voPaginationResult.getData().getBizData().stream().map(elem -> new ClusterConnectorDTO(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()), + dto.getMetricLines() + ) + ); + + + return PaginationResult.buildSuc( + ConnectConverter.supplyData2ClusterConnectorOverviewVOList( + voPaginationResult.getData().getBizData(), + lineMetricsResult.getData() + ), + voPaginationResult + ); + } + + @Override + public ConnectStateVO getClusterConnectorsState(Long clusterPhyId) { + //获取Connect集群Id列表 + List connectClusterList = connectClusterService.listByKafkaCluster(clusterPhyId); + List connectorPOList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId); + List workerConnectorList = workerConnectorService.listByKafkaClusterIdFromDB(clusterPhyId); + List connectWorkerList = workerService.listByKafkaClusterIdFromDB(clusterPhyId); + + return convert2ConnectStateVO(connectClusterList, connectorPOList, workerConnectorList, connectWorkerList); + } + + /**************************************************** private method ****************************************************/ + + private MetricsConnectorsDTO buildMetricsConnectorsDTO(List connectorDTOList, MetricDTO metricDTO) { + MetricsConnectorsDTO dto = ConvertUtil.obj2Obj(metricDTO, MetricsConnectorsDTO.class); + dto.setConnectorNameList(connectorDTOList == null? new ArrayList<>(): connectorDTOList); + + return dto; + } + + private ConnectStateVO convert2ConnectStateVO(List connectClusterList, List connectorPOList, List workerConnectorList, List connectWorkerList) { + ConnectStateVO connectStateVO = new ConnectStateVO(); + connectStateVO.setConnectClusterCount(connectClusterList.size()); + connectStateVO.setTotalConnectorCount(connectorPOList.size()); + connectStateVO.setAliveConnectorCount(connectorPOList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).collect(Collectors.toList()).size()); + connectStateVO.setWorkerCount(connectWorkerList.size()); + connectStateVO.setTotalTaskCount(workerConnectorList.size()); + connectStateVO.setAliveTaskCount(workerConnectorList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).collect(Collectors.toList()).size()); + return connectStateVO; + } + + private PaginationResult pagingConnectorInLocal(List connectorVOList, ClusterConnectorsOverviewDTO dto) { + //模糊匹配 + connectorVOList = PaginationUtil.pageByFuzzyFilter(connectorVOList, dto.getSearchKeywords(), Arrays.asList("connectClusterName")); + + //排序 + if (!dto.getLatestMetricNames().isEmpty()) { + PaginationMetricsUtil.sortMetrics(connectorVOList, "latestMetrics", dto.getSortMetricNameList(), "connectClusterName", dto.getSortType()); + } else { + PaginationUtil.pageBySort(connectorVOList, dto.getSortField(), dto.getSortType(), "connectClusterName", dto.getSortType()); + } + + //分页 + return PaginationUtil.pageBySubData(connectorVOList, dto); + } + +} diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java new file mode 100644 index 00000000..0247a7d3 --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.know.streaming.km.biz.connect.connector; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO; + +import java.util.Properties; + +public interface ConnectorManager { + Result updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator); + + Result createConnector(ConnectorCreateDTO dto, String operator); + + Result getConnectorStateVO(Long connectClusterId, String connectorName); +} diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/WorkerConnectorManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/WorkerConnectorManager.java new file mode 100644 index 00000000..eaf82423 --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/WorkerConnectorManager.java @@ -0,0 +1,16 @@ +package com.xiaojukeji.know.streaming.km.biz.connect.connector; + + +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO; + +import java.util.List; + +/** + * @author wyb + * @date 2022/11/14 + */ +public interface WorkerConnectorManager { + Result> getTaskOverview(Long connectClusterId, String connectorName); + +} diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java new file mode 100644 index 00000000..c28f310f --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java @@ -0,0 +1,93 @@ +package com.xiaojukeji.know.streaming.km.biz.connect.connector.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; +import org.apache.kafka.connect.runtime.AbstractStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +@Service +public class ConnectorManagerImpl implements ConnectorManager { + private static final ILog LOGGER = LogFactory.getLog(ConnectorManagerImpl.class); + + @Autowired + private PluginService pluginService; + + @Autowired + private ConnectorService connectorService; + + @Autowired + private WorkerConnectorService workerConnectorService; + + @Override + public Result updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator) { + Result infosResult = pluginService.validateConfig(connectClusterId, configs); + if (infosResult.failed()) { + return Result.buildFromIgnoreData(infosResult); + } + + if (infosResult.getData().getErrorCount() > 0) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误"); + } + + return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator); + } + + @Override + public Result createConnector(ConnectorCreateDTO dto, String operator) { + Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator); + if (createResult.failed()) { + return Result.buildFromIgnoreData(createResult); + } + + Result ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName()); + if (ksConnectorResult.failed()) { + return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟"); + } + + connectorService.addNewToDB(ksConnectorResult.getData()); + return Result.buildSuc(); + } + + @Override + public Result getConnectorStateVO(Long connectClusterId, String connectorName) { + ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, connectorName); + + if (connectorPO == null) { + return Result.buildFailure(ResultStatus.NOT_EXIST); + } + + List workerConnectorList = workerConnectorService.listFromDB(connectClusterId).stream().filter(elem -> elem.getConnectorName().equals(connectorName)).collect(Collectors.toList()); + + return Result.buildSuc(convert2ConnectorOverviewVO(connectorPO, workerConnectorList)); + } + + private ConnectorStateVO convert2ConnectorOverviewVO(ConnectorPO connectorPO, List workerConnectorList) { + ConnectorStateVO connectorStateVO = new ConnectorStateVO(); + connectorStateVO.setConnectClusterId(connectorPO.getConnectClusterId()); + connectorStateVO.setName(connectorPO.getConnectorName()); + connectorStateVO.setType(connectorPO.getConnectorType()); + connectorStateVO.setState(connectorPO.getState()); + connectorStateVO.setTotalTaskCount(workerConnectorList.size()); + connectorStateVO.setAliveTaskCount(workerConnectorList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).collect(Collectors.toList()).size()); + connectorStateVO.setTotalWorkerCount(workerConnectorList.stream().map(elem -> elem.getWorkerId()).collect(Collectors.toSet()).size()); + return connectorStateVO; + } +} diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/WorkerConnectorManageImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/WorkerConnectorManageImpl.java new file mode 100644 index 00000000..4d0cd317 --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/WorkerConnectorManageImpl.java @@ -0,0 +1,37 @@ +package com.xiaojukeji.know.streaming.km.biz.connect.connector.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.biz.connect.connector.WorkerConnectorManager; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; +import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author wyb + * @date 2022/11/14 + */ +@Service +public class WorkerConnectorManageImpl implements WorkerConnectorManager { + + private static final ILog LOGGER = LogFactory.getLog(WorkerConnectorManageImpl.class); + + @Autowired + private WorkerConnectorService workerConnectorService; + + @Override + public Result> getTaskOverview(Long connectClusterId, String connectorName) { + ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId); + List workerConnectorList = workerConnectorService.getWorkerConnectorListFromCluster(connectCluster, connectorName); + + return Result.buildSuc(ConvertUtil.list2List(workerConnectorList, KCTaskOverviewVO.class)); + } +}