diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterConnectsController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterConnectsController.java new file mode 100644 index 00000000..e7d93af8 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterConnectsController.java @@ -0,0 +1,133 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster; + +import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterConnectorsManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.connect.MetricsConnectClustersDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.connect.MetricsConnectorsDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectClusterBasicCombineExistVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectClusterBasicVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterWorkerOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterMetricService; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorMetricService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * @author zengqiao + * @date 22/10/27 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "集群Connects-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_PREFIX) // 这里使用 API_V3_PREFIX 没有使用 API_V3_CONNECT_PREFIX 的原因是这个接口在Kafka集群页面下 +public class ClusterConnectsController { + @Autowired + private ConnectorService connectorService; + + @Autowired + private ConnectorMetricService connectorMetricService; + + @Autowired + private ConnectClusterService connectClusterService; + + @Autowired + private ConnectClusterMetricService connectClusterMetricService; + + @Autowired + private WorkerService workerService; + + @Autowired + private ClusterConnectorsManager clusterConnectorsManager; + + + /**************************************************** connect method ****************************************************/ + + @ApiOperation(value = "Connect集群基本信息", notes = "") + @GetMapping(value = "kafka-clusters/{clusterPhyId}/connect-clusters/{connectClusterName}/basic-combine-exist") + @ResponseBody + public Result getBasicCombineExist(@PathVariable Long clusterPhyId, + @PathVariable String connectClusterName) { + return Result.buildSuc(ConnectConverter.convert2ConnectClusterBasicCombineExistVO( + connectClusterService.getByName(clusterPhyId, connectClusterName)) + ); + } + + @ApiOperation(value = "Connect集群基本信息列表", notes = "") + @GetMapping(value = "kafka-clusters/{clusterPhyId}/connect-clusters-basic") + @ResponseBody + public Result> getClusterConnectClustersBasic(@PathVariable Long clusterPhyId) { + return Result.buildSuc(ConvertUtil.list2List(connectClusterService.listByKafkaCluster(clusterPhyId), ConnectClusterBasicVO.class)); + } + + @ApiOperation(value = "Connect集群指标信息") + @PostMapping(value = "kafka-clusters/{clusterPhyId}/connect-cluster-metrics") + @ResponseBody + public Result> getConnectClusterMetrics(@PathVariable Long clusterPhyId, + @Validated @RequestBody MetricsConnectClustersDTO dto) { + return connectClusterMetricService.listConnectClusterMetricsFromES(clusterPhyId, dto); + } + + @ApiOperation(value = "集群Connectors状态", notes = "") + @GetMapping(value = "kafka-clusters/{clusterPhyId}/connect-state") + @ResponseBody + public Result getClusterConnectorsState(@PathVariable Long clusterPhyId) { + return Result.buildSuc(clusterConnectorsManager.getClusterConnectorsState(clusterPhyId)); + } + + + /**************************************************** connector method ****************************************************/ + + @ApiOperation(value = "Connectors基本信息", notes = "") + @GetMapping(value = "clusters/{clusterPhyId}/connectors-basic") + @ResponseBody + public Result> getClusterConnectorsBasic(@PathVariable Long clusterPhyId) { + return Result.buildSuc( + ConnectConverter.convert2BasicVOList( + connectClusterService.listByKafkaCluster(clusterPhyId), + connectorService.listByKafkaClusterIdFromDB(clusterPhyId) + ) + ); + } + + @ApiOperation(value = "Connectors概览列表", notes = "") + @PostMapping(value = "clusters/{clusterPhyId}/connectors-overview") + @ResponseBody + public PaginationResult getClusterConnectorsOverview(@PathVariable Long clusterPhyId, + @Validated @RequestBody ClusterConnectorsOverviewDTO dto) { + return clusterConnectorsManager.getClusterConnectorsOverview(clusterPhyId, dto); + } + + @ApiOperation(value = "集群Connectors指标信息") + @PostMapping(value = "clusters/{clusterPhyId}/connectors-metrics") + @ResponseBody + public Result> getClusterPhyMetrics(@PathVariable Long clusterPhyId, + @Validated @RequestBody MetricsConnectorsDTO dto) { + return connectorMetricService.listConnectClusterMetricsFromES(clusterPhyId, dto); + } + + /**************************************************** connector method ****************************************************/ + @ApiOperation(value = "worker概览列表", notes = "") + @GetMapping(value = "clusters/{clusterPhyId}/workers-overview") + @ResponseBody + public PaginationResult getClusterWorkersOverview(@PathVariable Long clusterPhyId, PaginationBaseDTO dto) { + return workerService.pageWorkByKafkaClusterPhy(clusterPhyId, dto); + } +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectClusterController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectClusterController.java new file mode 100644 index 00000000..33d5466e --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectClusterController.java @@ -0,0 +1,42 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.connect; + + +import com.didiglobal.logi.security.util.HttpRequestUtil; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * @author zengqiao + * @date 22/10/17 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Cluster-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX) +public class KafkaConnectClusterController { + @Autowired + private ConnectClusterService connectClusterService; + + @ApiOperation(value = "删除Connect集群") + @DeleteMapping(value = "connect-clusters") + @ResponseBody + public Result deleteConnectCluster(@RequestParam("connectClusterId") Long connectClusterId) { + return connectClusterService.deleteInDB(connectClusterId, HttpRequestUtil.getOperator()); + } + + @ApiOperation(value = "修改Connect集群", notes = "") + @PutMapping(value = "batch-connect-clusters") + @ResponseBody + public Result batchModifyConnectCluster(@Validated @RequestBody List dtoList) { + return connectClusterService.batchModifyInDB(dtoList, HttpRequestUtil.getOperator()); + } +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectPluginController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectPluginController.java new file mode 100644 index 00000000..d8e19db8 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectPluginController.java @@ -0,0 +1,56 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.connect; + + +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.plugin.ConnectPluginBasic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectPluginBasicVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * @author zengqiao + * @date 22/10/17 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Plugin-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX) +public class KafkaConnectPluginController { + + @Autowired + private PluginService pluginService; + + @ApiOperation(value = "Connect集群插件", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connector-plugins") + @ResponseBody + public Result> getConnectorPlugins(@PathVariable Long connectClusterId) { + Result> listResult = pluginService.listPluginsFromCluster(connectClusterId); + if (listResult.failed()) { + return Result.buildFromIgnoreData(listResult); + } + + listResult.getData().forEach(elem -> elem.setHelpDocLink("https://www.confluent.io/hub/")); + return Result.buildSuc(ConvertUtil.list2List(listResult.getData(), ConnectPluginBasicVO.class)); + } + + @ApiOperation(value = "Connect插件配置", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connector-plugins/{pluginName}/config") + @ResponseBody + public Result getPluginConfig(@PathVariable Long connectClusterId, @PathVariable String pluginName) { + Result infosResult = pluginService.getConfig(connectClusterId, pluginName); + if (infosResult.failed()) { + return Result.buildFromIgnoreData(infosResult); + } + + return Result.buildSuc(ConvertUtil.obj2Obj(infosResult.getData(), ConnectConfigInfosVO.class)); + } +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java new file mode 100644 index 00000000..8e5e7237 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java @@ -0,0 +1,91 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.connect; + + +import com.didiglobal.logi.security.util.HttpRequestUtil; +import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorActionDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorDeleteDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorConfigModifyDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +/** + * @author zengqiao + * @date 22/10/17 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Connector自身-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX) +public class KafkaConnectorController { + + @Autowired + private ConnectorService connectorService; + + @Autowired + private ConnectorManager connectorManager; + + @Autowired + private PluginService pluginService; + + @ApiOperation(value = "创建Connector", notes = "") + @PostMapping(value = "connectors") + @ResponseBody + public Result createConnector(@Validated @RequestBody ConnectorCreateDTO dto) { + return connectorManager.createConnector(dto, HttpRequestUtil.getOperator()); + } + + @ApiOperation(value = "删除Connector", notes = "") + @DeleteMapping(value ="connectors") + @ResponseBody + public Result deleteConnectors(@Validated @RequestBody ConnectorDeleteDTO dto) { + return connectorService.deleteConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } + + @ApiOperation(value = "操作Connector", notes = "") + @PutMapping(value ="connectors") + @ResponseBody + public Result operateConnectors(@Validated @RequestBody ConnectorActionDTO dto) { + if (ConnectActionEnum.RESTART.getValue().equals(dto.getAction())) { + return connectorService.restartConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } else if (ConnectActionEnum.STOP.getValue().equals(dto.getAction())) { + return connectorService.stopConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } else if (ConnectActionEnum.RESUME.getValue().equals(dto.getAction())) { + return connectorService.resumeConnector(dto.getConnectClusterId(), dto.getConnectorName(), HttpRequestUtil.getOperator()); + } + + return Result.buildFailure(ResultStatus.PARAM_ILLEGAL); + } + + @ApiOperation(value = "修改Connector配置", notes = "") + @PutMapping(value ="connectors-config") + @ResponseBody + public Result modifyConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) { + return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator()); + } + + @ApiOperation(value = "校验Connector配置", notes = "") + @PutMapping(value ="connectors-config/validate") + @ResponseBody + public Result validateConnectors(@Validated @RequestBody ConnectorConfigModifyDTO dto) { + Result infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs()); + if (infoResult.failed()) { + return Result.buildFromIgnoreData(infoResult); + } + + return Result.buildSuc(ConvertUtil.obj2Obj(infoResult.getData(), ConnectConfigInfosVO.class)); + } +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorStateController.java new file mode 100644 index 00000000..bd1513e3 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorStateController.java @@ -0,0 +1,98 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.connect; + +import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager; +import com.xiaojukeji.know.streaming.km.biz.connect.connector.WorkerConnectorManager; +import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicCombineExistVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter; +import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorMetricService; +import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Properties; + +/** + * @author zengqiao + * @date 22/10/17 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Connector状态-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX) +public class KafkaConnectorStateController { + + @Autowired + private ConnectorService connectorService; + + @Autowired + private ConnectorMetricService connectorMetricService; + + @Autowired + private WorkerConnectorManager workerConnectorManager; + + @Autowired + private ConnectorManager connectorManager; + + @Autowired + private ConnectClusterService connectClusterService; + + @ApiOperation(value = "Connectors基本信息", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/basic-combine-exist") + @ResponseBody + public Result getConnectorBasicCombineExist(@PathVariable Long connectClusterId, @PathVariable String connectorName) { + return Result.buildSuc( + ConnectConverter.convert2BasicVO( + connectClusterService.getById(connectClusterId), + connectorService.getConnectorFromDB(connectClusterId, connectorName) + ) + ); + } + + @ApiOperation(value = "Connector配置", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/config") + @ResponseBody + public Result getConnectorConfig(@PathVariable Long connectClusterId, @PathVariable String connectorName) { + Result connectorResult = connectorService.getConnectorInfoFromCluster(connectClusterId, connectorName); + if (connectorResult.failed()) { + return Result.buildFromIgnoreData(connectorResult); + } + + Properties props = new Properties(); + props.putAll(connectorResult.getData().getConfig()); + return Result.buildSuc(props); + } + + @ApiOperation(value = "获取Connector的Task列表", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/tasks") + @ResponseBody + public Result> getConnectorTasks(@PathVariable Long connectClusterId, @PathVariable String connectorName) { + return workerConnectorManager.getTaskOverview(connectClusterId, connectorName); + } + + @ApiOperation(value = "Connector近期指标") + @PostMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/latest-metrics") + @ResponseBody + public Result getConnectorLatestMetrics(@PathVariable Long connectClusterId, + @PathVariable String connectorName, + @RequestBody List metricsNames) { + return connectorMetricService.getLatestMetricsFromES(connectClusterId, connectorName, metricsNames); + } + + @ApiOperation(value = "获取Connector的状态", notes = "") + @GetMapping(value = "clusters/{connectClusterId}/connectors/{connectorName}/state") + @ResponseBody + public Result getConnectorStateVO(@PathVariable Long connectClusterId, @PathVariable String connectorName) { + return connectorManager.getConnectorStateVO(connectClusterId, connectorName); + } + +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaTaskController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaTaskController.java new file mode 100644 index 00000000..3b01c5c6 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaTaskController.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.connect; + + +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.task.TaskActionDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +/** + * @author zengqiao + * @date 22/10/17 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Connect-Task-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_CONNECT_PREFIX) +public class KafkaTaskController { + + @Autowired + private WorkerConnectorService workerConnectorService; + + @ApiOperation(value = "操作Task", notes = "") + @PutMapping(value ="tasks") + @ResponseBody + public Result actionTask(@Validated @RequestBody TaskActionDTO dto) { + return workerConnectorService.actionTask(dto); + } +}