创建topic

This commit is contained in:
tangcongfa_v@didichuxing.com
2021-04-26 11:16:53 +08:00
parent 0bd3e28348
commit f551674860
4 changed files with 86 additions and 1 deletions

View File

@@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ao.*;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
@@ -105,4 +106,9 @@ public interface TopicService {
List<TopicBrokerDTO> getTopicBrokerList(Long clusterId, String topicName);
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
/**
* 创建topic
*/
Result addTopic(TopicAddDTO dto);
}

View File

@@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
@@ -20,6 +21,7 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionMap;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao;
import com.xiaojukeji.kafka.manager.dao.TopicDao;
import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao;
import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.*;
@@ -87,6 +89,9 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private AbstractHealthScoreStrategy healthScoreStrategy;
@Autowired
private TopicDao topicDao;
@Override
public List<TopicMetricsDO> getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) {
try {
@@ -824,6 +829,26 @@ public class TopicServiceImpl implements TopicService {
return new Result<>(TopicOffsetChangedEnum.UNKNOWN);
}
@Override
public Result addTopic(TopicAddDTO dto) {
TopicDO topicDO = topicManagerService.getByTopicName(dto.getClusterId(), dto.getTopicName());
if (!ValidateUtils.isNull(topicDO)) {
// 该topic已存在
return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST);
}
TopicDO topic = new TopicDO();
topic.setAppId(dto.getAppId());
topic.setClusterId(dto.getClusterId());
topic.setDescription(dto.getDescription());
topic.setTopicName(dto.getTopicName());
topic.setPeakBytesIn(dto.getPeakBytesIn() * 1024 * 1024);
int insert = topicDao.insert(topic);
if (insert > 0) {
return Result.buildFrom(ResultStatus.SUCCESS);
}
return Result.buildFrom(ResultStatus.FAIL);
}
private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO,
String topicName,
Map<TopicPartition, Long> endOffsetMap) {