增加Connect Rest接口

This commit is contained in:
zengqiao
2022-12-06 19:44:18 +08:00
committed by EricZeng
parent 7a0db7161b
commit d9c59cb3d3
6 changed files with 453 additions and 0 deletions

View File

@@ -0,0 +1,133 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster;
import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterConnectorsManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.connect.MetricsConnectClustersDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.connect.MetricsConnectorsDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectClusterBasicCombineExistVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectClusterBasicVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterWorkerOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 22/10/27
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "集群Connects-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_PREFIX) // 这里使用 API_V3_PREFIX 没有使用 API_V3_CONNECT_PREFIX 的原因是这个接口在Kafka集群页面下
public class ClusterConnectsController {
@Autowired
private ConnectorService connectorService;
@Autowired
private ConnectorMetricService connectorMetricService;
@Autowired
private ConnectClusterService connectClusterService;
@Autowired
private ConnectClusterMetricService connectClusterMetricService;
@Autowired
private WorkerService workerService;
@Autowired
private ClusterConnectorsManager clusterConnectorsManager;
/**************************************************** connect method ****************************************************/
@ApiOperation(value = "Connect集群基本信息", notes = "")
@GetMapping(value = "kafka-clusters/{clusterPhyId}/connect-clusters/{connectClusterName}/basic-combine-exist")
@ResponseBody
public Result<ConnectClusterBasicCombineExistVO> getBasicCombineExist(@PathVariable Long clusterPhyId,
@PathVariable String connectClusterName) {
return Result.buildSuc(ConnectConverter.convert2ConnectClusterBasicCombineExistVO(
connectClusterService.getByName(clusterPhyId, connectClusterName))
);
}
@ApiOperation(value = "Connect集群基本信息列表", notes = "")
@GetMapping(value = "kafka-clusters/{clusterPhyId}/connect-clusters-basic")
@ResponseBody
public Result<List<ConnectClusterBasicVO>> getClusterConnectClustersBasic(@PathVariable Long clusterPhyId) {
return Result.buildSuc(ConvertUtil.list2List(connectClusterService.listByKafkaCluster(clusterPhyId), ConnectClusterBasicVO.class));
}
@ApiOperation(value = "Connect集群指标信息")
@PostMapping(value = "kafka-clusters/{clusterPhyId}/connect-cluster-metrics")
@ResponseBody
public Result<List<MetricMultiLinesVO>> getConnectClusterMetrics(@PathVariable Long clusterPhyId,
@Validated @RequestBody MetricsConnectClustersDTO dto) {
return connectClusterMetricService.listConnectClusterMetricsFromES(clusterPhyId, dto);
}
@ApiOperation(value = "集群Connectors状态", notes = "")
@GetMapping(value = "kafka-clusters/{clusterPhyId}/connect-state")
@ResponseBody
public Result<ConnectStateVO> getClusterConnectorsState(@PathVariable Long clusterPhyId) {
return Result.buildSuc(clusterConnectorsManager.getClusterConnectorsState(clusterPhyId));
}
/**************************************************** connector method ****************************************************/
@ApiOperation(value = "Connectors基本信息", notes = "")
@GetMapping(value = "clusters/{clusterPhyId}/connectors-basic")
@ResponseBody
public Result<List<ConnectorBasicVO>> getClusterConnectorsBasic(@PathVariable Long clusterPhyId) {
return Result.buildSuc(
ConnectConverter.convert2BasicVOList(
connectClusterService.listByKafkaCluster(clusterPhyId),
connectorService.listByKafkaClusterIdFromDB(clusterPhyId)
)
);
}
@ApiOperation(value = "Connectors概览列表", notes = "")
@PostMapping(value = "clusters/{clusterPhyId}/connectors-overview")
@ResponseBody
public PaginationResult<ClusterConnectorOverviewVO> getClusterConnectorsOverview(@PathVariable Long clusterPhyId,
@Validated @RequestBody ClusterConnectorsOverviewDTO dto) {
return clusterConnectorsManager.getClusterConnectorsOverview(clusterPhyId, dto);
}
@ApiOperation(value = "集群Connectors指标信息")
@PostMapping(value = "clusters/{clusterPhyId}/connectors-metrics")
@ResponseBody
public Result<List<MetricMultiLinesVO>> getClusterPhyMetrics(@PathVariable Long clusterPhyId,
@Validated @RequestBody MetricsConnectorsDTO dto) {
return connectorMetricService.listConnectClusterMetricsFromES(clusterPhyId, dto);
}
/**************************************************** connector method ****************************************************/
@ApiOperation(value = "worker概览列表", notes = "")
@GetMapping(value = "clusters/{clusterPhyId}/workers-overview")
@ResponseBody
public PaginationResult<ClusterWorkerOverviewVO> getClusterWorkersOverview(@PathVariable Long clusterPhyId, PaginationBaseDTO dto) {
return workerService.pageWorkByKafkaClusterPhy(clusterPhyId, dto);
}
}

View File

@@ -0,0 +1,42 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.connect;
import com.didiglobal.logi.security.util.HttpRequestUtil;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 22/10/17
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Cluster-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX)
public class KafkaConnectClusterController {
@Autowired
private ConnectClusterService connectClusterService;
@ApiOperation(value = "删除Connect集群")
@DeleteMapping(value = "connect-clusters")
@ResponseBody
public Result<Void> deleteConnectCluster(@RequestParam("connectClusterId") Long connectClusterId) {
return connectClusterService.deleteInDB(connectClusterId, HttpRequestUtil.getOperator());
}
@ApiOperation(value = "修改Connect集群", notes = "")
@PutMapping(value = "batch-connect-clusters")
@ResponseBody
public Result<Void> batchModifyConnectCluster(@Validated @RequestBody List<ConnectClusterDTO> dtoList) {
return connectClusterService.batchModifyInDB(dtoList, HttpRequestUtil.getOperator());
}
}

View File

@@ -0,0 +1,56 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.plugin.ConnectPluginBasic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectPluginBasicVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 22/10/17
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Plugin-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX)
public class KafkaConnectPluginController {
@Autowired
private PluginService pluginService;
@ApiOperation(value = "Connect集群插件", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connector-plugins")
@ResponseBody
public Result<List<ConnectPluginBasicVO>> getConnectorPlugins(@PathVariable Long connectClusterId) {
Result<List<ConnectPluginBasic>> listResult = pluginService.listPluginsFromCluster(connectClusterId);
if (listResult.failed()) {
return Result.buildFromIgnoreData(listResult);
}
listResult.getData().forEach(elem -> elem.setHelpDocLink("https://www.confluent.io/hub/"));
return Result.buildSuc(ConvertUtil.list2List(listResult.getData(), ConnectPluginBasicVO.class));
}
@ApiOperation(value = "Connect插件配置", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connector-plugins/{pluginName}/config")
@ResponseBody
public Result<ConnectConfigInfosVO> getPluginConfig(@PathVariable Long connectClusterId, @PathVariable String pluginName) {
Result<ConnectConfigInfos> infosResult = pluginService.getConfig(connectClusterId, pluginName);
if (infosResult.failed()) {
return Result.buildFromIgnoreData(infosResult);
}
return Result.buildSuc(ConvertUtil.obj2Obj(infosResult.getData(), ConnectConfigInfosVO.class));
}
}

View File

@@ -0,0 +1,91 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.connect;
import com.didiglobal.logi.security.util.HttpRequestUtil;
import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorActionDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorConfigModifyDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
/**
* @author zengqiao
* @date 22/10/17
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Connector自身-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX)
public class KafkaConnectorController {
@Autowired
private ConnectorService connectorService;
@Autowired
private ConnectorManager connectorManager;
@Autowired
private PluginService pluginService;
@ApiOperation(value = "创建Connector", notes = "")
@PostMapping(value = "connectors")
@ResponseBody
public Result<Void> createConnector(@Validated @RequestBody ConnectorCreateDTO dto) {
return connectorManager.createConnector(dto, HttpRequestUtil.getOperator());
}
@ApiOperation(value = "删除Connector", notes = "")
@DeleteMapping(value ="connectors")
@ResponseBody
public Result<Void> deleteConnectors(@Validated @RequestBody ConnectorDeleteDTO dto) {
return connectorService.deleteConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
}
@ApiOperation(value = "操作Connector", notes = "")
@PutMapping(value ="connectors")
@ResponseBody
public Result<Void> operateConnectors(@Validated @RequestBody ConnectorActionDTO dto) {
if (ConnectActionEnum.RESTART.getValue().equals(dto.getAction())) {
return connectorService.restartConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
} else if (ConnectActionEnum.STOP.getValue().equals(dto.getAction())) {
return connectorService.stopConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
} else if (ConnectActionEnum.RESUME.getValue().equals(dto.getAction())) {
return connectorService.resumeConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
}
return Result.buildFailure(ResultStatus.PARAM_ILLEGAL);
}
@ApiOperation(value = "修改Connector配置", notes = "")
@PutMapping(value ="connectors-config")
@ResponseBody
public Result<Void> modifyConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) {
return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator());
}
@ApiOperation(value = "校验Connector配置", notes = "")
@PutMapping(value ="connectors-config/validate")
@ResponseBody
public Result<ConnectConfigInfosVO> validateConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) {
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
if (infoResult.failed()) {
return Result.buildFromIgnoreData(infoResult);
}
return Result.buildSuc(ConvertUtil.obj2Obj(infoResult.getData(), ConnectConfigInfosVO.class));
}
}

View File

@@ -0,0 +1,98 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.connect;
import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager;
import com.xiaojukeji.know.streaming.km.biz.connect.connector.WorkerConnectorManager;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicCombineExistVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Properties;
/**
* @author zengqiao
* @date 22/10/17
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Connector状态-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX)
public class KafkaConnectorStateController {
@Autowired
private ConnectorService connectorService;
@Autowired
private ConnectorMetricService connectorMetricService;
@Autowired
private WorkerConnectorManager workerConnectorManager;
@Autowired
private ConnectorManager connectorManager;
@Autowired
private ConnectClusterService connectClusterService;
@ApiOperation(value = "Connectors基本信息", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/basic-combine-exist")
@ResponseBody
public Result<ConnectorBasicCombineExistVO> getConnectorBasicCombineExist(@PathVariable Long connectClusterId, @PathVariable String connectorName) {
return Result.buildSuc(
ConnectConverter.convert2BasicVO(
connectClusterService.getById(connectClusterId),
connectorService.getConnectorFromDB(connectClusterId, connectorName)
)
);
}
@ApiOperation(value = "Connector配置", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/config")
@ResponseBody
public Result<Properties> getConnectorConfig(@PathVariable Long connectClusterId, @PathVariable String connectorName) {
Result<KSConnectorInfo> connectorResult = connectorService.getConnectorInfoFromCluster(connectClusterId, connectorName);
if (connectorResult.failed()) {
return Result.buildFromIgnoreData(connectorResult);
}
Properties props = new Properties();
props.putAll(connectorResult.getData().getConfig());
return Result.buildSuc(props);
}
@ApiOperation(value = "获取Connector的Task列表", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/tasks")
@ResponseBody
public Result<List<KCTaskOverviewVO>> getConnectorTasks(@PathVariable Long connectClusterId, @PathVariable String connectorName) {
return workerConnectorManager.getTaskOverview(connectClusterId, connectorName);
}
@ApiOperation(value = "Connector近期指标")
@PostMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/latest-metrics")
@ResponseBody
public Result<ConnectorMetrics> getConnectorLatestMetrics(@PathVariable Long connectClusterId,
@PathVariable String connectorName,
@RequestBody List<String> metricsNames) {
return connectorMetricService.getLatestMetricsFromES(connectClusterId, connectorName, metricsNames);
}
@ApiOperation(value = "获取Connector的状态", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/state")
@ResponseBody
public Result<ConnectorStateVO> getConnectorStateVO(@PathVariable Long connectClusterId, @PathVariable String connectorName) {
return connectorManager.getConnectorStateVO(connectClusterId, connectorName);
}
}

View File

@@ -0,0 +1,33 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.connect;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.task.TaskActionDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
/**
* @author zengqiao
* @date 22/10/17
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Task-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX)
public class KafkaTaskController {
@Autowired
private WorkerConnectorService workerConnectorService;
@ApiOperation(value = "操作Task", notes = "")
@PutMapping(value ="tasks")
@ResponseBody
public Result<Void> actionTask(@Validated @RequestBody TaskActionDTO dto) {
return workerConnectorService.actionTask(dto);
}
}