mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-04 20:02:07 +08:00
创建topic
This commit is contained in:
@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBrokerDTO;
|
||||
@@ -116,4 +117,9 @@ public interface TopicService {
|
||||
* 删除topic
|
||||
*/
|
||||
Result deleteTopic(Long clusterId, String topicName);
|
||||
|
||||
/**
|
||||
* 配额调整
|
||||
*/
|
||||
Result addTopicQuota(TopicQuotaDTO dto);
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicAddDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicQuotaDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
@@ -34,6 +34,7 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.*;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
|
||||
import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.KafkaZookeeperUtils;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
|
||||
@@ -93,10 +94,10 @@ public class TopicServiceImpl implements TopicService {
|
||||
private AbstractHealthScoreStrategy healthScoreStrategy;
|
||||
|
||||
@Autowired
|
||||
private ConfigService configService;
|
||||
private AdminService adminService;
|
||||
|
||||
@Autowired
|
||||
private AdminService adminService;
|
||||
private QuotaService quotaService;
|
||||
|
||||
@Override
|
||||
public List<TopicMetricsDO> getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) {
|
||||
@@ -837,58 +838,80 @@ public class TopicServiceImpl implements TopicService {
|
||||
|
||||
@Override
|
||||
public Result addTopic(TopicAddDTO dto) {
|
||||
//获取物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName());
|
||||
if (!ValidateUtils.isNull(topic)) {
|
||||
return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST);
|
||||
}
|
||||
CreateTopicElemConfig createTopicConfig =
|
||||
configService.getCreateTopicConfig(physicalClusterId, SystemCodeConstant.KAFKA_MANAGER);
|
||||
if (dto.getPeakBytesIn() > createTopicConfig.getAutoExecMaxPeakBytesInUnitB()) {
|
||||
return Result.buildFailure("流量峰值设置过大,不超过30M");
|
||||
}
|
||||
//获取集群信息
|
||||
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
List<Integer> brokerList = regionService
|
||||
.getIdleRegionBrokerList(clusterDO.getId(), createTopicConfig.getRegionIdList());
|
||||
if (ValidateUtils.isEmptyList(brokerList)) {
|
||||
return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST);
|
||||
//判断topic是否存在
|
||||
TopicDO topic = topicManagerService.getByTopicName(physicalClusterId, dto.getTopicName());
|
||||
if (!ValidateUtils.isNull(topic)) {
|
||||
return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST);
|
||||
}
|
||||
//构建topicDo
|
||||
TopicDO topicDO = new TopicDO();
|
||||
topicDO.setAppId(dto.getAppId());
|
||||
topicDO.setClusterId(clusterDO.getId());
|
||||
topicDO.setClusterId(dto.getClusterId());
|
||||
topicDO.setTopicName(dto.getTopicName());
|
||||
topicDO.setDescription(dto.getDescription());
|
||||
topicDO.setPeakBytesIn(dto.getPeakBytesIn());
|
||||
Long partitionNum = Math.max(1, dto.getPeakBytesIn() / (3 * 1024 * 1024));
|
||||
Properties properties = TopicCreationConstant
|
||||
.createNewProperties(createTopicConfig.getRetentionTimeUnitHour() * 60 * 60 * 1000L);
|
||||
ResultStatus rs = adminService.createTopic(clusterDO, topicDO, partitionNum.intValue(),
|
||||
createTopicConfig.getReplicaNum(), null, brokerList, properties, null,
|
||||
//构建properties
|
||||
Properties properties = dto.getProperties();
|
||||
if (ValidateUtils.isNull(properties)) {
|
||||
properties = new Properties();
|
||||
}
|
||||
properties.put(KafkaConstant.RETENTION_MS_KEY, String.valueOf(dto.getRetentionTime()));
|
||||
//创建topic
|
||||
ResultStatus rs = adminService.createTopic(clusterDO, topicDO, dto.getPartitionNum(),
|
||||
dto.getReplicaNum(), dto.getRegionId(), dto.getBrokerIdList(), properties, SpringTool.getUserName(),
|
||||
SpringTool.getUserName());
|
||||
return Result.buildFrom(rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result deleteTopic(Long clusterId, String topicName) {
|
||||
//获得物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId);
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//获取集群信息
|
||||
ClusterDO clusterDO = clusterService.getById(physicalClusterId);
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//删除topic
|
||||
ResultStatus rs = adminService.deleteTopic(clusterDO, topicName, SpringTool.getUserName());
|
||||
return Result.buildFrom(rs);
|
||||
}
|
||||
|
||||
private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO,
|
||||
@Override
|
||||
public Result addTopicQuota(TopicQuotaDTO dto) {
|
||||
//获取物理集群id
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(dto.getClusterId());
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
//构建配额
|
||||
TopicQuota topicQuota = new TopicQuota();
|
||||
topicQuota.setClusterId(physicalClusterId);
|
||||
topicQuota.setAppId(dto.getAppId());
|
||||
topicQuota.setTopicName(dto.getTopicName());
|
||||
topicQuota.setProduceQuota(dto.getProduceQuota());
|
||||
topicQuota.setConsumeQuota(dto.getConsumeQuota());
|
||||
//配额调整
|
||||
int result = quotaService.addTopicQuota(topicQuota);
|
||||
if (result > 0) {
|
||||
return Result.buildFrom(ResultStatus.SUCCESS);
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.FAIL);
|
||||
}
|
||||
|
||||
private Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(ClusterDO clusterDO,
|
||||
String topicName,
|
||||
Map<TopicPartition, Long> endOffsetMap) {
|
||||
if (ValidateUtils.isNull(clusterDO)
|
||||
|
||||
Reference in New Issue
Block a user