diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java new file mode 100644 index 00000000..0db12952 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/normal/TopicAddDTO.java @@ -0,0 +1,43 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.normal; + + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(description = "创建topic") +public class TopicAddDTO extends ClusterTopicDTO { + + @ApiModelProperty(value = "appId") + private String appId; + + @ApiModelProperty(value = "峰值流量") + private Long peakBytesIn; + + @ApiModelProperty(value = "备注信息") + private String description; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Long getPeakBytesIn() { + return peakBytesIn; + } + + public void setPeakBytesIn(Long peakBytesIn) { + this.peakBytesIn = peakBytesIn; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } +} diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index dacba4b0..39516fad 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ao.*; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; @@ -105,4 +106,9 @@ public interface TopicService { List getTopicBrokerList(Long clusterId, String topicName); Result checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime); + + /** + * 创建topic + */ + Result addTopic(TopicAddDTO dto); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 5dea0561..1e2e66d3 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; import com.xiaojukeji.kafka.manager.common.constant.Constant; @@ -20,6 +21,7 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionMap; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao; +import com.xiaojukeji.kafka.manager.dao.TopicDao; import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao; import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao; import com.xiaojukeji.kafka.manager.common.entity.pojo.*; @@ -87,6 +89,9 @@ public class TopicServiceImpl implements TopicService { @Autowired private AbstractHealthScoreStrategy healthScoreStrategy; + @Autowired + private TopicDao topicDao; + @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { try { @@ -824,6 +829,26 @@ public class TopicServiceImpl implements TopicService { return new Result<>(TopicOffsetChangedEnum.UNKNOWN); } + @Override + public Result addTopic(TopicAddDTO dto) { + TopicDO topicDO = topicManagerService.getByTopicName(dto.getClusterId(), dto.getTopicName()); + if (!ValidateUtils.isNull(topicDO)) { + // 该topic已存在 + return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST); + } + TopicDO topic = new TopicDO(); + topic.setAppId(dto.getAppId()); + topic.setClusterId(dto.getClusterId()); + topic.setDescription(dto.getDescription()); + topic.setTopicName(dto.getTopicName()); + topic.setPeakBytesIn(dto.getPeakBytesIn() * 1024 * 1024); + int insert = topicDao.insert(topic); + if (insert > 0) { + return Result.buildFrom(ResultStatus.SUCCESS); + } + return Result.buildFrom(ResultStatus.FAIL); + } + private Result checkTopicOffsetChanged(ClusterDO clusterDO, String topicName, Map endOffsetMap) { diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 6e59816b..f1b0b091 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -6,6 +6,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicPartitionDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; @@ -335,8 +336,18 @@ public class NormalTopicController { } return new Result<>(TopicModelConverter.convert2TopicMineAppVOList( - topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName())) + topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName())) ); } + @ApiOperation(value = "创建topic",notes = "创建topic") + @RequestMapping(value = {"/topics/add"},method = RequestMethod.POST) + @ResponseBody + public Result addTopic(@RequestBody TopicAddDTO dto) { + if (ValidateUtils.isNull(dto)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return topicService.addTopic(dto); + } + } \ No newline at end of file