mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
配额调整
This commit is contained in:
@@ -1,30 +0,0 @@
|
||||
package com.xiaojukeji.kafka.manager.common.entity.dto;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
@ApiModel(description = "权限调整")
|
||||
public class TopicAuthorityDTO extends ClusterTopicDTO{
|
||||
|
||||
@ApiModelProperty(value = "appId")
|
||||
private String appId;
|
||||
|
||||
@ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写")
|
||||
private Integer access;
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public Integer getAccess() {
|
||||
return access;
|
||||
}
|
||||
|
||||
public void setAccess(Integer access) {
|
||||
this.access = access;
|
||||
}
|
||||
}
|
||||
@@ -1,101 +0,0 @@
|
||||
package com.xiaojukeji.kafka.manager.common.entity.dto.normal;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
@ApiModel(description = "创建topic")
|
||||
public class TopicAddDTO extends ClusterTopicDTO {
|
||||
|
||||
@ApiModelProperty(value = "AppID")
|
||||
private String appId;
|
||||
|
||||
@ApiModelProperty(value = "分区数")
|
||||
private Integer partitionNum;
|
||||
|
||||
@ApiModelProperty(value = "副本数")
|
||||
private Integer replicaNum;
|
||||
|
||||
@ApiModelProperty(value = "消息保存时间(ms)")
|
||||
private Long retentionTime;
|
||||
|
||||
@ApiModelProperty(value = "brokerId列表")
|
||||
private List<Integer> brokerIdList;
|
||||
|
||||
@ApiModelProperty(value = "RegionId")
|
||||
private Long regionId;
|
||||
|
||||
@ApiModelProperty(value = "备注")
|
||||
private String description;
|
||||
|
||||
@ApiModelProperty(value = "Topic属性列表")
|
||||
private Properties properties;
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public void setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
public Integer getPartitionNum() {
|
||||
return partitionNum;
|
||||
}
|
||||
|
||||
public void setPartitionNum(Integer partitionNum) {
|
||||
this.partitionNum = partitionNum;
|
||||
}
|
||||
|
||||
public Integer getReplicaNum() {
|
||||
return replicaNum;
|
||||
}
|
||||
|
||||
public void setReplicaNum(Integer replicaNum) {
|
||||
this.replicaNum = replicaNum;
|
||||
}
|
||||
|
||||
public Long getRetentionTime() {
|
||||
return retentionTime;
|
||||
}
|
||||
|
||||
public void setRetentionTime(Long retentionTime) {
|
||||
this.retentionTime = retentionTime;
|
||||
}
|
||||
|
||||
public List<Integer> getBrokerIdList() {
|
||||
return brokerIdList;
|
||||
}
|
||||
|
||||
public void setBrokerIdList(List<Integer> brokerIdList) {
|
||||
this.brokerIdList = brokerIdList;
|
||||
}
|
||||
|
||||
public Long getRegionId() {
|
||||
return regionId;
|
||||
}
|
||||
|
||||
public void setRegionId(Long regionId) {
|
||||
this.regionId = regionId;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public void setDescription(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public Properties getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public void setProperties(Properties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,57 +0,0 @@
|
||||
package com.xiaojukeji.kafka.manager.common.entity.dto.normal;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ApiModel(description = "扩分区")
|
||||
public class TopicExpandDTO extends ClusterTopicDTO {
|
||||
|
||||
@ApiModelProperty(value = "regionId")
|
||||
private Long regionId;
|
||||
|
||||
@ApiModelProperty(value = "brokerId列表")
|
||||
private List<Integer> brokerIds;
|
||||
|
||||
@ApiModelProperty(value = "新增分区数")
|
||||
private Integer partitionNum;
|
||||
|
||||
public Long getRegionId() {
|
||||
return regionId;
|
||||
}
|
||||
|
||||
public void setRegionId(Long regionId) {
|
||||
this.regionId = regionId;
|
||||
}
|
||||
|
||||
public List<Integer> getBrokerIds() {
|
||||
return brokerIds;
|
||||
}
|
||||
|
||||
public void setBrokerIds(List<Integer> brokerIds) {
|
||||
this.brokerIds = brokerIds;
|
||||
}
|
||||
|
||||
public Integer getPartitionNum() {
|
||||
return partitionNum;
|
||||
}
|
||||
|
||||
public void setPartitionNum(Integer partitionNum) {
|
||||
this.partitionNum = partitionNum;
|
||||
}
|
||||
|
||||
public boolean paramLegal() {
|
||||
if (ValidateUtils.isNull(clusterId)
|
||||
|| ValidateUtils.isNull(topicName)
|
||||
|| ValidateUtils.isNull(partitionNum) || partitionNum <= 0) {
|
||||
return false;
|
||||
}
|
||||
if (ValidateUtils.isEmptyList(brokerIds) && ValidateUtils.isNull(regionId)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -4,11 +4,8 @@ import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.TopicAuthorityDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
|
||||
@@ -110,28 +107,9 @@ public interface TopicService {
|
||||
|
||||
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
|
||||
|
||||
/**
|
||||
* 创建topic
|
||||
*/
|
||||
Result addTopic(TopicAddDTO dto);
|
||||
|
||||
/**
|
||||
* 删除topic
|
||||
*/
|
||||
Result deleteTopic(Long clusterId, String topicName);
|
||||
|
||||
/**
|
||||
* 配额调整
|
||||
*/
|
||||
Result addTopicQuota(TopicQuotaDTO dto);
|
||||
|
||||
/**
|
||||
* 扩分区
|
||||
*/
|
||||
Result expandTopic(TopicExpandDTO dto);
|
||||
|
||||
/**
|
||||
* 权限调整
|
||||
*/
|
||||
Result addAuthorityAdd(TopicAuthorityDTO dto);
|
||||
}
|
||||
|
||||
@@ -1,13 +1,9 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.TopicAuthorityDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
|
||||
@@ -19,8 +15,6 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
|
||||
@@ -843,59 +837,6 @@ public class TopicServiceImpl implements TopicService {
|
||||
return new Result<>(TopicOffsetChangedEnum.UNKNOWN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result addTopic(TopicAddDTO dto) {
|
||||
//获取物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
//获取集群信息
|
||||
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//判断topic是否存在
|
||||
TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName());
|
||||
if (!ValidateUtils.isNull(topic)) {
|
||||
return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST);
|
||||
}
|
||||
//构建topicDo
|
||||
TopicDO topicDO = new TopicDO();
|
||||
topicDO.setAppId(dto.getAppId());
|
||||
topicDO.setClusterId(dto.getClusterId());
|
||||
topicDO.setTopicName(dto.getTopicName());
|
||||
topicDO.setDescription(dto.getDescription());
|
||||
//构建properties
|
||||
Properties properties = dto.getProperties();
|
||||
if (ValidateUtils.isNull(properties)) {
|
||||
properties = new Properties();
|
||||
}
|
||||
properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime()));
|
||||
//创建topic
|
||||
ResultStatus rs = adminService.createTopic(clusterDO, topicDO, dto.getPartitionNum(),
|
||||
dto.getReplicaNum(), dto.getRegionId(), dto.getBrokerIdList(), properties, SpringTool.getUserName(),
|
||||
SpringTool.getUserName());
|
||||
return Result.buildFrom(rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result deleteTopic(Long clusterId, String topicName) {
|
||||
//获得物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId);
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//获取集群信息
|
||||
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//删除topic
|
||||
ResultStatus rs = adminService.deleteTopic(clusterDO, topicName, SpringTool.getUserName());
|
||||
return Result.buildFrom(rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result addTopicQuota(TopicQuotaDTO dto) {
|
||||
//获取物理集群id
|
||||
@@ -918,65 +859,6 @@ public class TopicServiceImpl implements TopicService {
|
||||
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result expandTopic(TopicExpandDTO dto) {
|
||||
// 校验非空
|
||||
if (!dto.paramLegal()) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
//获取物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
//获取集群信息
|
||||
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//扩分区
|
||||
ResultStatus resultStatus = adminService.expandPartitions(clusterDO, dto.getTopicName(), dto.getPartitionNum(),
|
||||
dto.getRegionId(), dto.getBrokerIds(), SpringTool.getUserName());
|
||||
return Result.buildFrom(resultStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result addAuthorityAdd(TopicAuthorityDTO dto) {
|
||||
//查询该用户拥有的应用
|
||||
List<AppDO> appDOs = appService.getByPrincipal(SpringTool.getUserName());
|
||||
if (ValidateUtils.isEmptyList(appDOs)) {
|
||||
//该用户无应用,需要先申请应用
|
||||
return Result.buildFrom(ResultStatus.APP_NOT_EXIST);
|
||||
}
|
||||
List<Long> appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList());
|
||||
if (!appIds.contains(dto.getAccess())) {
|
||||
//入参中的appId,该用户未拥有
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
//获取物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
//集群不存在
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//获取集群信息
|
||||
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
//集群不存在
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//构建authorityDo
|
||||
AuthorityDO authorityDO = new AuthorityDO();
|
||||
authorityDO.setClusterId(physicalClusterId);
|
||||
authorityDO.setAppId(dto.getAppId());
|
||||
authorityDO.setTopicName(dto.getTopicName());
|
||||
authorityDO.setAccess(dto.getAccess());
|
||||
if (authorityDao.insert(authorityDO) > 0) {
|
||||
return Result.buildFrom(ResultStatus.SUCCESS);
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
||||
}
|
||||
|
||||
private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO,
|
||||
String topicName,
|
||||
Map<TopicPartition, Long> endOffsetMap) {
|
||||
|
||||
@@ -6,10 +6,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.TopicAuthorityDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
|
||||
@@ -364,37 +361,6 @@ public class NormalTopicController {
|
||||
return new Result<>(new TopicStatisticMetricsVO(maxAvgBytesIn));
|
||||
}
|
||||
|
||||
@ApiOperation(value = "创建topic",notes = "创建topic")
|
||||
@RequestMapping(value = {"/topics/add"},method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public Result addTopic(@RequestBody TopicAddDTO dto) {
|
||||
if (ValidateUtils.isNull(dto)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
return topicService.addTopic(dto);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "扩分区",notes = "扩分区")
|
||||
@RequestMapping(value = "{/topics/expand}",method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public Result expandTopic(@RequestBody TopicExpandDTO dto) {
|
||||
if (ValidateUtils.isNull(dto)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
return topicService.expandTopic(dto);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "删除topic",notes = "删除topic")
|
||||
@RequestMapping(value = {"{clusterId}/topics/{topicName}/delete"},method = RequestMethod.DELETE)
|
||||
@ResponseBody
|
||||
public Result deleteTopic(@PathVariable Long clusterId,
|
||||
@PathVariable String topicName) {
|
||||
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isNull(topicName)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
return topicService.deleteTopic(clusterId,topicName);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "配额调整",notes = "配额调整")
|
||||
@RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
@@ -405,13 +371,4 @@ public class NormalTopicController {
|
||||
return topicService.addTopicQuota(dto);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "权限调整",notes = "权限调整")
|
||||
@RequestMapping(value = "{topics/authority/add}",method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public Result addAuthorityAdd(@RequestBody TopicAuthorityDTO dto) {
|
||||
if (ValidateUtils.isNull(dto)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
return topicService.addAuthorityAdd(dto);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user