From f0b3b9f7f421e28ae453ceb5e920cc7dbf3d69f2 Mon Sep 17 00:00:00 2001 From: "tangcongfa_v@didichuxing.com" Date: Sat, 8 May 2021 11:23:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A9=E5=88=86=E5=8C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/dto/normal/TopicExpandDTO.java | 57 +++++++++++++++++++ .../manager/service/service/TopicService.java | 6 ++ .../service/impl/TopicServiceImpl.java | 25 +++++++- .../normal/NormalTopicController.java | 12 ++++ 4 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java new file mode 100644 index 00000000..9dc6d5a1 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java @@ -0,0 +1,57 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.normal; + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +import java.util.List; + +@ApiModel(description = "扩分区") +public class TopicExpandDTO extends ClusterTopicDTO { + + @ApiModelProperty(value = "regionId") + private Long regionId; + + @ApiModelProperty(value = "brokerId列表") + private List brokerIds; + + @ApiModelProperty(value = "新增分区数") + private Integer partitionNum; + + public Long getRegionId() { + return regionId; + } + + public void setRegionId(Long regionId) { + this.regionId = regionId; + } + + public List getBrokerIds() { + return brokerIds; + } + + public void setBrokerIds(List brokerIds) { + this.brokerIds = brokerIds; + } + + public Integer getPartitionNum() { + return partitionNum; + } + + public void setPartitionNum(Integer partitionNum) { + this.partitionNum = partitionNum; + } + + public boolean paramLegal() { + if (ValidateUtils.isNull(clusterId) + || ValidateUtils.isNull(topicName) + || ValidateUtils.isNull(partitionNum) || partitionNum <= 0) { + return false; + } + if (ValidateUtils.isEmptyList(brokerIds) && ValidateUtils.isNull(regionId)) { + return false; + } + return true; + } +} diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index 0a9b14bf..f850d1ad 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.*; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO; @@ -122,4 +123,9 @@ public interface TopicService { * 配额调整 */ Result addTopicQuota(TopicQuotaDTO dto); + + /** + * 扩分区 + */ + Result expandTopic(TopicExpandDTO dto); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index b02a2495..8c215d8b 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -6,6 +6,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; @@ -911,7 +912,29 @@ public class TopicServiceImpl implements TopicService { return Result.buildFrom(ResultStatus.FAIL); } - private Result checkTopicOffsetChanged(ClusterDO clusterDO, + @Override + public Result expandTopic(TopicExpandDTO dto) { + // 校验非空 + if (!dto.paramLegal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + //获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + //获取集群信息 + ClusterDO clusterDO = clusterService.getById(physicalClusterId); + if (ValidateUtils.isNull(clusterDO)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + //扩分区 + ResultStatus resultStatus = adminService.expandPartitions(clusterDO, dto.getTopicName(), dto.getPartitionNum(), + dto.getRegionId(), dto.getBrokerIds(), SpringTool.getUserName()); + return Result.buildFrom(resultStatus); + } + + private Result checkTopicOffsetChanged(ClusterDO clusterDO, String topicName, Map endOffsetMap) { if (ValidateUtils.isNull(clusterDO) diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 75d97e68..269529fc 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -8,6 +8,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; @@ -351,6 +352,17 @@ public class NormalTopicController { return topicService.addTopic(dto); } + @ApiOperation(value = "扩分区",notes = "扩分区") + @RequestMapping(value = "{/topics/expand}",method = RequestMethod.POST) + @ResponseBody + public Result expandTopic(@RequestBody TopicExpandDTO dto) { + if (ValidateUtils.isNull(dto)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return topicService.expandTopic(dto); + } + + @ApiOperation(value = "删除topic",notes = "删除topic") @RequestMapping(value = {"{clusterId}/topics/{topicName}/delete"},method = RequestMethod.DELETE) @ResponseBody