mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-03 19:38:20 +08:00
split op util controller to topic controller and leader controller, and add authority controller, quota controller
This commit is contained in:
@@ -7,46 +7,41 @@ import io.swagger.annotations.ApiModelProperty;
|
|||||||
|
|
||||||
@ApiModel(description = "配额调整")
|
@ApiModel(description = "配额调整")
|
||||||
public class TopicQuotaDTO extends ClusterTopicDTO {
|
public class TopicQuotaDTO extends ClusterTopicDTO {
|
||||||
|
@ApiModelProperty(value = "appId")
|
||||||
|
private String appId;
|
||||||
|
|
||||||
@ApiModelProperty(value = "appId")
|
@ApiModelProperty(value = "发送数据速率B/s")
|
||||||
private String appId;
|
private Long produceQuota;
|
||||||
|
|
||||||
@ApiModelProperty(value = "发送数据速率B/s")
|
@ApiModelProperty(value = "消费数据速率B/s")
|
||||||
private Long produceQuota;
|
private Long consumeQuota;
|
||||||
|
|
||||||
@ApiModelProperty(value = "消费数据速率B/s")
|
public String getAppId() {
|
||||||
private Long consumeQuota;
|
|
||||||
|
|
||||||
public String getAppId() {
|
|
||||||
return appId;
|
return appId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setAppId(String appId) {
|
public void setAppId(String appId) {
|
||||||
this.appId = appId;
|
this.appId = appId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Long getProduceQuota() {
|
public Long getProduceQuota() {
|
||||||
return produceQuota;
|
return produceQuota;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setProduceQuota(Long produceQuota) {
|
public void setProduceQuota(Long produceQuota) {
|
||||||
this.produceQuota = produceQuota;
|
this.produceQuota = produceQuota;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Long getConsumeQuota() {
|
public Long getConsumeQuota() {
|
||||||
return consumeQuota;
|
return consumeQuota;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setConsumeQuota(Long consumeQuota) {
|
public void setConsumeQuota(Long consumeQuota) {
|
||||||
this.consumeQuota = consumeQuota;
|
this.consumeQuota = consumeQuota;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean paramLegal() {
|
@Override
|
||||||
if (ValidateUtils.isNullOrLessThanZero(clusterId)
|
public boolean paramLegal() {
|
||||||
|| ValidateUtils.isBlank(topicName)
|
return !ValidateUtils.isNullOrLessThanZero(clusterId) && !ValidateUtils.isBlank(topicName) && !ValidateUtils.isBlank(appId);
|
||||||
|| ValidateUtils.isBlank(appId)) {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,37 +7,33 @@ import io.swagger.annotations.ApiModelProperty;
|
|||||||
|
|
||||||
@ApiModel(description = "权限调整")
|
@ApiModel(description = "权限调整")
|
||||||
public class TopicAuthorityDTO extends ClusterTopicDTO {
|
public class TopicAuthorityDTO extends ClusterTopicDTO {
|
||||||
|
@ApiModelProperty(value = "appId")
|
||||||
|
private String appId;
|
||||||
|
|
||||||
@ApiModelProperty(value = "appId")
|
@ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写, 4:可管理")
|
||||||
private String appId;
|
private Integer access;
|
||||||
|
|
||||||
@ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写, 4:可管理")
|
public String getAppId() {
|
||||||
private Integer access;
|
return appId;
|
||||||
|
}
|
||||||
public String getAppId() {
|
|
||||||
return appId;
|
public void setAppId(String appId) {
|
||||||
}
|
this.appId = appId;
|
||||||
|
}
|
||||||
public void setAppId(String appId) {
|
|
||||||
this.appId = appId;
|
public Integer getAccess() {
|
||||||
}
|
return access;
|
||||||
|
}
|
||||||
public Integer getAccess() {
|
|
||||||
return access;
|
public void setAccess(Integer access) {
|
||||||
}
|
this.access = access;
|
||||||
|
}
|
||||||
public void setAccess(Integer access) {
|
|
||||||
this.access = access;
|
@Override
|
||||||
}
|
public boolean paramLegal() {
|
||||||
|
return !ValidateUtils.isNullOrLessThanZero(clusterId)
|
||||||
@Override
|
&& !ValidateUtils.isBlank(topicName)
|
||||||
public boolean paramLegal() {
|
&& !ValidateUtils.isBlank(appId)
|
||||||
if (ValidateUtils.isNullOrLessThanZero(clusterId)
|
&& !ValidateUtils.isNullOrLessThanZero(access);
|
||||||
|| ValidateUtils.isBlank(topicName)
|
|
||||||
|| ValidateUtils.isBlank(appId)
|
|
||||||
|| ValidateUtils.isNullOrLessThanZero(access)) {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
|
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
|
||||||
|
import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter;
|
||||||
|
import io.swagger.annotations.Api;
|
||||||
|
import io.swagger.annotations.ApiOperation;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Authority操作相关接口
|
||||||
|
* @author zengqiao
|
||||||
|
* @date 21/5/18
|
||||||
|
*/
|
||||||
|
@Api(tags = "OP-Authority操作相关接口(REST)")
|
||||||
|
@RestController
|
||||||
|
public class OpAuthorityController {
|
||||||
|
@Autowired
|
||||||
|
private TopicManagerService topicManagerService;
|
||||||
|
|
||||||
|
@ApiOperation(value = "权限调整",notes = "权限调整")
|
||||||
|
@PostMapping(value = "topic-authorities")
|
||||||
|
@ResponseBody
|
||||||
|
public Result addAuthority(@RequestBody TopicAuthorityDTO dto) {
|
||||||
|
//非空校验
|
||||||
|
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
|
||||||
|
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||||
|
}
|
||||||
|
return Result.buildFrom(topicManagerService.addAuthority(AuthorityConverter.convert2AuthorityDO(dto)));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,11 +1,13 @@
|
|||||||
package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart;
|
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
|
||||||
|
|
||||||
import com.xiaojukeji.kafka.manager.common.bizenum.RebalanceDimensionEnum;
|
import com.xiaojukeji.kafka.manager.common.bizenum.RebalanceDimensionEnum;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
|
||||||
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.op.RebalanceDTO;
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.RebalanceDTO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.kafka.manager.service.service.AdminService;
|
import com.xiaojukeji.kafka.manager.service.service.AdminService;
|
||||||
@@ -16,22 +18,35 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Leader操作[选举|切换]相关接口
|
||||||
* @author zengqiao
|
* @author zengqiao
|
||||||
* @date 20/9/23
|
* @date 21/5/18
|
||||||
*/
|
*/
|
||||||
@Api(tags = "开放接口-OP相关接口(REST)")
|
@Api(tags = "OP-Leader操作相关接口(REST)")
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
|
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
|
||||||
public class ThirdPartOpUtilController {
|
public class OpLeaderController {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private AdminService adminService;
|
private AdminService adminService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ClusterService clusterService;
|
private ClusterService clusterService;
|
||||||
|
|
||||||
|
@ApiOperation(value = "优先副本选举状态")
|
||||||
|
@RequestMapping(value = {"leaders/preferred-replica-election-status", "utils/rebalance-status"}, method = RequestMethod.GET)
|
||||||
|
@ResponseBody
|
||||||
|
public Result preferredReplicaElectStatus(@RequestParam("clusterId") Long clusterId) {
|
||||||
|
ClusterDO clusterDO = clusterService.getById(clusterId);
|
||||||
|
if (ValidateUtils.isNull(clusterDO)) {
|
||||||
|
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||||
|
}
|
||||||
|
|
||||||
|
TaskStatusEnum statusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
|
||||||
|
return new Result<>(JsonUtils.toJson(statusEnum));
|
||||||
|
}
|
||||||
|
|
||||||
@ApiOperation(value = "优先副本选举")
|
@ApiOperation(value = "优先副本选举")
|
||||||
@RequestMapping(value = "op/rebalance", method = RequestMethod.POST)
|
@RequestMapping(value = {"leaders/preferred-replica-election", "utils/rebalance"}, method = RequestMethod.POST)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
public Result preferredReplicaElect(@RequestBody RebalanceDTO reqObj) {
|
public Result preferredReplicaElect(@RequestBody RebalanceDTO reqObj) {
|
||||||
if (!reqObj.paramLegal()) {
|
if (!reqObj.paramLegal()) {
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
|
||||||
|
import io.swagger.annotations.Api;
|
||||||
|
import io.swagger.annotations.ApiOperation;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Quota操作相关接口
|
||||||
|
* @author zengqiao
|
||||||
|
* @date 21/5/18
|
||||||
|
*/
|
||||||
|
@Api(tags = "OP-Quota操作相关接口(REST)")
|
||||||
|
@RestController
|
||||||
|
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
|
||||||
|
public class OpQuotaController {
|
||||||
|
@Autowired
|
||||||
|
private QuotaService quotaService;
|
||||||
|
|
||||||
|
@ApiOperation(value = "配额调整",notes = "配额调整")
|
||||||
|
@RequestMapping(value = "topic-quotas",method = RequestMethod.POST)
|
||||||
|
@ResponseBody
|
||||||
|
public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) {
|
||||||
|
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
|
||||||
|
// 非空校验
|
||||||
|
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||||
|
}
|
||||||
|
return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto)));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,24 +1,23 @@
|
|||||||
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
|
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
|
||||||
|
|
||||||
import com.xiaojukeji.kafka.manager.common.bizenum.RebalanceDimensionEnum;
|
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||||
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.op.RebalanceDTO;
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicCreationDTO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.*;
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicDeletionDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicExpansionDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicModificationDTO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.kafka.manager.service.service.AdminService;
|
import com.xiaojukeji.kafka.manager.service.service.AdminService;
|
||||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
|
||||||
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
|
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
|
||||||
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
|
|
||||||
import com.xiaojukeji.kafka.manager.service.utils.TopicCommands;
|
import com.xiaojukeji.kafka.manager.service.utils.TopicCommands;
|
||||||
import io.swagger.annotations.Api;
|
import io.swagger.annotations.Api;
|
||||||
import io.swagger.annotations.ApiOperation;
|
import io.swagger.annotations.ApiOperation;
|
||||||
@@ -30,25 +29,25 @@ import java.util.List;
|
|||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 运维工具类
|
* Topic操作相关接口
|
||||||
* @author zengqiao
|
* @author zengqiao
|
||||||
* @date 20/4/2
|
* @date 21/5/18
|
||||||
*/
|
*/
|
||||||
@Api(tags = "OP-Utils相关接口(REST)")
|
@Api(tags = "OP-Topic操作相关接口(REST)")
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
|
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
|
||||||
public class OpUtilsController {
|
public class OpTopicController {
|
||||||
@Autowired
|
|
||||||
private ClusterService clusterService;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private AdminService adminService;
|
private AdminService adminService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ClusterService clusterService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TopicManagerService topicManagerService;
|
private TopicManagerService topicManagerService;
|
||||||
|
|
||||||
@ApiOperation(value = "创建Topic")
|
@ApiOperation(value = "创建Topic")
|
||||||
@RequestMapping(value = {"utils/topics"}, method = RequestMethod.POST)
|
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.POST)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
public Result createCommonTopic(@RequestBody TopicCreationDTO dto) {
|
public Result createCommonTopic(@RequestBody TopicCreationDTO dto) {
|
||||||
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
|
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
|
||||||
@@ -76,8 +75,66 @@ public class OpUtilsController {
|
|||||||
return Result.buildFrom(rs);
|
return Result.buildFrom(rs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ApiOperation(value = "Topic删除", notes = "单次不允许超过10个Topic")
|
||||||
|
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.DELETE)
|
||||||
|
@ResponseBody
|
||||||
|
public Result<List<TopicOperationResult>> deleteTopics(@RequestBody List<TopicDeletionDTO> dtoList) {
|
||||||
|
if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) {
|
||||||
|
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||||
|
}
|
||||||
|
String operator = SpringTool.getUserName();
|
||||||
|
|
||||||
|
List<TopicOperationResult> resultList = new ArrayList<>();
|
||||||
|
for (TopicDeletionDTO dto: dtoList) {
|
||||||
|
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
|
||||||
|
if (rc.getCode() != ResultStatus.SUCCESS.getCode()) {
|
||||||
|
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), rc));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 参数检查合法, 开始删除Topic
|
||||||
|
ResultStatus statusEnum = adminService.deleteTopic(rc.getData(), dto.getTopicName(), operator);
|
||||||
|
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), statusEnum));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (TopicOperationResult operationResult: resultList) {
|
||||||
|
if (!Constant.SUCCESS.equals(operationResult.getCode())) {
|
||||||
|
return Result.buildFrom(ResultStatus.OPERATION_FAILED, resultList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new Result<>(resultList);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ApiOperation(value = "修改Topic", notes = "")
|
||||||
|
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.PUT)
|
||||||
|
@ResponseBody
|
||||||
|
public Result modifyTopic(@RequestBody TopicModificationDTO dto) {
|
||||||
|
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
|
||||||
|
if (rc.getCode() != ResultStatus.SUCCESS.getCode()) {
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
ClusterDO clusterDO = rc.getData();
|
||||||
|
|
||||||
|
// 获取属性
|
||||||
|
Properties properties = dto.getProperties();
|
||||||
|
if (ValidateUtils.isNull(properties)) {
|
||||||
|
properties = new Properties();
|
||||||
|
}
|
||||||
|
properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime()));
|
||||||
|
|
||||||
|
// 操作修改
|
||||||
|
String operator = SpringTool.getUserName();
|
||||||
|
ResultStatus rs = TopicCommands.modifyTopicConfig(clusterDO, dto.getTopicName(), properties);
|
||||||
|
if (!ResultStatus.SUCCESS.equals(rs)) {
|
||||||
|
return Result.buildFrom(rs);
|
||||||
|
}
|
||||||
|
topicManagerService.modifyTopicByOp(dto.getClusterId(), dto.getTopicName(), dto.getAppId(), dto.getDescription(), operator);
|
||||||
|
return new Result();
|
||||||
|
}
|
||||||
|
|
||||||
@ApiOperation(value = "Topic扩分区", notes = "")
|
@ApiOperation(value = "Topic扩分区", notes = "")
|
||||||
@RequestMapping(value = {"utils/expand-partitions"}, method = RequestMethod.PUT)
|
@RequestMapping(value = {"topics/expand-partitions", "utils/expand-partitions"}, method = RequestMethod.PUT)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
public Result<List<TopicOperationResult>> expandTopics(@RequestBody List<TopicExpansionDTO> dtoList) {
|
public Result<List<TopicOperationResult>> expandTopics(@RequestBody List<TopicExpansionDTO> dtoList) {
|
||||||
if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) {
|
if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) {
|
||||||
@@ -112,108 +169,6 @@ public class OpUtilsController {
|
|||||||
return new Result<>(resultList);
|
return new Result<>(resultList);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation(value = "Topic删除", notes = "单次不允许超过10个Topic")
|
|
||||||
@RequestMapping(value = {"utils/topics"}, method = RequestMethod.DELETE)
|
|
||||||
@ResponseBody
|
|
||||||
public Result<List<TopicOperationResult>> deleteTopics(@RequestBody List<TopicDeletionDTO> dtoList) {
|
|
||||||
if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) {
|
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
|
||||||
}
|
|
||||||
String operator = SpringTool.getUserName();
|
|
||||||
|
|
||||||
List<TopicOperationResult> resultList = new ArrayList<>();
|
|
||||||
for (TopicDeletionDTO dto: dtoList) {
|
|
||||||
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
|
|
||||||
if (rc.getCode() != ResultStatus.SUCCESS.getCode()) {
|
|
||||||
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), rc));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 参数检查合法, 开始删除Topic
|
|
||||||
ResultStatus statusEnum = adminService.deleteTopic(rc.getData(), dto.getTopicName(), operator);
|
|
||||||
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), statusEnum));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (TopicOperationResult operationResult: resultList) {
|
|
||||||
if (!Constant.SUCCESS.equals(operationResult.getCode())) {
|
|
||||||
return Result.buildFrom(ResultStatus.OPERATION_FAILED, resultList);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return new Result<>(resultList);
|
|
||||||
}
|
|
||||||
|
|
||||||
@ApiOperation(value = "修改Topic", notes = "")
|
|
||||||
@RequestMapping(value = {"utils/topics"}, method = RequestMethod.PUT)
|
|
||||||
@ResponseBody
|
|
||||||
public Result modifyTopic(@RequestBody TopicModificationDTO dto) {
|
|
||||||
Result<ClusterDO> rc = checkParamAndGetClusterDO(dto);
|
|
||||||
if (rc.getCode() != ResultStatus.SUCCESS.getCode()) {
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
ClusterDO clusterDO = rc.getData();
|
|
||||||
|
|
||||||
// 获取属性
|
|
||||||
Properties properties = dto.getProperties();
|
|
||||||
if (ValidateUtils.isNull(properties)) {
|
|
||||||
properties = new Properties();
|
|
||||||
}
|
|
||||||
properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime()));
|
|
||||||
|
|
||||||
// 操作修改
|
|
||||||
String operator = SpringTool.getUserName();
|
|
||||||
ResultStatus rs = TopicCommands.modifyTopicConfig(clusterDO, dto.getTopicName(), properties);
|
|
||||||
if (!ResultStatus.SUCCESS.equals(rs)) {
|
|
||||||
return Result.buildFrom(rs);
|
|
||||||
}
|
|
||||||
topicManagerService.modifyTopicByOp(dto.getClusterId(), dto.getTopicName(), dto.getAppId(), dto.getDescription(), operator);
|
|
||||||
return new Result();
|
|
||||||
}
|
|
||||||
|
|
||||||
@ApiOperation(value = "优先副本选举状态")
|
|
||||||
@RequestMapping(value = "utils/rebalance-status", method = RequestMethod.GET)
|
|
||||||
@ResponseBody
|
|
||||||
public Result preferredReplicaElectStatus(@RequestParam("clusterId") Long clusterId) {
|
|
||||||
ClusterDO clusterDO = clusterService.getById(clusterId);
|
|
||||||
if (ValidateUtils.isNull(clusterDO)) {
|
|
||||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
|
||||||
}
|
|
||||||
|
|
||||||
TaskStatusEnum statusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
|
|
||||||
return new Result<>(JsonUtils.toJson(statusEnum));
|
|
||||||
}
|
|
||||||
|
|
||||||
@ApiOperation(value = "优先副本选举")
|
|
||||||
@RequestMapping(value = "utils/rebalance", method = RequestMethod.POST)
|
|
||||||
@ResponseBody
|
|
||||||
public Result preferredReplicaElect(@RequestBody RebalanceDTO reqObj) {
|
|
||||||
if (!reqObj.paramLegal()) {
|
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
|
||||||
}
|
|
||||||
ClusterDO clusterDO = clusterService.getById(reqObj.getClusterId());
|
|
||||||
if (ValidateUtils.isNull(clusterDO)) {
|
|
||||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
|
||||||
}
|
|
||||||
|
|
||||||
ResultStatus rs = null;
|
|
||||||
if (RebalanceDimensionEnum.CLUSTER.getCode().equals(reqObj.getDimension())) {
|
|
||||||
// 按照Cluster纬度均衡
|
|
||||||
rs = adminService.preferredReplicaElection(clusterDO, SpringTool.getUserName());
|
|
||||||
} else if (RebalanceDimensionEnum.BROKER.getCode().equals(reqObj.getDimension())) {
|
|
||||||
// 按照Broker纬度均衡
|
|
||||||
rs = adminService.preferredReplicaElection(clusterDO, reqObj.getBrokerId(), SpringTool.getUserName());
|
|
||||||
} else if (RebalanceDimensionEnum.TOPIC.getCode().equals(reqObj.getDimension())) {
|
|
||||||
// 按照Topic纬度均衡
|
|
||||||
rs = adminService.preferredReplicaElection(clusterDO, reqObj.getTopicName(), SpringTool.getUserName());
|
|
||||||
} else if (RebalanceDimensionEnum.PARTITION.getCode().equals(reqObj.getDimension())) {
|
|
||||||
// 按照Partition纬度均衡
|
|
||||||
rs = adminService.preferredReplicaElection(clusterDO, reqObj.getTopicName(), reqObj.getPartitionId(), SpringTool.getUserName());
|
|
||||||
} else {
|
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
|
||||||
}
|
|
||||||
return Result.buildFrom(rs);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Result<ClusterDO> checkParamAndGetClusterDO(ClusterTopicDTO dto) {
|
private Result<ClusterDO> checkParamAndGetClusterDO(ClusterTopicDTO dto) {
|
||||||
if (!dto.paramLegal()) {
|
if (!dto.paramLegal()) {
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||||
@@ -226,4 +181,3 @@ public class OpUtilsController {
|
|||||||
return new Result<>(clusterDO);
|
return new Result<>(clusterDO);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5,8 +5,6 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
|||||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO;
|
||||||
@@ -14,15 +12,12 @@ import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGro
|
|||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||||
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
|
|
||||||
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
|
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||||
import com.xiaojukeji.kafka.manager.service.service.*;
|
import com.xiaojukeji.kafka.manager.service.service.*;
|
||||||
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||||
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
|
|
||||||
import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter;
|
|
||||||
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
|
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
|
||||||
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
|
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
|
||||||
import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter;
|
import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter;
|
||||||
@@ -57,9 +52,6 @@ public class ThirdPartTopicController {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TopicManagerService topicManagerService;
|
private TopicManagerService topicManagerService;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private QuotaService quotaService;
|
|
||||||
|
|
||||||
@ApiOperation(value = "Topic元信息", notes = "LogX调用")
|
@ApiOperation(value = "Topic元信息", notes = "LogX调用")
|
||||||
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
|
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
@@ -141,26 +133,4 @@ public class ThirdPartTopicController {
|
|||||||
topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName))
|
topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation(value = "配额调整",notes = "配额调整")
|
|
||||||
@RequestMapping(value = "{topics/quota}",method = RequestMethod.POST)
|
|
||||||
@ResponseBody
|
|
||||||
public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) {
|
|
||||||
// 非空校验
|
|
||||||
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
|
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
|
||||||
}
|
|
||||||
return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@ApiOperation(value = "权限调整",notes = "权限调整")
|
|
||||||
@RequestMapping(value = "{topics/authority}",method = RequestMethod.POST)
|
|
||||||
@ResponseBody
|
|
||||||
public Result addAuthority(@RequestBody TopicAuthorityDTO dto) {
|
|
||||||
//非空校验
|
|
||||||
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
|
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
|
||||||
}
|
|
||||||
return Result.buildFrom(topicManagerService.addAuthority(AuthorityConverter.convert2AuthorityDO(dto)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user