扩分区

This commit is contained in:
tangcongfa_v@didichuxing.com
2021-05-08 11:23:06 +08:00
parent 473fc27b49
commit f0b3b9f7f4
4 changed files with 99 additions and 1 deletions

View File

@@ -0,0 +1,57 @@
package com.xiaojukeji.kafka.manager.common.entity.dto.normal;
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
@ApiModel(description = "扩分区")
public class TopicExpandDTO extends ClusterTopicDTO {
@ApiModelProperty(value = "regionId")
private Long regionId;
@ApiModelProperty(value = "brokerId列表")
private List<Integer> brokerIds;
@ApiModelProperty(value = "新增分区数")
private Integer partitionNum;
public Long getRegionId() {
return regionId;
}
public void setRegionId(Long regionId) {
this.regionId = regionId;
}
public List<Integer> getBrokerIds() {
return brokerIds;
}
public void setBrokerIds(List<Integer> brokerIds) {
this.brokerIds = brokerIds;
}
public Integer getPartitionNum() {
return partitionNum;
}
public void setPartitionNum(Integer partitionNum) {
this.partitionNum = partitionNum;
}
public boolean paramLegal() {
if (ValidateUtils.isNull(clusterId)
|| ValidateUtils.isNull(topicName)
|| ValidateUtils.isNull(partitionNum) || partitionNum <= 0) {
return false;
}
if (ValidateUtils.isEmptyList(brokerIds) && ValidateUtils.isNull(regionId)) {
return false;
}
return true;
}
}

View File

@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.*;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
@@ -122,4 +123,9 @@ public interface TopicService {
* 配额调整
*/
Result addTopicQuota(TopicQuotaDTO dto);
/**
* 扩分区
*/
Result expandTopic(TopicExpandDTO dto);
}

View File

@@ -6,6 +6,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
@@ -911,7 +912,29 @@ public class TopicServiceImpl implements TopicService {
return Result.buildFrom(ResultStatus.FAIL);
}
private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO,
@Override
public Result expandTopic(TopicExpandDTO dto) {
// 校验非空
if (!dto.paramLegal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
//获取物理集群id
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
if (ValidateUtils.isNull(physicalClusterId)) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
//获取集群信息
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
//扩分区
ResultStatus resultStatus = adminService.expandPartitions(clusterDO, dto.getTopicName(), dto.getPartitionNum(),
dto.getRegionId(), dto.getBrokerIds(), SpringTool.getUserName());
return Result.buildFrom(resultStatus);
}
private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO,
String topicName,
Map<TopicPartition, Long> endOffsetMap) {
if (ValidateUtils.isNull(clusterDO)

View File

@@ -8,6 +8,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
@@ -351,6 +352,17 @@ public class NormalTopicController {
return topicService.addTopic(dto);
}
@ApiOperation(value = "扩分区",notes = "扩分区")
@RequestMapping(value = "{/topics/expand}",method = RequestMethod.POST)
@ResponseBody
public Result expandTopic(@RequestBody TopicExpandDTO dto) {
if (ValidateUtils.isNull(dto)) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
return topicService.expandTopic(dto);
}
@ApiOperation(value = "删除topic",notes = "删除topic")
@RequestMapping(value = {"{clusterId}/topics/{topicName}/delete"},method = RequestMethod.DELETE)
@ResponseBody