From b68ba0bff6f560ba1e00832f7b292cdbf1082e85 Mon Sep 17 00:00:00 2001 From: "tangcongfa_v@didichuxing.com" Date: Tue, 11 May 2021 16:58:44 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E9=A2=9D=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/entity/dto/TopicAuthorityDTO.java | 30 ----- .../common/entity/dto/normal/TopicAddDTO.java | 101 --------------- .../entity/dto/normal/TopicExpandDTO.java | 57 --------- .../manager/service/service/TopicService.java | 22 ---- .../service/impl/TopicServiceImpl.java | 118 ------------------ .../normal/NormalTopicController.java | 43 ------- 6 files changed, 371 deletions(-) delete mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/TopicAuthorityDTO.java delete mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java delete mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/TopicAuthorityDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/TopicAuthorityDTO.java deleted file mode 100644 index 7be62ff6..00000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/TopicAuthorityDTO.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity.dto; - -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; - -@ApiModel(description = "权限调整") -public class TopicAuthorityDTO extends ClusterTopicDTO{ - - @ApiModelProperty(value = "appId") - private String appId; - - @ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写") - private Integer access; - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public Integer getAccess() { - return access; - } - - public void setAccess(Integer access) { - this.access = access; - } -} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java deleted file mode 100644 index 4783f106..00000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity.dto.normal; - -import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; - -import java.util.List; -import java.util.Properties; - -@ApiModel(description = "创建topic") -public class TopicAddDTO extends ClusterTopicDTO { - - @ApiModelProperty(value = "AppID") - private String appId; - - @ApiModelProperty(value = "分区数") - private Integer partitionNum; - - @ApiModelProperty(value = "副本数") - private Integer replicaNum; - - @ApiModelProperty(value = "消息保存时间(ms)") - private Long retentionTime; - - @ApiModelProperty(value = "brokerId列表") - private List brokerIdList; - - @ApiModelProperty(value = "RegionId") - private Long regionId; - - @ApiModelProperty(value = "备注") - private String description; - - @ApiModelProperty(value = "Topic属性列表") - private Properties properties; - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public Integer getPartitionNum() { - return partitionNum; - } - - public void setPartitionNum(Integer partitionNum) { - this.partitionNum = partitionNum; - } - - public Integer getReplicaNum() { - return replicaNum; - } - - public void setReplicaNum(Integer replicaNum) { - this.replicaNum = replicaNum; - } - - public Long getRetentionTime() { - return retentionTime; - } - - public void setRetentionTime(Long retentionTime) { - this.retentionTime = retentionTime; - } - - public List getBrokerIdList() { - return brokerIdList; - } - - public void setBrokerIdList(List brokerIdList) { - this.brokerIdList = brokerIdList; - } - - public Long getRegionId() { - return regionId; - } - - public void setRegionId(Long regionId) { - this.regionId = regionId; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public Properties getProperties() { - return properties; - } - - public void setProperties(Properties properties) { - this.properties = properties; - } - -} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java deleted file mode 100644 index 9dc6d5a1..00000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicExpandDTO.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity.dto.normal; - -import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; -import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; - -import java.util.List; - -@ApiModel(description = "扩分区") -public class TopicExpandDTO extends ClusterTopicDTO { - - @ApiModelProperty(value = "regionId") - private Long regionId; - - @ApiModelProperty(value = "brokerId列表") - private List brokerIds; - - @ApiModelProperty(value = "新增分区数") - private Integer partitionNum; - - public Long getRegionId() { - return regionId; - } - - public void setRegionId(Long regionId) { - this.regionId = regionId; - } - - public List getBrokerIds() { - return brokerIds; - } - - public void setBrokerIds(List brokerIds) { - this.brokerIds = brokerIds; - } - - public Integer getPartitionNum() { - return partitionNum; - } - - public void setPartitionNum(Integer partitionNum) { - this.partitionNum = partitionNum; - } - - public boolean paramLegal() { - if (ValidateUtils.isNull(clusterId) - || ValidateUtils.isNull(topicName) - || ValidateUtils.isNull(partitionNum) || partitionNum <= 0) { - return false; - } - if (ValidateUtils.isEmptyList(brokerIds) && ValidateUtils.isNull(regionId)) { - return false; - } - return true; - } -} diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index 3646d6fa..cacfb9f0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -4,11 +4,8 @@ import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ao.*; -import com.xiaojukeji.kafka.manager.common.entity.dto.TopicAuthorityDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO; @@ -110,28 +107,9 @@ public interface TopicService { Result checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime); - /** - * 创建topic - */ - Result addTopic(TopicAddDTO dto); - - /** - * 删除topic - */ - Result deleteTopic(Long clusterId, String topicName); - /** * 配额调整 */ Result addTopicQuota(TopicQuotaDTO dto); - /** - * 扩分区 - */ - Result expandTopic(TopicExpandDTO dto); - - /** - * 权限调整 - */ - Result addAuthorityAdd(TopicAuthorityDTO dto); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 5e17386a..f6140cc9 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -1,13 +1,9 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; -import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; -import com.xiaojukeji.kafka.manager.common.entity.dto.TopicAuthorityDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; @@ -19,8 +15,6 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; -import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; -import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; @@ -843,59 +837,6 @@ public class TopicServiceImpl implements TopicService { return new Result<>(TopicOffsetChangedEnum.UNKNOWN); } - @Override - public Result addTopic(TopicAddDTO dto) { - //获取物理集群id - Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); - if (ValidateUtils.isNull(physicalClusterId)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - //获取集群信息 - ClusterDO clusterDO = clusterService.getById(physicalClusterId); - if (ValidateUtils.isNull(clusterDO)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - //判断topic是否存在 - TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName()); - if (!ValidateUtils.isNull(topic)) { - return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST); - } - //构建topicDo - TopicDO topicDO = new TopicDO(); - topicDO.setAppId(dto.getAppId()); - topicDO.setClusterId(dto.getClusterId()); - topicDO.setTopicName(dto.getTopicName()); - topicDO.setDescription(dto.getDescription()); - //构建properties - Properties properties = dto.getProperties(); - if (ValidateUtils.isNull(properties)) { - properties = new Properties(); - } - properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime())); - //创建topic - ResultStatus rs = adminService.createTopic(clusterDO, topicDO, dto.getPartitionNum(), - dto.getReplicaNum(), dto.getRegionId(), dto.getBrokerIdList(), properties, SpringTool.getUserName(), - SpringTool.getUserName()); - return Result.buildFrom(rs); - } - - @Override - public Result deleteTopic(Long clusterId, String topicName) { - //获得物理集群id - Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId); - if (ValidateUtils.isNull(physicalClusterId)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - //获取集群信息 - ClusterDO clusterDO = clusterService.getById(physicalClusterId); - if (ValidateUtils.isNull(clusterDO)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - //删除topic - ResultStatus rs = adminService.deleteTopic(clusterDO, topicName, SpringTool.getUserName()); - return Result.buildFrom(rs); - } - @Override public Result addTopicQuota(TopicQuotaDTO dto) { //获取物理集群id @@ -918,65 +859,6 @@ public class TopicServiceImpl implements TopicService { return Result.buildFrom(ResultStatus.MYSQL_ERROR); } - @Override - public Result expandTopic(TopicExpandDTO dto) { - // 校验非空 - if (!dto.paramLegal()) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - //获取物理集群id - Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); - if (ValidateUtils.isNull(physicalClusterId)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - //获取集群信息 - ClusterDO clusterDO = clusterService.getById(physicalClusterId); - if (ValidateUtils.isNull(clusterDO)) { - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - //扩分区 - ResultStatus resultStatus = adminService.expandPartitions(clusterDO, dto.getTopicName(), dto.getPartitionNum(), - dto.getRegionId(), dto.getBrokerIds(), SpringTool.getUserName()); - return Result.buildFrom(resultStatus); - } - - @Override - public Result addAuthorityAdd(TopicAuthorityDTO dto) { - //查询该用户拥有的应用 - List appDOs = appService.getByPrincipal(SpringTool.getUserName()); - if (ValidateUtils.isEmptyList(appDOs)) { - //该用户无应用,需要先申请应用 - return Result.buildFrom(ResultStatus.APP_NOT_EXIST); - } - List appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList()); - if (!appIds.contains(dto.getAccess())) { - //入参中的appId,该用户未拥有 - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - //获取物理集群id - Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); - if (ValidateUtils.isNull(physicalClusterId)) { - //集群不存在 - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - //获取集群信息 - ClusterDO clusterDO = clusterService.getById(physicalClusterId); - if (ValidateUtils.isNull(clusterDO)) { - //集群不存在 - return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); - } - //构建authorityDo - AuthorityDO authorityDO = new AuthorityDO(); - authorityDO.setClusterId(physicalClusterId); - authorityDO.setAppId(dto.getAppId()); - authorityDO.setTopicName(dto.getTopicName()); - authorityDO.setAccess(dto.getAccess()); - if (authorityDao.insert(authorityDO) > 0) { - return Result.buildFrom(ResultStatus.SUCCESS); - } - return Result.buildFrom(ResultStatus.MYSQL_ERROR); - } - private Result checkTopicOffsetChanged(ClusterDO clusterDO, String topicName, Map endOffsetMap) { diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index a742d487..0b6dfde6 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -6,10 +6,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.TopicAuthorityDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; -import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicExpandDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; @@ -364,37 +361,6 @@ public class NormalTopicController { return new Result<>(new TopicStatisticMetricsVO(maxAvgBytesIn)); } - @ApiOperation(value = "创建topic",notes = "创建topic") - @RequestMapping(value = {"/topics/add"},method = RequestMethod.POST) - @ResponseBody - public Result addTopic(@RequestBody TopicAddDTO dto) { - if (ValidateUtils.isNull(dto)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - return topicService.addTopic(dto); - } - - @ApiOperation(value = "扩分区",notes = "扩分区") - @RequestMapping(value = "{/topics/expand}",method = RequestMethod.POST) - @ResponseBody - public Result expandTopic(@RequestBody TopicExpandDTO dto) { - if (ValidateUtils.isNull(dto)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - return topicService.expandTopic(dto); - } - - @ApiOperation(value = "删除topic",notes = "删除topic") - @RequestMapping(value = {"{clusterId}/topics/{topicName}/delete"},method = RequestMethod.DELETE) - @ResponseBody - public Result deleteTopic(@PathVariable Long clusterId, - @PathVariable String topicName) { - if (ValidateUtils.isNull(clusterId) || ValidateUtils.isNull(topicName)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - return topicService.deleteTopic(clusterId,topicName); - } - @ApiOperation(value = "配额调整",notes = "配额调整") @RequestMapping(value = "{topics/quota/add}",method = RequestMethod.POST) @ResponseBody @@ -405,13 +371,4 @@ public class NormalTopicController { return topicService.addTopicQuota(dto); } - @ApiOperation(value = "权限调整",notes = "权限调整") - @RequestMapping(value = "{topics/authority/add}",method = RequestMethod.POST) - @ResponseBody - public Result addAuthorityAdd(@RequestBody TopicAuthorityDTO dto) { - if (ValidateUtils.isNull(dto)) { - return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); - } - return topicService.addAuthorityAdd(dto); - } } \ No newline at end of file