diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java index 8dc0e0c1..a19d8220 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java @@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.service.service; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic; +import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicDTO; @@ -122,5 +123,12 @@ public interface TopicManagerService { List getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime); TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName); + + /** + * topic配额调整 + * @param topicQuota topic配额 + * @return + */ + ResultStatus addTopicQuota(TopicQuota topicQuota); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index bce5fbe7..36f6f593 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic; +import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo; @@ -34,6 +35,7 @@ import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; +import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +88,9 @@ public class TopicManagerServiceImpl implements TopicManagerService { @Autowired private OperateRecordService operateRecordService; + @Autowired + private QuotaService quotaService; + @Override public List listAll() { try { @@ -618,6 +623,36 @@ public class TopicManagerServiceImpl implements TopicManagerService { return topicBusinessInfo; } + @Override + public ResultStatus addTopicQuota(TopicQuota topicQuota) { + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return ResultStatus.CLUSTER_NOT_EXIST; + } + // 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理) + AuthorityDO authority = authorityService.getAuthority(physicalClusterId, + topicQuota.getTopicName(), topicQuota.getAppId()); + if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) { + return ResultStatus.USER_WITHOUT_AUTHORITY; + } + if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) { + // 可以消费 + topicQuota.setProduceQuota(null); + } + if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) { + // 可以生产 + topicQuota.setConsumeQuota(null); + } + // 设置物理集群id + topicQuota.setClusterId(physicalClusterId); + // 添加配额 + if (quotaService.addTopicQuota(topicQuota) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.MYSQL_ERROR; + } + private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO, String topicName, TopicDO topicDO, diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java index a1940e4c..762ed3a9 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -17,11 +17,9 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; -import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; -import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter; import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter; import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter; @@ -56,12 +54,6 @@ public class ThirdPartTopicController { @Autowired private TopicManagerService topicManagerService; - @Autowired - private QuotaService quotaService; - - @Autowired - private LogicalClusterMetadataManager logicalClusterMetadataManager; - @ApiOperation(value = "Topic元信息", notes = "LogX调用") @RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET) @ResponseBody @@ -148,21 +140,11 @@ public class ThirdPartTopicController { @RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST) @ResponseBody public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { - //非空校验 + // 非空校验 if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } - //获取物理集群id - Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); - if (ValidateUtils.isNull(physicalClusterId)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - dto.setClusterId(physicalClusterId); - // 添加配额 - if (quotaService.addTopicQuota(TopicQuota.buildFrom(dto)) > 0) { - return Result.buildFrom(ResultStatus.SUCCESS); - } - return Result.buildFrom(ResultStatus.MYSQL_ERROR); + return Result.buildFrom(topicManagerService.addTopicQuota(TopicQuota.buildFrom(dto))); } }