diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java index dd95a3ba..bff415a5 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java @@ -11,10 +11,10 @@ public class TopicQuotaDTO extends ClusterTopicDTO { @ApiModelProperty(value = "appId") private String appId; - @ApiModelProperty(value = "发送数据速率") + @ApiModelProperty(value = "发送数据速率B/s") private Long produceQuota; - @ApiModelProperty(value = "消费数据速率") + @ApiModelProperty(value = "消费数据速率B/s") private Long consumeQuota; public String getAppId() { @@ -42,9 +42,9 @@ public class TopicQuotaDTO extends ClusterTopicDTO { } public boolean paramLegal() { - if (ValidateUtils.isNull(clusterId) - || ValidateUtils.isNull(topicName) - || ValidateUtils.isNull(appId)) { + if (ValidateUtils.isNullOrLessThanZero(clusterId) + || ValidateUtils.isBlank(topicName) + || ValidateUtils.isBlank(appId)) { return false; } return true; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java index 5e7e7a75..79524204 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java @@ -3,7 +3,6 @@ package com.xiaojukeji.kafka.manager.service.service; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic; -import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicDTO; @@ -125,13 +124,6 @@ public interface TopicManagerService { TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName); - /** - * topic配额调整 - * @param topicQuota topic配额 - * @return - */ - ResultStatus addTopicQuota(TopicQuota topicQuota); - /** * topic权限调整 * @param authorityDO topic权限 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java index 6a78e4f4..225c67b0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; /** @@ -34,4 +35,11 @@ public interface QuotaService { TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId); Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota); + + /** + * topic配额调整 + * @param topicQuota topic配额 + * @return + */ + ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java index 2ce5facf..8baf61cc 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java @@ -1,11 +1,16 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; +import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy; @@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService { @Autowired private AbstractAllocateQuotaStrategy allocateQuotaStrategy; + @Autowired + private LogicalClusterMetadataManager logicalClusterMetadataManager; + + @Autowired + private AuthorityService authorityService; + @Override public int addTopicQuota(TopicQuota topicQuotaDO) { return KafkaZookeeperUtils.setTopicQuota( @@ -78,4 +89,34 @@ public class QuotaServiceImpl implements QuotaService { } return Boolean.TRUE; } + + @Override + public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) { + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return ResultStatus.CLUSTER_NOT_EXIST; + } + // 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理) + AuthorityDO authority = authorityService.getAuthority(physicalClusterId, + topicQuota.getTopicName(), topicQuota.getAppId()); + if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) { + return ResultStatus.USER_WITHOUT_AUTHORITY; + } + if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) { + // 可以消费 + topicQuota.setProduceQuota(null); + } + if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) { + // 可以生产 + topicQuota.setConsumeQuota(null); + } + // 设置物理集群id + topicQuota.setClusterId(physicalClusterId); + // 添加配额 + if (addTopicQuota(topicQuota) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.ZOOKEEPER_WRITE_FAILED; + } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index a5d459b4..4a8f501f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -10,7 +10,6 @@ import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic; -import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo; @@ -36,7 +35,6 @@ import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; -import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,9 +87,6 @@ public class TopicManagerServiceImpl implements TopicManagerService { @Autowired private OperateRecordService operateRecordService; - @Autowired - private QuotaService quotaService; - @Override public List listAll() { try { @@ -624,36 +619,6 @@ public class TopicManagerServiceImpl implements TopicManagerService { return topicBusinessInfo; } - @Override - public ResultStatus addTopicQuota(TopicQuota topicQuota) { - // 获取物理集群id - Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId()); - if (ValidateUtils.isNull(physicalClusterId)) { - return ResultStatus.CLUSTER_NOT_EXIST; - } - // 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理) - AuthorityDO authority = authorityService.getAuthority(physicalClusterId, - topicQuota.getTopicName(), topicQuota.getAppId()); - if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) { - return ResultStatus.USER_WITHOUT_AUTHORITY; - } - if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) { - // 可以消费 - topicQuota.setProduceQuota(null); - } - if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) { - // 可以生产 - topicQuota.setConsumeQuota(null); - } - // 设置物理集群id - topicQuota.setClusterId(physicalClusterId); - // 添加配额 - if (quotaService.addTopicQuota(topicQuota) > 0) { - return ResultStatus.SUCCESS; - } - return ResultStatus.MYSQL_ERROR; - } - @Override public ResultStatus addAuthority(AuthorityDO authorityDO) { // 查询该用户拥有的应用 diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java index b5394f6b..bb596989 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java @@ -32,10 +32,10 @@ public class TopicAuthorityDTO extends ClusterTopicDTO { @Override public boolean paramLegal() { - if (ValidateUtils.isNull(clusterId) - || ValidateUtils.isNull(topicName) - || ValidateUtils.isNull(appId) - || ValidateUtils.isNull(access)) { + if (ValidateUtils.isNullOrLessThanZero(clusterId) + || ValidateUtils.isBlank(topicName) + || ValidateUtils.isBlank(appId) + || ValidateUtils.isNullOrLessThanZero(access)) { return false; } return true; diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java index 93907371..0fa615b5 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -21,6 +21,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter; import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter; import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter; @@ -56,6 +57,9 @@ public class ThirdPartTopicController { @Autowired private TopicManagerService topicManagerService; + @Autowired + private QuotaService quotaService; + @ApiOperation(value = "Topic元信息", notes = "LogX调用") @RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET) @ResponseBody @@ -139,18 +143,18 @@ public class ThirdPartTopicController { } @ApiOperation(value = "配额调整",notes = "配额调整") - @RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST) + @RequestMapping(value = "{topics/quota}",method = RequestMethod.POST) @ResponseBody public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { // 非空校验 if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } - return Result.buildFrom(topicManagerService.addTopicQuota(TopicQuota.buildFrom(dto))); + return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto))); } @ApiOperation(value = "权限调整",notes = "权限调整") - @RequestMapping(value = "{topics/authority/add}",method = RequestMethod.POST) + @RequestMapping(value = "{topics/authority}",method = RequestMethod.POST) @ResponseBody public Result addAuthority(@RequestBody TopicAuthorityDTO dto) { //非空校验