[Feature]MM2管理-MM2管理相关接口类(#894)

This commit is contained in:
zengqiao
2023-02-09 16:54:31 +08:00
committed by EricZeng
parent 78b2b8a45e
commit ab6a4d7099
8 changed files with 252 additions and 25 deletions

View File

@@ -1,21 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.util.Properties;
/**
* @author zengqiao
* @date 2022-10-17
*/
@Data
@ApiModel(description = "修改Connector配置")
public class ConnectorConfigModifyDTO extends ClusterConnectorDTO {
@NotNull(message = "configs不允许为空")
@ApiModelProperty(value = "配置", example = "")
private Properties configs;
}

View File

@@ -1,7 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect; package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;

View File

@@ -14,6 +14,10 @@ public class ApiPrefix {
public static final String API_V3_CONNECT_PREFIX = API_V3_PREFIX + "kafka-connect/"; public static final String API_V3_CONNECT_PREFIX = API_V3_PREFIX + "kafka-connect/";
public static final String API_V3_MM2_PREFIX = API_V3_PREFIX + "kafka-mm2/";
public static final String API_V3_HA_MIRROR_PREFIX = API_V3_PREFIX + "ha-mirror/";
public static final String API_V3_OPEN_PREFIX = API_V3_PREFIX + "open/"; public static final String API_V3_OPEN_PREFIX = API_V3_PREFIX + "open/";
private ApiPrefix() { private ApiPrefix() {

View File

@@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectCl
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicCombineExistVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicCombineExistVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBasicVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
@@ -58,6 +59,25 @@ public class ConnectConverter {
return voList; return voList;
} }
public static List<MirrorMakerBasicVO> convert2MirrorMakerBasicVOList(
List<ConnectCluster> clusterList,
List<ConnectorPO> poList) {
Map<Long, ConnectCluster> clusterMap = new HashMap<>();
clusterList.stream().forEach(elem -> clusterMap.put(elem.getId(), elem));
List<MirrorMakerBasicVO> voList = new ArrayList<>();
poList.stream().filter(item -> clusterMap.containsKey(item.getConnectClusterId())).forEach(elem -> {
MirrorMakerBasicVO vo = new MirrorMakerBasicVO();
vo.setConnectClusterId(elem.getConnectClusterId());
vo.setConnectClusterName(clusterMap.get(elem.getConnectClusterId()).getName());
vo.setConnectorName(elem.getConnectorName());
voList.add(vo);
});
return voList;
}
public static ConnectClusterBasicCombineExistVO convert2ConnectClusterBasicCombineExistVO(ConnectCluster connectCluster) { public static ConnectClusterBasicCombineExistVO convert2ConnectClusterBasicCombineExistVO(ConnectCluster connectCluster) {
if (connectCluster == null) { if (connectCluster == null) {
ConnectClusterBasicCombineExistVO combineExistVO = new ConnectClusterBasicCombineExistVO(); ConnectClusterBasicCombineExistVO combineExistVO = new ConnectClusterBasicCombineExistVO();

View File

@@ -0,0 +1,82 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster;
import com.xiaojukeji.know.streaming.km.biz.connect.mm2.MirrorMakerManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterMirrorMakersOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2.MetricsMirrorMakersDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.ClusterMirrorMakerOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBasicVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author zengqiao
* @date 22/12/12
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "集群MM2s-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_PREFIX) // 这里使用 API_V3_PREFIX 没有使用 API_V3_CONNECT_PREFIX 的原因是这个接口在Kafka集群页面下
public class ClusterMirrorMakersController {
@Autowired
private MirrorMakerMetricService mirrorMakerMetricService;
@Autowired
private MirrorMakerManager mirrorMakerManager;
@Autowired
private ConnectClusterService connectClusterService;
@Autowired
private ConnectorService connectorService;
@ApiOperation(value = "集群MM2状态", notes = "")
@GetMapping(value = "kafka-clusters/{clusterPhyId}/mirror-makers-state")
@ResponseBody
public Result<MirrorMakerStateVO> getClusterMM2State(@PathVariable Long clusterPhyId) {
return mirrorMakerManager.getMirrorMakerStateVO(clusterPhyId);
}
@ApiOperation(value = "集群MM2基本信息", notes = "")
@GetMapping(value = "clusters/{clusterPhyId}/mirror-makers-basic")
@ResponseBody
public Result<List<MirrorMakerBasicVO>> getClusterMirrorMakersBasic(@PathVariable Long clusterPhyId) {
return Result.buildSuc(
ConnectConverter.convert2MirrorMakerBasicVOList(
connectClusterService.listByKafkaCluster(clusterPhyId),
connectorService.listByKafkaClusterIdFromDB(clusterPhyId).stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList())
)
);
}
@ApiOperation(value = "集群MM2概览列表", notes = "")
@PostMapping(value = "clusters/{clusterPhyId}/mirror-makers-overview")
@ResponseBody
public PaginationResult<ClusterMirrorMakerOverviewVO> getClusterMirrorMakersOverview(@PathVariable Long clusterPhyId,
@Validated @RequestBody ClusterMirrorMakersOverviewDTO dto) {
return mirrorMakerManager.getClusterMirrorMakersOverview(clusterPhyId,dto);
}
@ApiOperation(value = "集群MM2指标信息")
@PostMapping(value = "clusters/{clusterPhyId}/mirror-makers-metrics")
@ResponseBody
public Result<List<MetricMultiLinesVO>> getClusterMirrorMakersMetrics(@PathVariable Long clusterPhyId,
@Validated @RequestBody MetricsMirrorMakersDTO dto) {
return mirrorMakerMetricService.listMirrorMakerClusterMetricsFromES(clusterPhyId, dto);
}
}

View File

@@ -6,7 +6,6 @@ import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorActionDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorActionDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorDeleteDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorConfigModifyDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
@@ -73,14 +72,14 @@ public class KafkaConnectorController {
@ApiOperation(value = "修改Connector配置", notes = "") @ApiOperation(value = "修改Connector配置", notes = "")
@PutMapping(value ="connectors-config") @PutMapping(value ="connectors-config")
@ResponseBody @ResponseBody
public Result<Void> modifyConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) { public Result<Void> modifyConnectors(@Validated @RequestBody ConnectorCreateDTO dto) {
return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator()); return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator());
} }
@ApiOperation(value = "校验Connector配置", notes = "") @ApiOperation(value = "校验Connector配置", notes = "")
@PutMapping(value ="connectors-config/validate") @PutMapping(value ="connectors-config/validate")
@ResponseBody @ResponseBody
public Result<ConnectConfigInfosVO> validateConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) { public Result<ConnectConfigInfosVO> validateConnectors(@Validated @RequestBody ConnectorCreateDTO dto) {
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs()); Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
if (infoResult.failed()) { if (infoResult.failed()) {
return Result.buildFromIgnoreData(infoResult); return Result.buildFromIgnoreData(infoResult);

View File

@@ -0,0 +1,76 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.connect.mm2;
import com.didiglobal.logi.security.util.HttpRequestUtil;
import com.xiaojukeji.know.streaming.km.biz.connect.mm2.MirrorMakerManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMaker2ActionDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMakerCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMaker2DeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 22/12/12
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "MM2-MM2自身-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_MM2_PREFIX)
public class KafkaMirrorMakerController {
@Autowired
private MirrorMakerManager mirrorMakerManager;
@ApiOperation(value = "创建MM2", notes = "")
@PostMapping(value = "mirror-makers")
@ResponseBody
public Result<Void> createMM2(@Validated @RequestBody MirrorMakerCreateDTO dto) {
return mirrorMakerManager.createMirrorMaker(dto, HttpRequestUtil.getOperator());
}
@ApiOperation(value = "删除MM2", notes = "")
@DeleteMapping(value ="mirror-makers")
@ResponseBody
public Result<Void> deleteMM2(@Validated @RequestBody MirrorMaker2DeleteDTO dto) {
return mirrorMakerManager.deleteMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
}
@ApiOperation(value = "操作MM2", notes = "")
@PutMapping(value ="mirror-makers")
@ResponseBody
public Result<Void> operateMM2s(@Validated @RequestBody MirrorMaker2ActionDTO dto) {
if (ConnectActionEnum.RESTART.getValue().equals(dto.getAction())) {
return mirrorMakerManager.restartMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
} else if (ConnectActionEnum.STOP.getValue().equals(dto.getAction())) {
return mirrorMakerManager.stopMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
} else if (ConnectActionEnum.RESUME.getValue().equals(dto.getAction())) {
return mirrorMakerManager.resumeMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator());
}
return Result.buildFailure(ResultStatus.PARAM_ILLEGAL);
}
@ApiOperation(value = "MM2配置修改", notes = "")
@PutMapping(value ="mirror-makers-config")
@ResponseBody
public Result<Void> modifyMM2s(@Validated @RequestBody MirrorMakerCreateDTO dto) {
return mirrorMakerManager.modifyMirrorMakerConfig(dto, HttpRequestUtil.getOperator());
}
@ApiOperation(value = "校验MM2配置", notes = "")
@PutMapping(value ="mirror-makers-config/validate")
@ResponseBody
public Result<List<ConnectConfigInfosVO>> validateConnectors(@Validated @RequestBody MirrorMakerCreateDTO dto) {
return mirrorMakerManager.validateConnectors(dto);
}
}

View File

@@ -0,0 +1,68 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.connect.mm2;
import com.didiglobal.logi.security.util.HttpRequestUtil;
import com.xiaojukeji.know.streaming.km.biz.connect.mm2.MirrorMakerManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMakerCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBaseStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* @author zengqiao
* @date 22/12/12
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "MM2-MM2状态-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_MM2_PREFIX)
public class KafkaMirrorMakerStateController {
@Autowired
private MirrorMakerManager mirrorMakerManager;
@Autowired
private MirrorMakerMetricService mirrorMakerMetricService;
@ApiOperation(value = "获取mm2任务的状态", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/state")
@ResponseBody
public Result<MirrorMakerBaseStateVO> getMirrorMakerStateVO(@PathVariable Long connectClusterId, @PathVariable String connectorName) {
return mirrorMakerManager.getMirrorMakerState(connectClusterId, connectorName);
}
@ApiOperation(value = "获取MM2的Task列表", notes = "")
@GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/tasks")
@ResponseBody
public Result<Map<String, List<KCTaskOverviewVO>>> getConnectorTasks(@PathVariable Long connectClusterId, @PathVariable String connectorName) {
return mirrorMakerManager.getTaskOverview(connectClusterId, connectorName);
}
@ApiOperation(value = "MM2配置", notes = "")
@GetMapping(value ="clusters/{connectClusterId}/connectors/{connectorName}/config")
@ResponseBody
public Result<List<Properties>> getMM2Configs(@PathVariable Long connectClusterId, @PathVariable String connectorName) {
return mirrorMakerManager.getMM2Configs(connectClusterId, connectorName);
}
@ApiOperation(value = "Connector近期指标")
@PostMapping(value = "clusters/{connectClusterId}/connectors/{mirrorMakerName}/latest-metrics")
@ResponseBody
public Result<MirrorMakerMetrics> getMirrorMakerLatestMetrics(@PathVariable Long connectClusterId,
@PathVariable String mirrorMakerName,
@RequestBody List<String> metricsNames) {
return mirrorMakerMetricService.getLatestMetricsFromES(connectClusterId, mirrorMakerName, metricsNames);
}
}