mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
配额调整
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
package com.xiaojukeji.kafka.manager.common.entity.ao.gateway;
|
package com.xiaojukeji.kafka.manager.common.entity.ao.gateway;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author zhongyuankai
|
* @author zhongyuankai
|
||||||
* @date 2020/4/27
|
* @date 2020/4/27
|
||||||
@@ -65,4 +67,15 @@ public class TopicQuota {
|
|||||||
", consumeQuota=" + consumeQuota +
|
", consumeQuota=" + consumeQuota +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static TopicQuota buildFrom(TopicQuotaDTO dto) {
|
||||||
|
TopicQuota topicQuota = new TopicQuota();
|
||||||
|
topicQuota.setAppId(dto.getAppId());
|
||||||
|
topicQuota.setClusterId(dto.getClusterId());
|
||||||
|
topicQuota.setTopicName(dto.getTopicName());
|
||||||
|
topicQuota.setProduceQuota(dto.getProduceQuota());
|
||||||
|
topicQuota.setConsumeQuota(dto.getConsumeQuota());
|
||||||
|
return topicQuota;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.xiaojukeji.kafka.manager.common.entity.dto.normal;
|
package com.xiaojukeji.kafka.manager.common.entity.dto.gateway;
|
||||||
|
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
@@ -6,19 +6,14 @@ import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
|||||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
|
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO;
|
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
|
||||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||||
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
|
|
||||||
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
|
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||||
@@ -26,8 +21,6 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
|
|||||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||||
import com.xiaojukeji.kafka.manager.service.service.*;
|
import com.xiaojukeji.kafka.manager.service.service.*;
|
||||||
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||||
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
|
|
||||||
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
|
|
||||||
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
|
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
|
||||||
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
|
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
|
||||||
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
|
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
|
||||||
@@ -40,7 +33,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author zengqiao
|
* @author zengqiao
|
||||||
@@ -64,18 +56,12 @@ public class ThirdPartTopicController {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TopicManagerService topicManagerService;
|
private TopicManagerService topicManagerService;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private AuthorityService authorityService;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private QuotaService quotaService;
|
private QuotaService quotaService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private LogicalClusterMetadataManager logicalClusterMetadataManager;
|
private LogicalClusterMetadataManager logicalClusterMetadataManager;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private AppService appService;
|
|
||||||
|
|
||||||
@ApiOperation(value = "Topic元信息", notes = "LogX调用")
|
@ApiOperation(value = "Topic元信息", notes = "LogX调用")
|
||||||
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
|
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
@@ -171,78 +157,12 @@ public class ThirdPartTopicController {
|
|||||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||||
}
|
}
|
||||||
//权限判断(access 0:无权限, 1:读, 2:写, 3:读写)
|
dto.setClusterId(physicalClusterId);
|
||||||
AuthorityDO authority = authorityService.getAuthority(physicalClusterId, dto.getTopicName(), dto.getAppId());
|
// 添加配额
|
||||||
if (ValidateUtils.isNull(authority) || authority.getAccess() == 0) {
|
if (quotaService.addTopicQuota(TopicQuota.buildFrom(dto)) > 0) {
|
||||||
return Result.buildFrom(ResultStatus.USER_WITHOUT_AUTHORITY);
|
|
||||||
}
|
|
||||||
if (authority.getAccess() == 1) {
|
|
||||||
//可以消费
|
|
||||||
dto.setProduceQuota(null);
|
|
||||||
}
|
|
||||||
if (authority.getAccess() == 2) {
|
|
||||||
//可以生产
|
|
||||||
dto.setConsumeQuota(null);
|
|
||||||
}
|
|
||||||
//构建topicquota
|
|
||||||
TopicQuota topicQuotaDO = new TopicQuota();
|
|
||||||
topicQuotaDO.setAppId(dto.getAppId());
|
|
||||||
topicQuotaDO.setClusterId(physicalClusterId);
|
|
||||||
topicQuotaDO.setTopicName(dto.getTopicName());
|
|
||||||
topicQuotaDO.setConsumeQuota(dto.getConsumeQuota());
|
|
||||||
topicQuotaDO.setProduceQuota(dto.getProduceQuota());
|
|
||||||
//添加配额
|
|
||||||
if (quotaService.addTopicQuota(topicQuotaDO) > 0) {
|
|
||||||
return Result.buildFrom(ResultStatus.SUCCESS);
|
return Result.buildFrom(ResultStatus.SUCCESS);
|
||||||
}
|
}
|
||||||
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation(value = "权限调整",notes = "权限调整")
|
|
||||||
@RequestMapping(value = "{topics/authority/add}",method = RequestMethod.POST)
|
|
||||||
@ResponseBody
|
|
||||||
public Result addAuthority(@RequestBody TopicAuthorityDTO dto) {
|
|
||||||
//非空校验
|
|
||||||
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
|
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
|
||||||
}
|
|
||||||
//查询该用户拥有的应用
|
|
||||||
List<AppDO> appDOs = appService.getByPrincipal(SpringTool.getUserName());
|
|
||||||
if (ValidateUtils.isEmptyList(appDOs)) {
|
|
||||||
//该用户无应用,需要先申请应用
|
|
||||||
return Result.buildFrom(ResultStatus.APP_NOT_EXIST);
|
|
||||||
}
|
|
||||||
List<Long> appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList());
|
|
||||||
if (!appIds.contains(dto.getAccess())) {
|
|
||||||
//入参中的appId,该用户未拥有
|
|
||||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
|
||||||
}
|
|
||||||
//获取物理集群id
|
|
||||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
|
|
||||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
|
||||||
//集群不存在
|
|
||||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
|
||||||
}
|
|
||||||
//获取集群信息
|
|
||||||
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
|
|
||||||
if (ValidateUtils.isNull(clusterDO)) {
|
|
||||||
//集群不存在
|
|
||||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
|
||||||
}
|
|
||||||
TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName());
|
|
||||||
if (ValidateUtils.isNull(topic)) {
|
|
||||||
//topic不存在
|
|
||||||
return Result.buildFrom(ResultStatus.TOPIC_NOT_EXIST);
|
|
||||||
}
|
|
||||||
//构建authorityDo
|
|
||||||
AuthorityDO authorityDO = new AuthorityDO();
|
|
||||||
authorityDO.setClusterId(physicalClusterId);
|
|
||||||
authorityDO.setAppId(dto.getAppId());
|
|
||||||
authorityDO.setTopicName(dto.getTopicName());
|
|
||||||
authorityDO.setAccess(dto.getAccess());
|
|
||||||
if (authorityService.addAuthority(authorityDO) > 0) {
|
|
||||||
return Result.buildFrom(ResultStatus.SUCCESS);
|
|
||||||
}
|
|
||||||
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user