创建topic

This commit is contained in:
tangcongfa_v@didichuxing.com
2021-04-28 11:08:01 +08:00
parent ee1ab30c2c
commit b5901a2819

View File

@@ -1,8 +1,11 @@
package com.xiaojukeji.kafka.manager.service.service.impl; package com.xiaojukeji.kafka.manager.service.service.impl;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
@@ -14,7 +17,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
@@ -22,11 +25,9 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionMap;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao; import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao;
import com.xiaojukeji.kafka.manager.dao.TopicDao;
import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao; import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao;
import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao; import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.*; import com.xiaojukeji.kafka.manager.common.entity.pojo.*;
import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool; import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache; import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
@@ -45,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.*; import java.util.*;
@@ -93,10 +93,10 @@ public class TopicServiceImpl implements TopicService {
private AbstractHealthScoreStrategy healthScoreStrategy; private AbstractHealthScoreStrategy healthScoreStrategy;
@Autowired @Autowired
private TopicDao topicDao; private ConfigService configService;
@Autowired @Autowired
private AuthorityDao authorityDao; private AdminService adminService;
@Override @Override
public List<TopicMetricsDO> getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { public List<TopicMetricsDO> getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) {
@@ -835,36 +835,46 @@ public class TopicServiceImpl implements TopicService {
return new Result<>(TopicOffsetChangedEnum.UNKNOWN); return new Result<>(TopicOffsetChangedEnum.UNKNOWN);
} }
@Transactional
@Override @Override
public Result addTopic(TopicAddDTO dto) { public Result addTopic(TopicAddDTO dto) {
TopicDO topicDO = topicManagerService.getByTopicName(dto.getClusterId(), dto.getTopicName()); Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
if (!ValidateUtils.isNull(topicDO)) { if (ValidateUtils.isNull(physicalClusterId)) {
// 该topic已存在 return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST); }
} TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName());
// 给创建topic的用户该topic权限 if (!ValidateUtils.isNull(topic)) {
AuthorityDO authorityDO = new AuthorityDO(); return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST);
authorityDO.setAppId(dto.getAppId()); }
authorityDO.setClusterId(dto.getClusterId()); CreateTopicElemConfig createTopicConfig =
authorityDO.setTopicName(dto.getTopicName()); configService.getCreateTopicConfig(physicalClusterId, SystemCodeConstant.KAFKA_MANAGER);
authorityDO.setAccess(3); if (dto.getPeakBytesIn() > createTopicConfig.getAutoExecMaxPeakBytesInUnitB()) {
authorityDao.insert(authorityDO); return Result.buildFailure("流量峰值设置过大不超过30M");
// 记录该topic }
TopicDO topic = new TopicDO(); ClusterDO clusterDO = clusterService.getById(physicalClusterId);
topic.setAppId(dto.getAppId()); if (ValidateUtils.isNull(clusterDO)) {
topic.setClusterId(dto.getClusterId()); return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
topic.setDescription(dto.getDescription()); }
topic.setTopicName(dto.getTopicName()); List<Integer> brokerList = regionService
topic.setPeakBytesIn(dto.getPeakBytesIn() * 1024 * 1024); .getIdleRegionBrokerList(clusterDO.getId(), createTopicConfig.getRegionIdList());
int insert = topicDao.insert(topic); if (ValidateUtils.isEmptyList(brokerList)) {
if (insert > 0) { return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST);
return Result.buildFrom(ResultStatus.SUCCESS); }
} TopicDO topicDO = new TopicDO();
return Result.buildFrom(ResultStatus.FAIL); topicDO.setAppId(dto.getAppId());
topicDO.setClusterId(clusterDO.getId());
topicDO.setTopicName(dto.getTopicName());
topicDO.setDescription(dto.getDescription());
topicDO.setPeakBytesIn(dto.getPeakBytesIn());
Long partitionNum = Math.max(1, dto.getPeakBytesIn() / (3 * 1024 * 1024));
Properties properties = TopicCreationConstant
.createNewProperties(createTopicConfig.getRetentionTimeUnitHour() * 60 * 60 * 1000L);
ResultStatus rs = adminService.createTopic(clusterDO, topicDO, partitionNum.intValue(),
createTopicConfig.getReplicaNum(), null, brokerList, properties, null,
SpringTool.getUserName());
return Result.buildFrom(rs);
} }
private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO, private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO,
String topicName, String topicName,
Map<TopicPartition, Long> endOffsetMap) { Map<TopicPartition, Long> endOffsetMap) {
if (ValidateUtils.isNull(clusterDO) if (ValidateUtils.isNull(clusterDO)