mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
增加Connect 业务层方法
This commit is contained in:
@@ -0,0 +1,15 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.cluster;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO;
|
||||
|
||||
/**
|
||||
* Kafka集群Connector概览
|
||||
*/
|
||||
public interface ClusterConnectorsManager {
|
||||
PaginationResult<ClusterConnectorOverviewVO> getClusterConnectorsOverview(Long clusterPhyId, ClusterConnectorsOverviewDTO dto);
|
||||
|
||||
ConnectStateVO getClusterConnectorsState(Long clusterPhyId);
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.cluster.impl;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterConnectorsManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.connect.MetricsConnectorsDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
|
||||
import org.apache.kafka.connect.runtime.AbstractStatus;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Service
|
||||
public class ClusterConnectorsManagerImpl implements ClusterConnectorsManager {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ClusterConnectorsManagerImpl.class);
|
||||
|
||||
@Autowired
|
||||
private ConnectorService connectorService;
|
||||
|
||||
@Autowired
|
||||
private ConnectClusterService connectClusterService;
|
||||
|
||||
@Autowired
|
||||
private ConnectorMetricService connectorMetricService;
|
||||
|
||||
@Autowired
|
||||
private WorkerService workerService;
|
||||
|
||||
@Autowired
|
||||
private WorkerConnectorService workerConnectorService;
|
||||
|
||||
@Override
|
||||
public PaginationResult<ClusterConnectorOverviewVO> getClusterConnectorsOverview(Long clusterPhyId, ClusterConnectorsOverviewDTO dto) {
|
||||
List<ConnectCluster> clusterList = connectClusterService.listByKafkaCluster(clusterPhyId);
|
||||
|
||||
List<ConnectorPO> poList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId);
|
||||
|
||||
// 查询实时指标
|
||||
Result<List<ConnectorMetrics>> latestMetricsResult = connectorMetricService.getLatestMetricsFromES(
|
||||
clusterPhyId,
|
||||
poList.stream().map(elem -> new ClusterConnectorDTO(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()),
|
||||
dto.getLatestMetricNames()
|
||||
);
|
||||
|
||||
if (latestMetricsResult.failed()) {
|
||||
LOGGER.error("method=getClusterConnectorsOverview||clusterPhyId={}||result={}||errMsg=get latest metric failed", clusterPhyId, latestMetricsResult);
|
||||
return PaginationResult.buildFailure(latestMetricsResult, dto);
|
||||
}
|
||||
|
||||
// 转换成vo
|
||||
List<ClusterConnectorOverviewVO> voList = ConnectConverter.convert2ClusterConnectorOverviewVOList(clusterList, poList,latestMetricsResult.getData());
|
||||
|
||||
// 请求分页信息
|
||||
PaginationResult<ClusterConnectorOverviewVO> voPaginationResult = this.pagingConnectorInLocal(voList, dto);
|
||||
if (voPaginationResult.failed()) {
|
||||
LOGGER.error("method=getClusterConnectorsOverview||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult);
|
||||
|
||||
return PaginationResult.buildFailure(voPaginationResult, dto);
|
||||
}
|
||||
|
||||
// 查询历史指标
|
||||
Result<List<MetricMultiLinesVO>> lineMetricsResult = connectorMetricService.listConnectClusterMetricsFromES(
|
||||
clusterPhyId,
|
||||
this.buildMetricsConnectorsDTO(
|
||||
voPaginationResult.getData().getBizData().stream().map(elem -> new ClusterConnectorDTO(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()),
|
||||
dto.getMetricLines()
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
return PaginationResult.buildSuc(
|
||||
ConnectConverter.supplyData2ClusterConnectorOverviewVOList(
|
||||
voPaginationResult.getData().getBizData(),
|
||||
lineMetricsResult.getData()
|
||||
),
|
||||
voPaginationResult
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectStateVO getClusterConnectorsState(Long clusterPhyId) {
|
||||
//获取Connect集群Id列表
|
||||
List<ConnectCluster> connectClusterList = connectClusterService.listByKafkaCluster(clusterPhyId);
|
||||
List<ConnectorPO> connectorPOList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId);
|
||||
List<WorkerConnector> workerConnectorList = workerConnectorService.listByKafkaClusterIdFromDB(clusterPhyId);
|
||||
List<ConnectWorker> connectWorkerList = workerService.listByKafkaClusterIdFromDB(clusterPhyId);
|
||||
|
||||
return convert2ConnectStateVO(connectClusterList, connectorPOList, workerConnectorList, connectWorkerList);
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private MetricsConnectorsDTO buildMetricsConnectorsDTO(List<ClusterConnectorDTO> connectorDTOList, MetricDTO metricDTO) {
|
||||
MetricsConnectorsDTO dto = ConvertUtil.obj2Obj(metricDTO, MetricsConnectorsDTO.class);
|
||||
dto.setConnectorNameList(connectorDTOList == null? new ArrayList<>(): connectorDTOList);
|
||||
|
||||
return dto;
|
||||
}
|
||||
|
||||
private ConnectStateVO convert2ConnectStateVO(List<ConnectCluster> connectClusterList, List<ConnectorPO> connectorPOList, List<WorkerConnector> workerConnectorList, List<ConnectWorker> connectWorkerList) {
|
||||
ConnectStateVO connectStateVO = new ConnectStateVO();
|
||||
connectStateVO.setConnectClusterCount(connectClusterList.size());
|
||||
connectStateVO.setTotalConnectorCount(connectorPOList.size());
|
||||
connectStateVO.setAliveConnectorCount(connectorPOList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).collect(Collectors.toList()).size());
|
||||
connectStateVO.setWorkerCount(connectWorkerList.size());
|
||||
connectStateVO.setTotalTaskCount(workerConnectorList.size());
|
||||
connectStateVO.setAliveTaskCount(workerConnectorList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).collect(Collectors.toList()).size());
|
||||
return connectStateVO;
|
||||
}
|
||||
|
||||
private PaginationResult<ClusterConnectorOverviewVO> pagingConnectorInLocal(List<ClusterConnectorOverviewVO> connectorVOList, ClusterConnectorsOverviewDTO dto) {
|
||||
//模糊匹配
|
||||
connectorVOList = PaginationUtil.pageByFuzzyFilter(connectorVOList, dto.getSearchKeywords(), Arrays.asList("connectClusterName"));
|
||||
|
||||
//排序
|
||||
if (!dto.getLatestMetricNames().isEmpty()) {
|
||||
PaginationMetricsUtil.sortMetrics(connectorVOList, "latestMetrics", dto.getSortMetricNameList(), "connectClusterName", dto.getSortType());
|
||||
} else {
|
||||
PaginationUtil.pageBySort(connectorVOList, dto.getSortField(), dto.getSortType(), "connectClusterName", dto.getSortType());
|
||||
}
|
||||
|
||||
//分页
|
||||
return PaginationUtil.pageBySubData(connectorVOList, dto);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.connect.connector;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public interface ConnectorManager {
|
||||
Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator);
|
||||
|
||||
Result<Void> createConnector(ConnectorCreateDTO dto, String operator);
|
||||
|
||||
Result<ConnectorStateVO> getConnectorStateVO(Long connectClusterId, String connectorName);
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.connect.connector;
|
||||
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wyb
|
||||
* @date 2022/11/14
|
||||
*/
|
||||
public interface WorkerConnectorManager {
|
||||
Result<List<KCTaskOverviewVO>> getTaskOverview(Long connectClusterId, String connectorName);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.connect.connector.impl;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
|
||||
import org.apache.kafka.connect.runtime.AbstractStatus;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class ConnectorManagerImpl implements ConnectorManager {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ConnectorManagerImpl.class);
|
||||
|
||||
@Autowired
|
||||
private PluginService pluginService;
|
||||
|
||||
@Autowired
|
||||
private ConnectorService connectorService;
|
||||
|
||||
@Autowired
|
||||
private WorkerConnectorService workerConnectorService;
|
||||
|
||||
@Override
|
||||
public Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator) {
|
||||
Result<ConnectConfigInfos> infosResult = pluginService.validateConfig(connectClusterId, configs);
|
||||
if (infosResult.failed()) {
|
||||
return Result.buildFromIgnoreData(infosResult);
|
||||
}
|
||||
|
||||
if (infosResult.getData().getErrorCount() > 0) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误");
|
||||
}
|
||||
|
||||
return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
|
||||
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
|
||||
if (createResult.failed()) {
|
||||
return Result.buildFromIgnoreData(createResult);
|
||||
}
|
||||
|
||||
Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
|
||||
if (ksConnectorResult.failed()) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
|
||||
}
|
||||
|
||||
connectorService.addNewToDB(ksConnectorResult.getData());
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<ConnectorStateVO> getConnectorStateVO(Long connectClusterId, String connectorName) {
|
||||
ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, connectorName);
|
||||
|
||||
if (connectorPO == null) {
|
||||
return Result.buildFailure(ResultStatus.NOT_EXIST);
|
||||
}
|
||||
|
||||
List<WorkerConnector> workerConnectorList = workerConnectorService.listFromDB(connectClusterId).stream().filter(elem -> elem.getConnectorName().equals(connectorName)).collect(Collectors.toList());
|
||||
|
||||
return Result.buildSuc(convert2ConnectorOverviewVO(connectorPO, workerConnectorList));
|
||||
}
|
||||
|
||||
private ConnectorStateVO convert2ConnectorOverviewVO(ConnectorPO connectorPO, List<WorkerConnector> workerConnectorList) {
|
||||
ConnectorStateVO connectorStateVO = new ConnectorStateVO();
|
||||
connectorStateVO.setConnectClusterId(connectorPO.getConnectClusterId());
|
||||
connectorStateVO.setName(connectorPO.getConnectorName());
|
||||
connectorStateVO.setType(connectorPO.getConnectorType());
|
||||
connectorStateVO.setState(connectorPO.getState());
|
||||
connectorStateVO.setTotalTaskCount(workerConnectorList.size());
|
||||
connectorStateVO.setAliveTaskCount(workerConnectorList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).collect(Collectors.toList()).size());
|
||||
connectorStateVO.setTotalWorkerCount(workerConnectorList.stream().map(elem -> elem.getWorkerId()).collect(Collectors.toSet()).size());
|
||||
return connectorStateVO;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.connect.connector.impl;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.biz.connect.connector.WorkerConnectorManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wyb
|
||||
* @date 2022/11/14
|
||||
*/
|
||||
@Service
|
||||
public class WorkerConnectorManageImpl implements WorkerConnectorManager {
|
||||
|
||||
private static final ILog LOGGER = LogFactory.getLog(WorkerConnectorManageImpl.class);
|
||||
|
||||
@Autowired
|
||||
private WorkerConnectorService workerConnectorService;
|
||||
|
||||
@Override
|
||||
public Result<List<KCTaskOverviewVO>> getTaskOverview(Long connectClusterId, String connectorName) {
|
||||
ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId);
|
||||
List<WorkerConnector> workerConnectorList = workerConnectorService.getWorkerConnectorListFromCluster(connectCluster, connectorName);
|
||||
|
||||
return Result.buildSuc(ConvertUtil.list2List(workerConnectorList, KCTaskOverviewVO.class));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user