调整配额与权限

This commit is contained in:
tangcongfa_v@didichuxing.com
2021-05-18 18:25:03 +08:00
parent 38bdc173e8
commit 611f8b8865
7 changed files with 65 additions and 55 deletions

View File

@@ -11,10 +11,10 @@ public class TopicQuotaDTO extends ClusterTopicDTO {
@ApiModelProperty(value = "appId") @ApiModelProperty(value = "appId")
private String appId; private String appId;
@ApiModelProperty(value = "发送数据速率") @ApiModelProperty(value = "发送数据速率B/s")
private Long produceQuota; private Long produceQuota;
@ApiModelProperty(value = "消费数据速率") @ApiModelProperty(value = "消费数据速率B/s")
private Long consumeQuota; private Long consumeQuota;
public String getAppId() { public String getAppId() {
@@ -42,9 +42,9 @@ public class TopicQuotaDTO extends ClusterTopicDTO {
} }
public boolean paramLegal() { public boolean paramLegal() {
if (ValidateUtils.isNull(clusterId) if (ValidateUtils.isNullOrLessThanZero(clusterId)
|| ValidateUtils.isNull(topicName) || ValidateUtils.isBlank(topicName)
|| ValidateUtils.isNull(appId)) { || ValidateUtils.isBlank(appId)) {
return false; return false;
} }
return true; return true;

View File

@@ -3,7 +3,6 @@ package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic; import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicDTO;
@@ -125,13 +124,6 @@ public interface TopicManagerService {
TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName); TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName);
/**
* topic配额调整
* @param topicQuota topic配额
* @return
*/
ResultStatus addTopicQuota(TopicQuota topicQuota);
/** /**
* topic权限调整 * topic权限调整
* @param authorityDO topic权限 * @param authorityDO topic权限

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.kafka.manager.service.service.gateway; package com.xiaojukeji.kafka.manager.service.service.gateway;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
/** /**
@@ -34,4 +35,11 @@ public interface QuotaService {
TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId); TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId);
Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota); Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota);
/**
* topic配额调整
* @param topicQuota topic配额
* @return
*/
ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota);
} }

View File

@@ -1,11 +1,16 @@
package com.xiaojukeji.kafka.manager.service.service.gateway.impl; package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy; import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy;
@@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService {
@Autowired @Autowired
private AbstractAllocateQuotaStrategy allocateQuotaStrategy; private AbstractAllocateQuotaStrategy allocateQuotaStrategy;
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@Autowired
private AuthorityService authorityService;
@Override @Override
public int addTopicQuota(TopicQuota topicQuotaDO) { public int addTopicQuota(TopicQuota topicQuotaDO) {
return KafkaZookeeperUtils.setTopicQuota( return KafkaZookeeperUtils.setTopicQuota(
@@ -78,4 +89,34 @@ public class QuotaServiceImpl implements QuotaService {
} }
return Boolean.TRUE; return Boolean.TRUE;
} }
@Override
public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) {
// 获取物理集群id
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId());
if (ValidateUtils.isNull(physicalClusterId)) {
return ResultStatus.CLUSTER_NOT_EXIST;
}
// 权限判断(access 0:无权限, 1:读, 2:写, 3:读写4:可管理)
AuthorityDO authority = authorityService.getAuthority(physicalClusterId,
topicQuota.getTopicName(), topicQuota.getAppId());
if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) {
return ResultStatus.USER_WITHOUT_AUTHORITY;
}
if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) {
// 可以消费
topicQuota.setProduceQuota(null);
}
if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) {
// 可以生产
topicQuota.setConsumeQuota(null);
}
// 设置物理集群id
topicQuota.setClusterId(physicalClusterId);
// 添加配额
if (addTopicQuota(topicQuota) > 0) {
return ResultStatus.SUCCESS;
}
return ResultStatus.ZOOKEEPER_WRITE_FAILED;
}
} }

View File

@@ -10,7 +10,6 @@ import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic; import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo;
@@ -36,7 +35,6 @@ import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager
import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils; import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -89,9 +87,6 @@ public class TopicManagerServiceImpl implements TopicManagerService {
@Autowired @Autowired
private OperateRecordService operateRecordService; private OperateRecordService operateRecordService;
@Autowired
private QuotaService quotaService;
@Override @Override
public List<TopicDO> listAll() { public List<TopicDO> listAll() {
try { try {
@@ -624,36 +619,6 @@ public class TopicManagerServiceImpl implements TopicManagerService {
return topicBusinessInfo; return topicBusinessInfo;
} }
@Override
public ResultStatus addTopicQuota(TopicQuota topicQuota) {
// 获取物理集群id
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId());
if (ValidateUtils.isNull(physicalClusterId)) {
return ResultStatus.CLUSTER_NOT_EXIST;
}
// 权限判断(access 0:无权限, 1:读, 2:写, 3:读写4:可管理)
AuthorityDO authority = authorityService.getAuthority(physicalClusterId,
topicQuota.getTopicName(), topicQuota.getAppId());
if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) {
return ResultStatus.USER_WITHOUT_AUTHORITY;
}
if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) {
// 可以消费
topicQuota.setProduceQuota(null);
}
if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) {
// 可以生产
topicQuota.setConsumeQuota(null);
}
// 设置物理集群id
topicQuota.setClusterId(physicalClusterId);
// 添加配额
if (quotaService.addTopicQuota(topicQuota) > 0) {
return ResultStatus.SUCCESS;
}
return ResultStatus.MYSQL_ERROR;
}
@Override @Override
public ResultStatus addAuthority(AuthorityDO authorityDO) { public ResultStatus addAuthority(AuthorityDO authorityDO) {
// 查询该用户拥有的应用 // 查询该用户拥有的应用

View File

@@ -32,10 +32,10 @@ public class TopicAuthorityDTO extends ClusterTopicDTO {
@Override @Override
public boolean paramLegal() { public boolean paramLegal() {
if (ValidateUtils.isNull(clusterId) if (ValidateUtils.isNullOrLessThanZero(clusterId)
|| ValidateUtils.isNull(topicName) || ValidateUtils.isBlank(topicName)
|| ValidateUtils.isNull(appId) || ValidateUtils.isBlank(appId)
|| ValidateUtils.isNull(access)) { || ValidateUtils.isNullOrLessThanZero(access)) {
return false; return false;
} }
return true; return true;

View File

@@ -21,6 +21,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter; import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter;
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter; import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter; import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
@@ -56,6 +57,9 @@ public class ThirdPartTopicController {
@Autowired @Autowired
private TopicManagerService topicManagerService; private TopicManagerService topicManagerService;
@Autowired
private QuotaService quotaService;
@ApiOperation(value = "Topic元信息", notes = "LogX调用") @ApiOperation(value = "Topic元信息", notes = "LogX调用")
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET) @RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
@ResponseBody @ResponseBody
@@ -139,18 +143,18 @@ public class ThirdPartTopicController {
} }
@ApiOperation(value = "配额调整",notes = "配额调整") @ApiOperation(value = "配额调整",notes = "配额调整")
@RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST) @RequestMapping(value = "{topics/quota}",method = RequestMethod.POST)
@ResponseBody @ResponseBody
public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) {
// 非空校验 // 非空校验
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
} }
return Result.buildFrom(topicManagerService.addTopicQuota(TopicQuota.buildFrom(dto))); return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto)));
} }
@ApiOperation(value = "权限调整",notes = "权限调整") @ApiOperation(value = "权限调整",notes = "权限调整")
@RequestMapping(value = "{topics/authority/add}",method = RequestMethod.POST) @RequestMapping(value = "{topics/authority}",method = RequestMethod.POST)
@ResponseBody @ResponseBody
public Result addAuthority(@RequestBody TopicAuthorityDTO dto) { public Result addAuthority(@RequestBody TopicAuthorityDTO dto) {
//非空校验 //非空校验