From 473fc27b49b3ddb3ac732c276621b0ba82312f8c Mon Sep 17 00:00:00 2001 From: "tangcongfa_v@didichuxing.com" Date: Wed, 28 Apr 2021 18:05:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9B=E5=BB=BAtopic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/entity/dto/normal/TopicAddDTO.java | 76 ++++++++++++++++--- .../entity/dto/normal/TopicQuotaDTO.java | 42 ++++++++++ .../manager/service/service/TopicService.java | 6 ++ .../service/impl/TopicServiceImpl.java | 75 +++++++++++------- .../normal/NormalTopicController.java | 11 +++ 5 files changed, 175 insertions(+), 35 deletions(-) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java index 0db12952..4783f106 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java @@ -1,22 +1,39 @@ package com.xiaojukeji.kafka.manager.common.entity.dto.normal; - import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import java.util.List; +import java.util.Properties; + @ApiModel(description = "创建topic") public class TopicAddDTO extends ClusterTopicDTO { - @ApiModelProperty(value = "appId") + @ApiModelProperty(value = "AppID") private String appId; - @ApiModelProperty(value = "峰值流量") - private Long peakBytesIn; + @ApiModelProperty(value = "分区数") + private Integer partitionNum; - @ApiModelProperty(value = "备注信息") + @ApiModelProperty(value = "副本数") + private Integer replicaNum; + + @ApiModelProperty(value = "消息保存时间(ms)") + private Long retentionTime; + + @ApiModelProperty(value = "brokerId列表") + private List brokerIdList; + + @ApiModelProperty(value = "RegionId") + private Long regionId; + + @ApiModelProperty(value = "备注") private String description; + @ApiModelProperty(value = "Topic属性列表") + private Properties properties; + public String getAppId() { return appId; } @@ -25,12 +42,44 @@ public class TopicAddDTO extends ClusterTopicDTO { this.appId = appId; } - public Long getPeakBytesIn() { - return peakBytesIn; + public Integer getPartitionNum() { + return partitionNum; } - public void setPeakBytesIn(Long peakBytesIn) { - this.peakBytesIn = peakBytesIn; + public void setPartitionNum(Integer partitionNum) { + this.partitionNum = partitionNum; + } + + public Integer getReplicaNum() { + return replicaNum; + } + + public void setReplicaNum(Integer replicaNum) { + this.replicaNum = replicaNum; + } + + public Long getRetentionTime() { + return retentionTime; + } + + public void setRetentionTime(Long retentionTime) { + this.retentionTime = retentionTime; + } + + public List getBrokerIdList() { + return brokerIdList; + } + + public void setBrokerIdList(List brokerIdList) { + this.brokerIdList = brokerIdList; + } + + public Long getRegionId() { + return regionId; + } + + public void setRegionId(Long regionId) { + this.regionId = regionId; } public String getDescription() { @@ -40,4 +89,13 @@ public class TopicAddDTO extends ClusterTopicDTO { public void setDescription(String description) { this.description = description; } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java new file mode 100644 index 00000000..aeb86814 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicQuotaDTO.java @@ -0,0 +1,42 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.normal; + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(description = "配额调整") +public class TopicQuotaDTO extends ClusterTopicDTO { + + @ApiModelProperty(value = "appId") + private String appId; + + @ApiModelProperty(value = "发送数据速率") + private Long produceQuota; + + @ApiModelProperty(value = "消费数据速率") + private Long consumeQuota; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Long getProduceQuota() { + return produceQuota; + } + + public void setProduceQuota(Long produceQuota) { + this.produceQuota = produceQuota; + } + + public Long getConsumeQuota() { + return consumeQuota; + } + + public void setConsumeQuota(Long consumeQuota) { + this.consumeQuota = consumeQuota; + } +} diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index f15332ff..0a9b14bf 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.*; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBrokerDTO; @@ -116,4 +117,9 @@ public interface TopicService { * 删除topic */ Result deleteTopic(Long clusterId, String topicName); + + /** + * 配额调整 + */ + Result addTopicQuota(TopicQuotaDTO dto); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 0fffc4d8..b02a2495 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -1,12 +1,12 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; -import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant; -import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; +import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; -import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig; +import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; import com.xiaojukeji.kafka.manager.common.constant.Constant; @@ -34,6 +34,7 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; +import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy; import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils; import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils; @@ -93,10 +94,10 @@ public class TopicServiceImpl implements TopicService { private AbstractHealthScoreStrategy healthScoreStrategy; @Autowired - private ConfigService configService; + private AdminService adminService; @Autowired - private AdminService adminService; + private QuotaService quotaService; @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { @@ -837,58 +838,80 @@ public class TopicServiceImpl implements TopicService { @Override public Result addTopic(TopicAddDTO dto) { + //获取物理集群id Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); if (ValidateUtils.isNull(physicalClusterId)) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } - TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName()); - if (!ValidateUtils.isNull(topic)) { - return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST); - } - CreateTopicElemConfig createTopicConfig = - configService.getCreateTopicConfig(physicalClusterId, SystemCodeConstant.KAFKA_MANAGER); - if (dto.getPeakBytesIn() > createTopicConfig.getAutoExecMaxPeakBytesInUnitB()) { - return Result.buildFailure("流量峰值设置过大,不超过30M"); - } + //获取集群信息 ClusterDO clusterDO = clusterService.getById(physicalClusterId); if (ValidateUtils.isNull(clusterDO)) { return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); } - List brokerList = regionService - .getIdleRegionBrokerList(clusterDO.getId(), createTopicConfig.getRegionIdList()); - if (ValidateUtils.isEmptyList(brokerList)) { - return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST); + //判断topic是否存在 + TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName()); + if (!ValidateUtils.isNull(topic)) { + return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST); } + //构建topicDo TopicDO topicDO = new TopicDO(); topicDO.setAppId(dto.getAppId()); - topicDO.setClusterId(clusterDO.getId()); + topicDO.setClusterId(dto.getClusterId()); topicDO.setTopicName(dto.getTopicName()); topicDO.setDescription(dto.getDescription()); - topicDO.setPeakBytesIn(dto.getPeakBytesIn()); - Long partitionNum = Math.max(1, dto.getPeakBytesIn() / (3 * 1024 * 1024)); - Properties properties = TopicCreationConstant - .createNewProperties(createTopicConfig.getRetentionTimeUnitHour() * 60 * 60 * 1000L); - ResultStatus rs = adminService.createTopic(clusterDO, topicDO, partitionNum.intValue(), - createTopicConfig.getReplicaNum(), null, brokerList, properties, null, + //构建properties + Properties properties = dto.getProperties(); + if (ValidateUtils.isNull(properties)) { + properties = new Properties(); + } + properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime())); + //创建topic + ResultStatus rs = adminService.createTopic(clusterDO, topicDO, dto.getPartitionNum(), + dto.getReplicaNum(), dto.getRegionId(), dto.getBrokerIdList(), properties, SpringTool.getUserName(), SpringTool.getUserName()); return Result.buildFrom(rs); } @Override public Result deleteTopic(Long clusterId, String topicName) { + //获得物理集群id Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId); if (ValidateUtils.isNull(physicalClusterId)) { return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); } + //获取集群信息 ClusterDO clusterDO = clusterService.getById(physicalClusterId); if (ValidateUtils.isNull(clusterDO)) { return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); } + //删除topic ResultStatus rs = adminService.deleteTopic(clusterDO, topicName, SpringTool.getUserName()); return Result.buildFrom(rs); } - private Result checkTopicOffsetChanged(ClusterDO clusterDO, + @Override + public Result addTopicQuota(TopicQuotaDTO dto) { + //获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + //构建配额 + TopicQuota topicQuota = new TopicQuota(); + topicQuota.setClusterId(physicalClusterId); + topicQuota.setAppId(dto.getAppId()); + topicQuota.setTopicName(dto.getTopicName()); + topicQuota.setProduceQuota(dto.getProduceQuota()); + topicQuota.setConsumeQuota(dto.getConsumeQuota()); + //配额调整 + int result = quotaService.addTopicQuota(topicQuota); + if (result > 0) { + return Result.buildFrom(ResultStatus.SUCCESS); + } + return Result.buildFrom(ResultStatus.FAIL); + } + + private Result checkTopicOffsetChanged(ClusterDO clusterDO, String topicName, Map endOffsetMap) { if (ValidateUtils.isNull(clusterDO) diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 3ce2b163..75d97e68 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -8,6 +8,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.TopicBusinessInfoVO; @@ -360,4 +361,14 @@ public class NormalTopicController { } return topicService.deleteTopic(clusterId,topicName); } + + @ApiOperation(value = "配额调整",notes = "配额调整") + @RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST) + @ResponseBody + public Result addTopicQuota(@RequestBody TopicQuotaDTO dto) { + if (ValidateUtils.isNull(dto)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return topicService.addTopicQuota(dto); + } } \ No newline at end of file