diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java index 7b3bc979..6b734348 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.ao.gateway; +import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO; + /** * @author zhongyuankai * @date 2020/4/27 @@ -65,4 +67,15 @@ public class TopicQuota { ", consumeQuota=" + consumeQuota + '}'; } + + public static TopicQuota buildFrom(TopicQuotaDTO dto) { + TopicQuota topicQuota = new TopicQuota(); + topicQuota.setAppId(dto.getAppId()); + topicQuota.setClusterId(dto.getClusterId()); + topicQuota.setTopicName(dto.getTopicName()); + topicQuota.setProduceQuota(dto.getProduceQuota()); + topicQuota.setConsumeQuota(dto.getConsumeQuota()); + return topicQuota; + } + } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java new file mode 100644 index 00000000..bff415a5 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java @@ -0,0 +1,52 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.gateway; + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(description = "配额调整") +public class TopicQuotaDTO extends ClusterTopicDTO { + + @ApiModelProperty(value = "appId") + private String appId; + + @ApiModelProperty(value = "发送数据速率B/s") + private Long produceQuota; + + @ApiModelProperty(value = "消费数据速率B/s") + private Long consumeQuota; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Long getProduceQuota() { + return produceQuota; + } + + public void setProduceQuota(Long produceQuota) { + this.produceQuota = produceQuota; + } + + public Long getConsumeQuota() { + return consumeQuota; + } + + public void setConsumeQuota(Long consumeQuota) { + this.consumeQuota = consumeQuota; + } + + public boolean paramLegal() { + if (ValidateUtils.isNullOrLessThanZero(clusterId) + || ValidateUtils.isBlank(topicName) + || ValidateUtils.isBlank(appId)) { + return false; + } + return true; + } +} diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java index 8dc0e0c1..79524204 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import java.util.Date; import java.util.List; @@ -122,5 +123,12 @@ public interface TopicManagerService { List getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime); TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName); + + /** + * topic权限调整 + * @param authorityDO topic权限 + * @return + */ + ResultStatus addAuthority(AuthorityDO authorityDO); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index dacba4b0..9e4c244c 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -105,4 +105,5 @@ public interface TopicService { List getTopicBrokerList(Long clusterId, String topicName); Result checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime); + } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java index 6a78e4f4..225c67b0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; /** @@ -34,4 +35,11 @@ public interface QuotaService { TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId); Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota); + + /** + * topic配额调整 + * @param topicQuota topic配额 + * @return + */ + ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java index 2ce5facf..8baf61cc 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java @@ -1,11 +1,16 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; +import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy; @@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService { @Autowired private AbstractAllocateQuotaStrategy allocateQuotaStrategy; + @Autowired + private LogicalClusterMetadataManager logicalClusterMetadataManager; + + @Autowired + private AuthorityService authorityService; + @Override public int addTopicQuota(TopicQuota topicQuotaDO) { return KafkaZookeeperUtils.setTopicQuota( @@ -78,4 +89,34 @@ public class QuotaServiceImpl implements QuotaService { } return Boolean.TRUE; } + + @Override + public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) { + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return ResultStatus.CLUSTER_NOT_EXIST; + } + // 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理) + AuthorityDO authority = authorityService.getAuthority(physicalClusterId, + topicQuota.getTopicName(), topicQuota.getAppId()); + if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) { + return ResultStatus.USER_WITHOUT_AUTHORITY; + } + if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) { + // 可以消费 + topicQuota.setProduceQuota(null); + } + if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) { + // 可以生产 + topicQuota.setConsumeQuota(null); + } + // 设置物理集群id + topicQuota.setClusterId(physicalClusterId); + // 添加配额 + if (addTopicQuota(topicQuota) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.ZOOKEEPER_WRITE_FAILED; + } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index bce5fbe7..4a8f501f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -20,6 +20,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.DateUtils; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; +import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; @@ -618,6 +619,38 @@ public class TopicManagerServiceImpl implements TopicManagerService { return topicBusinessInfo; } + @Override + public ResultStatus addAuthority(AuthorityDO authorityDO) { + // 查询该用户拥有的应用 + List appDOs = appService.getByPrincipal(SpringTool.getUserName()); + if (ValidateUtils.isEmptyList(appDOs)) { + // 该用户无应用,需要先申请应用 + return ResultStatus.APP_NOT_EXIST; + } + List appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList()); + if (!appIds.contains(authorityDO.getAppId())) { + // 入参中的appId,该用户未拥有 + return ResultStatus.APP_NOT_EXIST; + } + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(authorityDO.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + // 集群不存在 + return ResultStatus.CLUSTER_NOT_EXIST; + } + TopicDO topic = getByTopicName(physicalClusterId, authorityDO.getTopicName()); + if (ValidateUtils.isNull(topic)) { + // topic不存在 + return ResultStatus.TOPIC_NOT_EXIST; + } + // 设置物理集群id + authorityDO.setClusterId(physicalClusterId); + if (authorityService.addAuthority(authorityDO) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.MYSQL_ERROR; + } + private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO, String topicName, TopicDO topicDO, diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java new file mode 100644 index 00000000..bb596989 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/TopicAuthorityDTO.java @@ -0,0 +1,43 @@ +package com.xiaojukeji.kafka.manager.openapi.common.dto; + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(description = "权限调整") +public class TopicAuthorityDTO extends ClusterTopicDTO { + + @ApiModelProperty(value = "appId") + private String appId; + + @ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写, 4:可管理") + private Integer access; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Integer getAccess() { + return access; + } + + public void setAccess(Integer access) { + this.access = access; + } + + @Override + public boolean paramLegal() { + if (ValidateUtils.isNullOrLessThanZero(clusterId) + || ValidateUtils.isBlank(topicName) + || ValidateUtils.isBlank(appId) + || ValidateUtils.isNullOrLessThanZero(access)) { + return false; + } + return true; + } +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 7b5d97c3..aaac290f 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -337,7 +337,7 @@ public class NormalTopicController { } return new Result<>(TopicModelConverter.convert2TopicMineAppVOList( - topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName())) + topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName())) ); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java index b247cdb8..0fa615b5 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -5,6 +5,8 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; +import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO; @@ -12,12 +14,15 @@ import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGro import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; +import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO; import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; +import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter; import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter; import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter; import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter; @@ -52,6 +57,9 @@ public class ThirdPartTopicController { @Autowired private TopicManagerService topicManagerService; + @Autowired + private QuotaService quotaService; + @ApiOperation(value = "Topic元信息", notes = "LogX调用") @RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET) @ResponseBody @@ -133,4 +141,26 @@ public class ThirdPartTopicController { topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName)) ); } -} \ No newline at end of file + + @ApiOperation(value = "配额调整",notes = "配额调整") + @RequestMapping(value = "{topics/quota}",method = RequestMethod.POST) + @ResponseBody + public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { + // 非空校验 + if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto))); + } + + @ApiOperation(value = "权限调整",notes = "权限调整") + @RequestMapping(value = "{topics/authority}",method = RequestMethod.POST) + @ResponseBody + public Result addAuthority(@RequestBody TopicAuthorityDTO dto) { + //非空校验 + if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return Result.buildFrom(topicManagerService.addAuthority(AuthorityConverter.convert2AuthorityDO(dto))); + } +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/AuthorityConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/AuthorityConverter.java new file mode 100644 index 00000000..227a16a2 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/AuthorityConverter.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.kafka.manager.web.converters; + +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; +import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO; + +public class AuthorityConverter { + public static AuthorityDO convert2AuthorityDO(TopicAuthorityDTO dto) { + AuthorityDO authorityDO = new AuthorityDO(); + authorityDO.setAppId(dto.getAppId()); + authorityDO.setClusterId(dto.getClusterId()); + authorityDO.setTopicName(dto.getTopicName()); + authorityDO.setAccess(dto.getAccess()); + return authorityDO; + } +}