diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java index 7b3bc979..6b734348 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.ao.gateway; +import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO; + /** * @author zhongyuankai * @date 2020/4/27 @@ -65,4 +67,15 @@ public class TopicQuota { ", consumeQuota=" + consumeQuota + '}'; } + + public static TopicQuota buildFrom(TopicQuotaDTO dto) { + TopicQuota topicQuota = new TopicQuota(); + topicQuota.setAppId(dto.getAppId()); + topicQuota.setClusterId(dto.getClusterId()); + topicQuota.setTopicName(dto.getTopicName()); + topicQuota.setProduceQuota(dto.getProduceQuota()); + topicQuota.setConsumeQuota(dto.getConsumeQuota()); + return topicQuota; + } + } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java new file mode 100644 index 00000000..5719cd28 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.gateway; + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(description = "配额调整") +public class TopicQuotaDTO extends ClusterTopicDTO { + @ApiModelProperty(value = "appId") + private String appId; + + @ApiModelProperty(value = "发送数据速率B/s") + private Long produceQuota; + + @ApiModelProperty(value = "消费数据速率B/s") + private Long consumeQuota; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Long getProduceQuota() { + return produceQuota; + } + + public void setProduceQuota(Long produceQuota) { + this.produceQuota = produceQuota; + } + + public Long getConsumeQuota() { + return consumeQuota; + } + + public void setConsumeQuota(Long consumeQuota) { + this.consumeQuota = consumeQuota; + } + + @Override + public boolean paramLegal() { + return !ValidateUtils.isNullOrLessThanZero(clusterId) && !ValidateUtils.isBlank(topicName) && !ValidateUtils.isBlank(appId); + } +} diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java index 8dc0e0c1..79524204 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import java.util.Date; import java.util.List; @@ -122,5 +123,12 @@ public interface TopicManagerService { List getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime); TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName); + + /** + * topic权限调整 + * @param authorityDO topic权限 + * @return + */ + ResultStatus addAuthority(AuthorityDO authorityDO); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index dacba4b0..9e4c244c 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -105,4 +105,5 @@ public interface TopicService { List getTopicBrokerList(Long clusterId, String topicName); Result checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime); + } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java index 6a78e4f4..225c67b0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; /** @@ -34,4 +35,11 @@ public interface QuotaService { TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId); Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota); + + /** + * topic配额调整 + * @param topicQuota topic配额 + * @return + */ + ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java index 2ce5facf..8baf61cc 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java @@ -1,11 +1,16 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; +import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy; @@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService { @Autowired private AbstractAllocateQuotaStrategy allocateQuotaStrategy; + @Autowired + private LogicalClusterMetadataManager logicalClusterMetadataManager; + + @Autowired + private AuthorityService authorityService; + @Override public int addTopicQuota(TopicQuota topicQuotaDO) { return KafkaZookeeperUtils.setTopicQuota( @@ -78,4 +89,34 @@ public class QuotaServiceImpl implements QuotaService { } return Boolean.TRUE; } + + @Override + public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) { + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return ResultStatus.CLUSTER_NOT_EXIST; + } + // 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理) + AuthorityDO authority = authorityService.getAuthority(physicalClusterId, + topicQuota.getTopicName(), topicQuota.getAppId()); + if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) { + return ResultStatus.USER_WITHOUT_AUTHORITY; + } + if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) { + // 可以消费 + topicQuota.setProduceQuota(null); + } + if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) { + // 可以生产 + topicQuota.setConsumeQuota(null); + } + // 设置物理集群id + topicQuota.setClusterId(physicalClusterId); + // 添加配额 + if (addTopicQuota(topicQuota) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.ZOOKEEPER_WRITE_FAILED; + } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index bce5fbe7..4a8f501f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -20,6 +20,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.DateUtils; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; +import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; @@ -618,6 +619,38 @@ public class TopicManagerServiceImpl implements TopicManagerService { return topicBusinessInfo; } + @Override + public ResultStatus addAuthority(AuthorityDO authorityDO) { + // 查询该用户拥有的应用 + List appDOs = appService.getByPrincipal(SpringTool.getUserName()); + if (ValidateUtils.isEmptyList(appDOs)) { + // 该用户无应用,需要先申请应用 + return ResultStatus.APP_NOT_EXIST; + } + List appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList()); + if (!appIds.contains(authorityDO.getAppId())) { + // 入参中的appId,该用户未拥有 + return ResultStatus.APP_NOT_EXIST; + } + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(authorityDO.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + // 集群不存在 + return ResultStatus.CLUSTER_NOT_EXIST; + } + TopicDO topic = getByTopicName(physicalClusterId, authorityDO.getTopicName()); + if (ValidateUtils.isNull(topic)) { + // topic不存在 + return ResultStatus.TOPIC_NOT_EXIST; + } + // 设置物理集群id + authorityDO.setClusterId(physicalClusterId); + if (authorityService.addAuthority(authorityDO) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.MYSQL_ERROR; + } + private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO, String topicName, TopicDO topicDO, diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java new file mode 100644 index 00000000..30ceecb2 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java @@ -0,0 +1,39 @@ +package com.xiaojukeji.kafka.manager.openapi.common.dto; + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(description = "权限调整") +public class TopicAuthorityDTO extends ClusterTopicDTO { + @ApiModelProperty(value = "appId") + private String appId; + + @ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写, 4:可管理") + private Integer access; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Integer getAccess() { + return access; + } + + public void setAccess(Integer access) { + this.access = access; + } + + @Override + public boolean paramLegal() { + return !ValidateUtils.isNullOrLessThanZero(clusterId) + && !ValidateUtils.isBlank(topicName) + && !ValidateUtils.isBlank(appId) + && !ValidateUtils.isNullOrLessThanZero(access); + } +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 7b5d97c3..aaac290f 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -337,7 +337,7 @@ public class NormalTopicController { } return new Result<>(TopicModelConverter.convert2TopicMineAppVOList( - topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName())) + topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName())) ); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpAuthorityController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpAuthorityController.java new file mode 100644 index 00000000..bd415527 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpAuthorityController.java @@ -0,0 +1,35 @@ +package com.xiaojukeji.kafka.manager.web.api.versionone.op; + +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO; +import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; +import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +/** + * Authority操作相关接口 + * @author zengqiao + * @date 21/5/18 + */ +@Api(tags = "OP-Authority操作相关接口(REST)") +@RestController +public class OpAuthorityController { + @Autowired + private TopicManagerService topicManagerService; + + @ApiOperation(value = "权限调整",notes = "权限调整") + @PostMapping(value = "topic-authorities") + @ResponseBody + public Result addAuthority(@RequestBody TopicAuthorityDTO dto) { + //非空校验 + if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return Result.buildFrom(topicManagerService.addAuthority(AuthorityConverter.convert2AuthorityDO(dto))); + } +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartOpUtilController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpLeaderController.java similarity index 69% rename from kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartOpUtilController.java rename to kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpLeaderController.java index b0d8ffd6..6b344831 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartOpUtilController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpLeaderController.java @@ -1,11 +1,13 @@ -package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart; +package com.xiaojukeji.kafka.manager.web.api.versionone.op; import com.xiaojukeji.kafka.manager.common.bizenum.RebalanceDimensionEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.dto.op.RebalanceDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.service.AdminService; @@ -16,22 +18,35 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** + * Leader操作[选举|切换]相关接口 * @author zengqiao - * @date 20/9/23 + * @date 21/5/18 */ -@Api(tags = "开放接口-OP相关接口(REST)") +@Api(tags = "OP-Leader操作相关接口(REST)") @RestController -@RequestMapping(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX) -public class ThirdPartOpUtilController { - +@RequestMapping(ApiPrefix.API_V1_OP_PREFIX) +public class OpLeaderController { @Autowired private AdminService adminService; @Autowired private ClusterService clusterService; + @ApiOperation(value = "优先副本选举状态") + @RequestMapping(value = {"leaders/preferred-replica-election-status", "utils/rebalance-status"}, method = RequestMethod.GET) + @ResponseBody + public Result preferredReplicaElectStatus(@RequestParam("clusterId") Long clusterId) { + ClusterDO clusterDO = clusterService.getById(clusterId); + if (ValidateUtils.isNull(clusterDO)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + + TaskStatusEnum statusEnum = adminService.preferredReplicaElectionStatus(clusterDO); + return new Result<>(JsonUtils.toJson(statusEnum)); + } + @ApiOperation(value = "优先副本选举") - @RequestMapping(value = "op/rebalance", method = RequestMethod.POST) + @RequestMapping(value = {"leaders/preferred-replica-election", "utils/rebalance"}, method = RequestMethod.POST) @ResponseBody public Result preferredReplicaElect(@RequestBody RebalanceDTO reqObj) { if (!reqObj.paramLegal()) { diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpQuotaController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpQuotaController.java new file mode 100644 index 00000000..7d9c70d7 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpQuotaController.java @@ -0,0 +1,37 @@ +package com.xiaojukeji.kafka.manager.web.api.versionone.op; + +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; +import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +/** + * Quota操作相关接口 + * @author zengqiao + * @date 21/5/18 + */ +@Api(tags = "OP-Quota操作相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V1_OP_PREFIX) +public class OpQuotaController { + @Autowired + private QuotaService quotaService; + + @ApiOperation(value = "配额调整",notes = "配额调整") + @RequestMapping(value = "topic-quotas",method = RequestMethod.POST) + @ResponseBody + public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { + if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { + // 非空校验 + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto))); + } +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpUtilsController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpTopicController.java similarity index 69% rename from kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpUtilsController.java rename to kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpTopicController.java index 6d9e7a74..bf7a1340 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpUtilsController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpTopicController.java @@ -1,24 +1,23 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.op; -import com.xiaojukeji.kafka.manager.common.bizenum.RebalanceDimensionEnum; -import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult; import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.op.RebalanceDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.*; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicCreationDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicDeletionDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicExpansionDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicModificationDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.service.AdminService; import com.xiaojukeji.kafka.manager.service.service.ClusterService; -import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; -import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; -import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; -import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult; import com.xiaojukeji.kafka.manager.service.utils.TopicCommands; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -30,25 +29,25 @@ import java.util.List; import java.util.Properties; /** - * 运维工具类 + * Topic操作相关接口 * @author zengqiao - * @date 20/4/2 + * @date 21/5/18 */ -@Api(tags = "OP-Utils相关接口(REST)") +@Api(tags = "OP-Topic操作相关接口(REST)") @RestController @RequestMapping(ApiPrefix.API_V1_OP_PREFIX) -public class OpUtilsController { - @Autowired - private ClusterService clusterService; - +public class OpTopicController { @Autowired private AdminService adminService; + @Autowired + private ClusterService clusterService; + @Autowired private TopicManagerService topicManagerService; @ApiOperation(value = "创建Topic") - @RequestMapping(value = {"utils/topics"}, method = RequestMethod.POST) + @RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.POST) @ResponseBody public Result createCommonTopic(@RequestBody TopicCreationDTO dto) { Result rc = checkParamAndGetClusterDO(dto); @@ -76,8 +75,66 @@ public class OpUtilsController { return Result.buildFrom(rs); } + @ApiOperation(value = "Topic删除", notes = "单次不允许超过10个Topic") + @RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.DELETE) + @ResponseBody + public Result> deleteTopics(@RequestBody List dtoList) { + if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + String operator = SpringTool.getUserName(); + + List resultList = new ArrayList<>(); + for (TopicDeletionDTO dto: dtoList) { + Result rc = checkParamAndGetClusterDO(dto); + if (rc.getCode() != ResultStatus.SUCCESS.getCode()) { + resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), rc)); + continue; + } + + // 参数检查合法, 开始删除Topic + ResultStatus statusEnum = adminService.deleteTopic(rc.getData(), dto.getTopicName(), operator); + resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), statusEnum)); + } + + for (TopicOperationResult operationResult: resultList) { + if (!Constant.SUCCESS.equals(operationResult.getCode())) { + return Result.buildFrom(ResultStatus.OPERATION_FAILED, resultList); + } + } + return new Result<>(resultList); + } + + @ApiOperation(value = "修改Topic", notes = "") + @RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.PUT) + @ResponseBody + public Result modifyTopic(@RequestBody TopicModificationDTO dto) { + Result rc = checkParamAndGetClusterDO(dto); + if (rc.getCode() != ResultStatus.SUCCESS.getCode()) { + return rc; + } + + ClusterDO clusterDO = rc.getData(); + + // 获取属性 + Properties properties = dto.getProperties(); + if (ValidateUtils.isNull(properties)) { + properties = new Properties(); + } + properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime())); + + // 操作修改 + String operator = SpringTool.getUserName(); + ResultStatus rs = TopicCommands.modifyTopicConfig(clusterDO, dto.getTopicName(), properties); + if (!ResultStatus.SUCCESS.equals(rs)) { + return Result.buildFrom(rs); + } + topicManagerService.modifyTopicByOp(dto.getClusterId(), dto.getTopicName(), dto.getAppId(), dto.getDescription(), operator); + return new Result(); + } + @ApiOperation(value = "Topic扩分区", notes = "") - @RequestMapping(value = {"utils/expand-partitions"}, method = RequestMethod.PUT) + @RequestMapping(value = {"topics/expand-partitions", "utils/expand-partitions"}, method = RequestMethod.PUT) @ResponseBody public Result> expandTopics(@RequestBody List dtoList) { if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) { @@ -112,108 +169,6 @@ public class OpUtilsController { return new Result<>(resultList); } - @ApiOperation(value = "Topic删除", notes = "单次不允许超过10个Topic") - @RequestMapping(value = {"utils/topics"}, method = RequestMethod.DELETE) - @ResponseBody - public Result> deleteTopics(@RequestBody List dtoList) { - if (ValidateUtils.isNull(dtoList) || dtoList.size() > Constant.MAX_TOPIC_OPERATION_SIZE_PER_REQUEST) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - String operator = SpringTool.getUserName(); - - List resultList = new ArrayList<>(); - for (TopicDeletionDTO dto: dtoList) { - Result rc = checkParamAndGetClusterDO(dto); - if (rc.getCode() != ResultStatus.SUCCESS.getCode()) { - resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), rc)); - continue; - } - - // 参数检查合法, 开始删除Topic - ResultStatus statusEnum = adminService.deleteTopic(rc.getData(), dto.getTopicName(), operator); - resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(), dto.getTopicName(), statusEnum)); - } - - for (TopicOperationResult operationResult: resultList) { - if (!Constant.SUCCESS.equals(operationResult.getCode())) { - return Result.buildFrom(ResultStatus.OPERATION_FAILED, resultList); - } - } - return new Result<>(resultList); - } - - @ApiOperation(value = "修改Topic", notes = "") - @RequestMapping(value = {"utils/topics"}, method = RequestMethod.PUT) - @ResponseBody - public Result modifyTopic(@RequestBody TopicModificationDTO dto) { - Result rc = checkParamAndGetClusterDO(dto); - if (rc.getCode() != ResultStatus.SUCCESS.getCode()) { - return rc; - } - - ClusterDO clusterDO = rc.getData(); - - // 获取属性 - Properties properties = dto.getProperties(); - if (ValidateUtils.isNull(properties)) { - properties = new Properties(); - } - properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime())); - - // 操作修改 - String operator = SpringTool.getUserName(); - ResultStatus rs = TopicCommands.modifyTopicConfig(clusterDO, dto.getTopicName(), properties); - if (!ResultStatus.SUCCESS.equals(rs)) { - return Result.buildFrom(rs); - } - topicManagerService.modifyTopicByOp(dto.getClusterId(), dto.getTopicName(), dto.getAppId(), dto.getDescription(), operator); - return new Result(); - } - - @ApiOperation(value = "优先副本选举状态") - @RequestMapping(value = "utils/rebalance-status", method = RequestMethod.GET) - @ResponseBody - public Result preferredReplicaElectStatus(@RequestParam("clusterId") Long clusterId) { - ClusterDO clusterDO = clusterService.getById(clusterId); - if (ValidateUtils.isNull(clusterDO)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - - TaskStatusEnum statusEnum = adminService.preferredReplicaElectionStatus(clusterDO); - return new Result<>(JsonUtils.toJson(statusEnum)); - } - - @ApiOperation(value = "优先副本选举") - @RequestMapping(value = "utils/rebalance", method = RequestMethod.POST) - @ResponseBody - public Result preferredReplicaElect(@RequestBody RebalanceDTO reqObj) { - if (!reqObj.paramLegal()) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - ClusterDO clusterDO = clusterService.getById(reqObj.getClusterId()); - if (ValidateUtils.isNull(clusterDO)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - - ResultStatus rs = null; - if (RebalanceDimensionEnum.CLUSTER.getCode().equals(reqObj.getDimension())) { - // 按照Cluster纬度均衡 - rs = adminService.preferredReplicaElection(clusterDO, SpringTool.getUserName()); - } else if (RebalanceDimensionEnum.BROKER.getCode().equals(reqObj.getDimension())) { - // 按照Broker纬度均衡 - rs = adminService.preferredReplicaElection(clusterDO, reqObj.getBrokerId(), SpringTool.getUserName()); - } else if (RebalanceDimensionEnum.TOPIC.getCode().equals(reqObj.getDimension())) { - // 按照Topic纬度均衡 - rs = adminService.preferredReplicaElection(clusterDO, reqObj.getTopicName(), SpringTool.getUserName()); - } else if (RebalanceDimensionEnum.PARTITION.getCode().equals(reqObj.getDimension())) { - // 按照Partition纬度均衡 - rs = adminService.preferredReplicaElection(clusterDO, reqObj.getTopicName(), reqObj.getPartitionId(), SpringTool.getUserName()); - } else { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - return Result.buildFrom(rs); - } - private Result checkParamAndGetClusterDO(ClusterTopicDTO dto) { if (!dto.paramLegal()) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); @@ -226,4 +181,3 @@ public class OpUtilsController { return new Result<>(clusterDO); } } - diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java index b247cdb8..a84c01af 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -133,4 +133,4 @@ public class ThirdPartTopicController { topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName)) ); } -} \ No newline at end of file +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/AuthorityConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/AuthorityConverter.java new file mode 100644 index 00000000..227a16a2 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/AuthorityConverter.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.kafka.manager.web.converters; + +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; +import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO; + +public class AuthorityConverter { + public static AuthorityDO convert2AuthorityDO(TopicAuthorityDTO dto) { + AuthorityDO authorityDO = new AuthorityDO(); + authorityDO.setAppId(dto.getAppId()); + authorityDO.setClusterId(dto.getClusterId()); + authorityDO.setTopicName(dto.getTopicName()); + authorityDO.setAccess(dto.getAccess()); + return authorityDO; + } +}