From eb8fe7758235e09da72cf2d7fadf2c597d43c6b9 Mon Sep 17 00:00:00 2001 From: "tangcongfa_v@didichuxing.com" Date: Wed, 12 May 2021 11:31:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E9=A2=9D=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/dto/normal/TopicQuotaDTO.java | 10 ++++ .../manager/service/service/TopicService.java | 6 -- .../service/impl/TopicServiceImpl.java | 35 ----------- .../normal/NormalTopicController.java | 11 ---- .../thirdpart/ThirdPartTopicController.java | 58 ++++++++++++++++++- 5 files changed, 67 insertions(+), 53 deletions(-) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java index aeb86814..99399624 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java @@ -1,6 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.dto.normal; import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -39,4 +40,13 @@ public class TopicQuotaDTO extends ClusterTopicDTO { public void setConsumeQuota(Long consumeQuota) { this.consumeQuota = consumeQuota; } + + public boolean paramLegal() { + if (ValidateUtils.isNull(clusterId) + || ValidateUtils.isNull(topicName) + || ValidateUtils.isNull(appId)) { + return false; + } + return true; + } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index cacfb9f0..9e4c244c 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -6,7 +6,6 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ao.*; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBrokerDTO; @@ -107,9 +106,4 @@ public interface TopicService { Result checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime); - /** - * 配额调整 - */ - Result addTopicQuota(TopicQuotaDTO dto); - } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index f6140cc9..63191888 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -3,8 +3,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; -import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; import com.xiaojukeji.kafka.manager.common.constant.Constant; @@ -25,14 +23,12 @@ import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao; import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao; import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao; import com.xiaojukeji.kafka.manager.common.entity.pojo.*; -import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao; import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool; import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache; import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; -import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy; import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils; import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils; @@ -91,15 +87,6 @@ public class TopicServiceImpl implements TopicService { @Autowired private AbstractHealthScoreStrategy healthScoreStrategy; - @Autowired - private AdminService adminService; - - @Autowired - private QuotaService quotaService; - - @Autowired - private AuthorityDao authorityDao; - @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { try { @@ -837,28 +824,6 @@ public class TopicServiceImpl implements TopicService { return new Result<>(TopicOffsetChangedEnum.UNKNOWN); } - @Override - public Result addTopicQuota(TopicQuotaDTO dto) { - //获取物理集群id - Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); - if (ValidateUtils.isNull(physicalClusterId)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - //构建配额 - TopicQuota topicQuota = new TopicQuota(); - topicQuota.setClusterId(physicalClusterId); - topicQuota.setAppId(dto.getAppId()); - topicQuota.setTopicName(dto.getTopicName()); - topicQuota.setProduceQuota(dto.getProduceQuota()); - topicQuota.setConsumeQuota(dto.getConsumeQuota()); - //配额调整 - int result = quotaService.addTopicQuota(topicQuota); - if (result > 0) { - return Result.buildFrom(ResultStatus.SUCCESS); - } - return Result.buildFrom(ResultStatus.MYSQL_ERROR); - } - private Result checkTopicOffsetChanged(ClusterDO clusterDO, String topicName, Map endOffsetMap) { diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 0b6dfde6..aaac290f 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -7,7 +7,6 @@ import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.TopicBusinessInfoVO; @@ -361,14 +360,4 @@ public class NormalTopicController { return new Result<>(new TopicStatisticMetricsVO(maxAvgBytesIn)); } - @ApiOperation(value = "配额调整",notes = "配额调整") - @RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST) - @ResponseBody - public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { - if (ValidateUtils.isNull(dto)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - return topicService.addTopicQuota(dto); - } - } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java index b247cdb8..ee39b39a 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -5,7 +5,10 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupVO; @@ -15,9 +18,12 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; +import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter; import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter; import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter; @@ -52,6 +58,15 @@ public class ThirdPartTopicController { @Autowired private TopicManagerService topicManagerService; + @Autowired + private AuthorityService authorityService; + + @Autowired + private QuotaService quotaService; + + @Autowired + private LogicalClusterMetadataManager logicalClusterMetadataManager; + @ApiOperation(value = "Topic元信息", notes = "LogX调用") @RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET) @ResponseBody @@ -133,4 +148,45 @@ public class ThirdPartTopicController { topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName)) ); } -} \ No newline at end of file + + @ApiOperation(value = "配额调整",notes = "配额调整") + @RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST) + @ResponseBody + public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { + //非空校验 + if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + //获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + //权限判断(access 0:无权限, 1:读, 2:写, 3:读写) + AuthorityDO authority = authorityService.getAuthority(physicalClusterId, dto.getTopicName(), dto.getAppId()); + if (ValidateUtils.isNull(authority) || authority.getAccess() == 0) { + return Result.buildFrom(ResultStatus.USER_WITHOUT_AUTHORITY); + } + if (authority.getAccess() == 1) { + //可以消费 + dto.setProduceQuota(null); + } + if (authority.getAccess() == 2) { + //可以生产 + dto.setConsumeQuota(null); + } + //构建topicquota + TopicQuota topicQuotaDO = new TopicQuota(); + topicQuotaDO.setAppId(dto.getAppId()); + topicQuotaDO.setClusterId(physicalClusterId); + topicQuotaDO.setTopicName(dto.getTopicName()); + topicQuotaDO.setConsumeQuota(dto.getConsumeQuota()); + topicQuotaDO.setProduceQuota(dto.getProduceQuota()); + //添加配额 + if (quotaService.addTopicQuota(topicQuotaDO) > 0) { + return Result.buildFrom(ResultStatus.SUCCESS); + } + return Result.buildFrom(ResultStatus.MYSQL_ERROR); + } + +}