mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
@@ -1,5 +1,7 @@
|
||||
package com.xiaojukeji.kafka.manager.common.entity.ao.gateway;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
|
||||
|
||||
/**
|
||||
* @author zhongyuankai
|
||||
* @date 2020/4/27
|
||||
@@ -65,4 +67,15 @@ public class TopicQuota {
|
||||
", consumeQuota=" + consumeQuota +
|
||||
'}';
|
||||
}
|
||||
|
||||
public static TopicQuota buildFrom(TopicQuotaDTO dto) {
|
||||
TopicQuota topicQuota = new TopicQuota();
|
||||
topicQuota.setAppId(dto.getAppId());
|
||||
topicQuota.setClusterId(dto.getClusterId());
|
||||
topicQuota.setTopicName(dto.getTopicName());
|
||||
topicQuota.setProduceQuota(dto.getProduceQuota());
|
||||
topicQuota.setConsumeQuota(dto.getConsumeQuota());
|
||||
return topicQuota;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.xiaojukeji.kafka.manager.common.entity.dto.gateway;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
@ApiModel(description = "配额调整")
|
||||
public class TopicQuotaDTO extends ClusterTopicDTO {
|
||||
|
||||
@ApiModelProperty(value = "appId")
|
||||
private String appId;
|
||||
|
||||
@ApiModelProperty(value = "发送数据速率B/s")
|
||||
private Long produceQuota;
|
||||
|
||||
@ApiModelProperty(value = "消费数据速率B/s")
|
||||
private Long consumeQuota;
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public Long getProduceQuota() {
|
||||
return produceQuota;
|
||||
}
|
||||
|
||||
public void setProduceQuota(Long produceQuota) {
|
||||
this.produceQuota = produceQuota;
|
||||
}
|
||||
|
||||
public Long getConsumeQuota() {
|
||||
return consumeQuota;
|
||||
}
|
||||
|
||||
public void setConsumeQuota(Long consumeQuota) {
|
||||
this.consumeQuota = consumeQuota;
|
||||
}
|
||||
|
||||
public boolean paramLegal() {
|
||||
if (ValidateUtils.isNullOrLessThanZero(clusterId)
|
||||
|| ValidateUtils.isBlank(topicName)
|
||||
|| ValidateUtils.isBlank(appId)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
@@ -122,5 +123,12 @@ public interface TopicManagerService {
|
||||
List<TopicStatisticsDO> getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime);
|
||||
|
||||
TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName);
|
||||
|
||||
/**
|
||||
* topic权限调整
|
||||
* @param authorityDO topic权限
|
||||
* @return
|
||||
*/
|
||||
ResultStatus addAuthority(AuthorityDO authorityDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -105,4 +105,5 @@ public interface TopicService {
|
||||
List<TopicBrokerDTO> getTopicBrokerList(Long clusterId, String topicName);
|
||||
|
||||
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.gateway;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||
|
||||
/**
|
||||
@@ -34,4 +35,11 @@ public interface QuotaService {
|
||||
TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId);
|
||||
|
||||
Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota);
|
||||
|
||||
/**
|
||||
* topic配额调整
|
||||
* @param topicQuota topic配额
|
||||
* @return
|
||||
*/
|
||||
ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota);
|
||||
}
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
|
||||
import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy;
|
||||
@@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService {
|
||||
@Autowired
|
||||
private AbstractAllocateQuotaStrategy allocateQuotaStrategy;
|
||||
|
||||
@Autowired
|
||||
private LogicalClusterMetadataManager logicalClusterMetadataManager;
|
||||
|
||||
@Autowired
|
||||
private AuthorityService authorityService;
|
||||
|
||||
@Override
|
||||
public int addTopicQuota(TopicQuota topicQuotaDO) {
|
||||
return KafkaZookeeperUtils.setTopicQuota(
|
||||
@@ -78,4 +89,34 @@ public class QuotaServiceImpl implements QuotaService {
|
||||
}
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) {
|
||||
// 获取物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId());
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return ResultStatus.CLUSTER_NOT_EXIST;
|
||||
}
|
||||
// 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理)
|
||||
AuthorityDO authority = authorityService.getAuthority(physicalClusterId,
|
||||
topicQuota.getTopicName(), topicQuota.getAppId());
|
||||
if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) {
|
||||
return ResultStatus.USER_WITHOUT_AUTHORITY;
|
||||
}
|
||||
if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) {
|
||||
// 可以消费
|
||||
topicQuota.setProduceQuota(null);
|
||||
}
|
||||
if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) {
|
||||
// 可以生产
|
||||
topicQuota.setConsumeQuota(null);
|
||||
}
|
||||
// 设置物理集群id
|
||||
topicQuota.setClusterId(physicalClusterId);
|
||||
// 添加配额
|
||||
if (addTopicQuota(topicQuota) > 0) {
|
||||
return ResultStatus.SUCCESS;
|
||||
}
|
||||
return ResultStatus.ZOOKEEPER_WRITE_FAILED;
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData;
|
||||
@@ -618,6 +619,38 @@ public class TopicManagerServiceImpl implements TopicManagerService {
|
||||
return topicBusinessInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultStatus addAuthority(AuthorityDO authorityDO) {
|
||||
// 查询该用户拥有的应用
|
||||
List<AppDO> appDOs = appService.getByPrincipal(SpringTool.getUserName());
|
||||
if (ValidateUtils.isEmptyList(appDOs)) {
|
||||
// 该用户无应用,需要先申请应用
|
||||
return ResultStatus.APP_NOT_EXIST;
|
||||
}
|
||||
List<Long> appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList());
|
||||
if (!appIds.contains(authorityDO.getAppId())) {
|
||||
// 入参中的appId,该用户未拥有
|
||||
return ResultStatus.APP_NOT_EXIST;
|
||||
}
|
||||
// 获取物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(authorityDO.getClusterId());
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
// 集群不存在
|
||||
return ResultStatus.CLUSTER_NOT_EXIST;
|
||||
}
|
||||
TopicDO topic = getByTopicName(physicalClusterId, authorityDO.getTopicName());
|
||||
if (ValidateUtils.isNull(topic)) {
|
||||
// topic不存在
|
||||
return ResultStatus.TOPIC_NOT_EXIST;
|
||||
}
|
||||
// 设置物理集群id
|
||||
authorityDO.setClusterId(physicalClusterId);
|
||||
if (authorityService.addAuthority(authorityDO) > 0) {
|
||||
return ResultStatus.SUCCESS;
|
||||
}
|
||||
return ResultStatus.MYSQL_ERROR;
|
||||
}
|
||||
|
||||
private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO,
|
||||
String topicName,
|
||||
TopicDO topicDO,
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.dto;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
@ApiModel(description = "权限调整")
|
||||
public class TopicAuthorityDTO extends ClusterTopicDTO {
|
||||
|
||||
@ApiModelProperty(value = "appId")
|
||||
private String appId;
|
||||
|
||||
@ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写, 4:可管理")
|
||||
private Integer access;
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public Integer getAccess() {
|
||||
return access;
|
||||
}
|
||||
|
||||
public void setAccess(Integer access) {
|
||||
this.access = access;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean paramLegal() {
|
||||
if (ValidateUtils.isNullOrLessThanZero(clusterId)
|
||||
|| ValidateUtils.isBlank(topicName)
|
||||
|| ValidateUtils.isBlank(appId)
|
||||
|| ValidateUtils.isNullOrLessThanZero(access)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -337,7 +337,7 @@ public class NormalTopicController {
|
||||
}
|
||||
|
||||
return new Result<>(TopicModelConverter.convert2TopicMineAppVOList(
|
||||
topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName()))
|
||||
topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName()))
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO;
|
||||
@@ -12,12 +14,15 @@ import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGro
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
|
||||
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.*;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
|
||||
import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter;
|
||||
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
|
||||
import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter;
|
||||
import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter;
|
||||
@@ -52,6 +57,9 @@ public class ThirdPartTopicController {
|
||||
@Autowired
|
||||
private TopicManagerService topicManagerService;
|
||||
|
||||
@Autowired
|
||||
private QuotaService quotaService;
|
||||
|
||||
@ApiOperation(value = "Topic元信息", notes = "LogX调用")
|
||||
@RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
@@ -133,4 +141,26 @@ public class ThirdPartTopicController {
|
||||
topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ApiOperation(value = "配额调整",notes = "配额调整")
|
||||
@RequestMapping(value = "{topics/quota}",method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) {
|
||||
// 非空校验
|
||||
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto)));
|
||||
}
|
||||
|
||||
@ApiOperation(value = "权限调整",notes = "权限调整")
|
||||
@RequestMapping(value = "{topics/authority}",method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public Result addAuthority(@RequestBody TopicAuthorityDTO dto) {
|
||||
//非空校验
|
||||
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
return Result.buildFrom(topicManagerService.addAuthority(AuthorityConverter.convert2AuthorityDO(dto)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package com.xiaojukeji.kafka.manager.web.converters;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
||||
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
|
||||
|
||||
public class AuthorityConverter {
|
||||
public static AuthorityDO convert2AuthorityDO(TopicAuthorityDTO dto) {
|
||||
AuthorityDO authorityDO = new AuthorityDO();
|
||||
authorityDO.setAppId(dto.getAppId());
|
||||
authorityDO.setClusterId(dto.getClusterId());
|
||||
authorityDO.setTopicName(dto.getTopicName());
|
||||
authorityDO.setAccess(dto.getAccess());
|
||||
return authorityDO;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user