mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-07 06:02:07 +08:00
创建topic
This commit is contained in:
@@ -14,6 +14,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
|
|||||||
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
|
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
|
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
|
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
|
||||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
|
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
|
||||||
@@ -25,6 +26,7 @@ import com.xiaojukeji.kafka.manager.dao.TopicDao;
|
|||||||
import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao;
|
import com.xiaojukeji.kafka.manager.dao.TopicMetricsDao;
|
||||||
import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao;
|
import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao;
|
||||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.*;
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.*;
|
||||||
|
import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
|
||||||
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
|
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
|
||||||
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
|
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
|
||||||
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
|
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
|
||||||
@@ -43,6 +45,7 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
@@ -92,6 +95,9 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TopicDao topicDao;
|
private TopicDao topicDao;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private AuthorityDao authorityDao;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<TopicMetricsDO> getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) {
|
public List<TopicMetricsDO> getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) {
|
||||||
try {
|
try {
|
||||||
@@ -829,6 +835,7 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
return new Result<>(TopicOffsetChangedEnum.UNKNOWN);
|
return new Result<>(TopicOffsetChangedEnum.UNKNOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
@Override
|
@Override
|
||||||
public Result addTopic(TopicAddDTO dto) {
|
public Result addTopic(TopicAddDTO dto) {
|
||||||
TopicDO topicDO = topicManagerService.getByTopicName(dto.getClusterId(), dto.getTopicName());
|
TopicDO topicDO = topicManagerService.getByTopicName(dto.getClusterId(), dto.getTopicName());
|
||||||
@@ -836,6 +843,14 @@ public class TopicServiceImpl implements TopicService {
|
|||||||
// 该topic已存在
|
// 该topic已存在
|
||||||
return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST);
|
return Result.buildFrom(ResultStatus.TOPIC_ALREADY_EXIST);
|
||||||
}
|
}
|
||||||
|
// 给创建topic的用户该topic权限
|
||||||
|
AuthorityDO authorityDO = new AuthorityDO();
|
||||||
|
authorityDO.setAppId(dto.getAppId());
|
||||||
|
authorityDO.setClusterId(dto.getClusterId());
|
||||||
|
authorityDO.setTopicName(dto.getTopicName());
|
||||||
|
authorityDO.setAccess(3);
|
||||||
|
authorityDao.insert(authorityDO);
|
||||||
|
// 记录该topic
|
||||||
TopicDO topic = new TopicDO();
|
TopicDO topic = new TopicDO();
|
||||||
topic.setAppId(dto.getAppId());
|
topic.setAppId(dto.getAppId());
|
||||||
topic.setClusterId(dto.getClusterId());
|
topic.setClusterId(dto.getClusterId());
|
||||||
|
|||||||
Reference in New Issue
Block a user