From b5901a281990fd77a5754bff917803b591b6c618 Mon Sep 17 00:00:00 2001 From: "tangcongfa_v@didichuxing.com" Date: Wed, 28 Apr 2021 11:08:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9B=E5=BB=BAtopic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/TopicServiceImpl.java | 74 +++++++++++-------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 590081b5..6bbd2e7b 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -1,8 +1,11 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; +import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant; +import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; @@ -14,7 +17,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; -import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; +import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; @@ -22,11 +25,9 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionMap; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.dao.TopicAppMetricsDao; -import com.xiaojukeji.kafka.manager.dao.TopicDao; import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao; import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao; import com.xiaojukeji.kafka.manager.common.entity.pojo.*; -import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao; import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool; import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache; import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; @@ -45,7 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; import java.util.*; @@ -93,10 +93,10 @@ public class TopicServiceImpl implements TopicService { private AbstractHealthScoreStrategy healthScoreStrategy; @Autowired - private TopicDao topicDao; + private ConfigService configService; @Autowired - private AuthorityDao authorityDao; + private AdminService adminService; @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { @@ -835,36 +835,46 @@ public class TopicServiceImpl implements TopicService { return new Result<>(TopicOffsetChangedEnum.UNKNOWN); } - @Transactional @Override public Result addTopic(TopicAddDTO dto) { - TopicDO topicDO = topicManagerService.getByTopicName(dto.getClusterId(), dto.getTopicName()); - if (!ValidateUtils.isNull(topicDO)) { - // 该topic已存在 - return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST); - } - // 给创建topic的用户该topic权限 - AuthorityDO authorityDO = new AuthorityDO(); - authorityDO.setAppId(dto.getAppId()); - authorityDO.setClusterId(dto.getClusterId()); - authorityDO.setTopicName(dto.getTopicName()); - authorityDO.setAccess(3); - authorityDao.insert(authorityDO); - // 记录该topic - TopicDO topic = new TopicDO(); - topic.setAppId(dto.getAppId()); - topic.setClusterId(dto.getClusterId()); - topic.setDescription(dto.getDescription()); - topic.setTopicName(dto.getTopicName()); - topic.setPeakBytesIn(dto.getPeakBytesIn() * 1024 * 1024); - int insert = topicDao.insert(topic); - if (insert > 0) { - return Result.buildFrom(ResultStatus.SUCCESS); - } - return Result.buildFrom(ResultStatus.FAIL); + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName()); + if (!ValidateUtils.isNull(topic)) { + return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST); + } + CreateTopicElemConfig createTopicConfig = + configService.getCreateTopicConfig(physicalClusterId, SystemCodeConstant.KAFKA_MANAGER); + if (dto.getPeakBytesIn() > createTopicConfig.getAutoExecMaxPeakBytesInUnitB()) { + return Result.buildFailure("流量峰值设置过大,不超过30M"); + } + ClusterDO clusterDO = clusterService.getById(physicalClusterId); + if (ValidateUtils.isNull(clusterDO)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + List brokerList = regionService + .getIdleRegionBrokerList(clusterDO.getId(), createTopicConfig.getRegionIdList()); + if (ValidateUtils.isEmptyList(brokerList)) { + return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST); + } + TopicDO topicDO = new TopicDO(); + topicDO.setAppId(dto.getAppId()); + topicDO.setClusterId(clusterDO.getId()); + topicDO.setTopicName(dto.getTopicName()); + topicDO.setDescription(dto.getDescription()); + topicDO.setPeakBytesIn(dto.getPeakBytesIn()); + Long partitionNum = Math.max(1, dto.getPeakBytesIn() / (3 * 1024 * 1024)); + Properties properties = TopicCreationConstant + .createNewProperties(createTopicConfig.getRetentionTimeUnitHour() * 60 * 60 * 1000L); + ResultStatus rs = adminService.createTopic(clusterDO, topicDO, partitionNum.intValue(), + createTopicConfig.getReplicaNum(), null, brokerList, properties, null, + SpringTool.getUserName()); + return Result.buildFrom(rs); } - private Result checkTopicOffsetChanged(ClusterDO clusterDO, + private Result checkTopicOffsetChanged(ClusterDO clusterDO, String topicName, Map endOffsetMap) { if (ValidateUtils.isNull(clusterDO)