diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorConfigModifyDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorConfigModifyDTO.java deleted file mode 100644 index 40f617c8..00000000 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorConfigModifyDTO.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector; - -import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO; -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; -import lombok.Data; - -import javax.validation.constraints.NotNull; -import java.util.Properties; - -/** - * @author zengqiao - * @date 2022-10-17 - */ -@Data -@ApiModel(description = "修改Connector配置") -public class ConnectorConfigModifyDTO extends ClusterConnectorDTO { - @NotNull(message = "configs不允许为空") - @ApiModelProperty(value = "配置", example = "") - private Properties configs; -} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java index 26b244e7..eb0dc42d 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/connect/ConnectorTaskMetrics.java @@ -1,7 +1,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ApiPrefix.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ApiPrefix.java index 332f639c..c7c1803b 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ApiPrefix.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ApiPrefix.java @@ -14,6 +14,10 @@ public class ApiPrefix { public static final String API_V3_CONNECT_PREFIX = API_V3_PREFIX + "kafka-connect/"; + public static final String API_V3_MM2_PREFIX = API_V3_PREFIX + "kafka-mm2/"; + + public static final String API_V3_HA_MIRROR_PREFIX = API_V3_PREFIX + "ha-mirror/"; + public static final String API_V3_OPEN_PREFIX = API_V3_PREFIX + "open/"; private ApiPrefix() { diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java index 387c8469..6dcc30e4 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java @@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectCl import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicCombineExistVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBasicVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; @@ -58,6 +59,25 @@ public class ConnectConverter { return voList; } + public static List convert2MirrorMakerBasicVOList( + List clusterList, + List poList) { + Map clusterMap = new HashMap<>(); + clusterList.stream().forEach(elem -> clusterMap.put(elem.getId(), elem)); + + List voList = new ArrayList<>(); + poList.stream().filter(item -> clusterMap.containsKey(item.getConnectClusterId())).forEach(elem -> { + MirrorMakerBasicVO vo = new MirrorMakerBasicVO(); + vo.setConnectClusterId(elem.getConnectClusterId()); + vo.setConnectClusterName(clusterMap.get(elem.getConnectClusterId()).getName()); + vo.setConnectorName(elem.getConnectorName()); + + voList.add(vo); + }); + + return voList; + } + public static ConnectClusterBasicCombineExistVO convert2ConnectClusterBasicCombineExistVO(ConnectCluster connectCluster) { if (connectCluster == null) { ConnectClusterBasicCombineExistVO combineExistVO = new ConnectClusterBasicCombineExistVO(); diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterMirrorMakersController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterMirrorMakersController.java new file mode 100644 index 00000000..e74b7a99 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterMirrorMakersController.java @@ -0,0 +1,82 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster; + +import com.xiaojukeji.know.streaming.km.biz.connect.mm2.MirrorMakerManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterMirrorMakersOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2.MetricsMirrorMakersDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.ClusterMirrorMakerOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBasicVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; +import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author zengqiao + * @date 22/12/12 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "集群MM2s-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_PREFIX) // 这里使用 API_V3_PREFIX 没有使用 API_V3_CONNECT_PREFIX 的原因是这个接口在Kafka集群页面下 +public class ClusterMirrorMakersController { + @Autowired + private MirrorMakerMetricService mirrorMakerMetricService; + + @Autowired + private MirrorMakerManager mirrorMakerManager; + + @Autowired + private ConnectClusterService connectClusterService; + + @Autowired + private ConnectorService connectorService; + + @ApiOperation(value = "集群MM2状态", notes = "") + @GetMapping(value = "kafka-clusters/{clusterPhyId}/mirror-makers-state") + @ResponseBody + public Result getClusterMM2State(@PathVariable Long clusterPhyId) { + return mirrorMakerManager.getMirrorMakerStateVO(clusterPhyId); + } + + @ApiOperation(value = "集群MM2基本信息", notes = "") + @GetMapping(value = "clusters/{clusterPhyId}/mirror-makers-basic") + @ResponseBody + public Result> getClusterMirrorMakersBasic(@PathVariable Long clusterPhyId) { + return Result.buildSuc( + ConnectConverter.convert2MirrorMakerBasicVOList( + connectClusterService.listByKafkaCluster(clusterPhyId), + connectorService.listByKafkaClusterIdFromDB(clusterPhyId).stream().filter(elem -> elem.getConnectorClassName().equals(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE)).collect(Collectors.toList()) + ) + ); + } + + @ApiOperation(value = "集群MM2概览列表", notes = "") + @PostMapping(value = "clusters/{clusterPhyId}/mirror-makers-overview") + @ResponseBody + public PaginationResult getClusterMirrorMakersOverview(@PathVariable Long clusterPhyId, + @Validated @RequestBody ClusterMirrorMakersOverviewDTO dto) { + return mirrorMakerManager.getClusterMirrorMakersOverview(clusterPhyId,dto); + } + + @ApiOperation(value = "集群MM2指标信息") + @PostMapping(value = "clusters/{clusterPhyId}/mirror-makers-metrics") + @ResponseBody + public Result> getClusterMirrorMakersMetrics(@PathVariable Long clusterPhyId, + @Validated @RequestBody MetricsMirrorMakersDTO dto) { + return mirrorMakerMetricService.listMirrorMakerClusterMetricsFromES(clusterPhyId, dto); + } +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java index 8e5e7237..d60314bb 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java @@ -6,7 +6,6 @@ import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorActionDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorDeleteDTO; -import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorConfigModifyDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; @@ -73,14 +72,14 @@ public class KafkaConnectorController { @ApiOperation(value = "修改Connector配置", notes = "") @PutMapping(value ="connectors-config") @ResponseBody - public Result modifyConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) { + public Result modifyConnectors(@Validated @RequestBody ConnectorCreateDTO dto) { return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator()); } @ApiOperation(value = "校验Connector配置", notes = "") @PutMapping(value ="connectors-config/validate") @ResponseBody - public Result validateConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) { + public Result validateConnectors(@Validated @RequestBody ConnectorCreateDTO dto) { Result infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs()); if (infoResult.failed()) { return Result.buildFromIgnoreData(infoResult); diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/mm2/KafkaMirrorMakerController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/mm2/KafkaMirrorMakerController.java new file mode 100644 index 00000000..fdc1136d --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/mm2/KafkaMirrorMakerController.java @@ -0,0 +1,76 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.connect.mm2; + +import com.didiglobal.logi.security.util.HttpRequestUtil; +import com.xiaojukeji.know.streaming.km.biz.connect.mm2.MirrorMakerManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMaker2ActionDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMakerCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMaker2DeleteDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + + +/** + * @author zengqiao + * @date 22/12/12 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "MM2-MM2自身-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_MM2_PREFIX) +public class KafkaMirrorMakerController { + @Autowired + private MirrorMakerManager mirrorMakerManager; + + @ApiOperation(value = "创建MM2", notes = "") + @PostMapping(value = "mirror-makers") + @ResponseBody + public Result createMM2(@Validated @RequestBody MirrorMakerCreateDTO dto) { + return mirrorMakerManager.createMirrorMaker(dto, HttpRequestUtil.getOperator()); + } + + @ApiOperation(value = "删除MM2", notes = "") + @DeleteMapping(value ="mirror-makers") + @ResponseBody + public Result deleteMM2(@Validated @RequestBody MirrorMaker2DeleteDTO dto) { + return mirrorMakerManager.deleteMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } + + @ApiOperation(value = "操作MM2", notes = "") + @PutMapping(value ="mirror-makers") + @ResponseBody + public Result operateMM2s(@Validated @RequestBody MirrorMaker2ActionDTO dto) { + if (ConnectActionEnum.RESTART.getValue().equals(dto.getAction())) { + return mirrorMakerManager.restartMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } else if (ConnectActionEnum.STOP.getValue().equals(dto.getAction())) { + return mirrorMakerManager.stopMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } else if (ConnectActionEnum.RESUME.getValue().equals(dto.getAction())) { + return mirrorMakerManager.resumeMirrorMaker(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } + + return Result.buildFailure(ResultStatus.PARAM_ILLEGAL); + } + + @ApiOperation(value = "MM2配置修改", notes = "") + @PutMapping(value ="mirror-makers-config") + @ResponseBody + public Result modifyMM2s(@Validated @RequestBody MirrorMakerCreateDTO dto) { + return mirrorMakerManager.modifyMirrorMakerConfig(dto, HttpRequestUtil.getOperator()); + } + + @ApiOperation(value = "校验MM2配置", notes = "") + @PutMapping(value ="mirror-makers-config/validate") + @ResponseBody + public Result> validateConnectors(@Validated @RequestBody MirrorMakerCreateDTO dto) { + return mirrorMakerManager.validateConnectors(dto); + } +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/mm2/KafkaMirrorMakerStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/mm2/KafkaMirrorMakerStateController.java new file mode 100644 index 00000000..2cfad408 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/mm2/KafkaMirrorMakerStateController.java @@ -0,0 +1,68 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.connect.mm2; + +import com.didiglobal.logi.security.util.HttpRequestUtil; +import com.xiaojukeji.know.streaming.km.biz.connect.mm2.MirrorMakerManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMakerCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBaseStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + + +/** + * @author zengqiao + * @date 22/12/12 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "MM2-MM2状态-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_MM2_PREFIX) +public class KafkaMirrorMakerStateController { + @Autowired + private MirrorMakerManager mirrorMakerManager; + + @Autowired + private MirrorMakerMetricService mirrorMakerMetricService; + + @ApiOperation(value = "获取mm2任务的状态", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/state") + @ResponseBody + public Result getMirrorMakerStateVO(@PathVariable Long connectClusterId, @PathVariable String connectorName) { + return mirrorMakerManager.getMirrorMakerState(connectClusterId, connectorName); + } + + @ApiOperation(value = "获取MM2的Task列表", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/tasks") + @ResponseBody + public Result>> getConnectorTasks(@PathVariable Long connectClusterId, @PathVariable String connectorName) { + return mirrorMakerManager.getTaskOverview(connectClusterId, connectorName); + } + + @ApiOperation(value = "MM2配置", notes = "") + @GetMapping(value ="clusters/{connectClusterId}/connectors/{connectorName}/config") + @ResponseBody + public Result> getMM2Configs(@PathVariable Long connectClusterId, @PathVariable String connectorName) { + return mirrorMakerManager.getMM2Configs(connectClusterId, connectorName); + } + + @ApiOperation(value = "Connector近期指标") + @PostMapping(value = "clusters/{connectClusterId}/connectors/{mirrorMakerName}/latest-metrics") + @ResponseBody + public Result getMirrorMakerLatestMetrics(@PathVariable Long connectClusterId, + @PathVariable String mirrorMakerName, + @RequestBody List metricsNames) { + return mirrorMakerMetricService.getLatestMetricsFromES(connectClusterId, mirrorMakerName, metricsNames); + } + +}