Add Topic sync task & bug fixes

zengqiao
2021-01-16 16:26:38 +08:00
parent 3c091a88d4
commit d5680ffd5d
88 changed files with 2230 additions and 404 deletions

View File

@@ -0,0 +1,51 @@
package com.xiaojukeji.kafka.manager.task.config;
public class SyncTopic2DBConfig {
/**
* Default App assigned to synced Topics
*/
private String defaultAppId;
/**
* Cluster to be synced
*/
private Long clusterId;
/**
* Whether to also add authority info; defaults to false
*/
private boolean addAuthority;
public String getDefaultAppId() {
return defaultAppId;
}
public void setDefaultAppId(String defaultAppId) {
this.defaultAppId = defaultAppId;
}
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public boolean isAddAuthority() {
return addAuthority;
}
public void setAddAuthority(boolean addAuthority) {
this.addAuthority = addAuthority;
}
@Override
public String toString() {
return "SyncTopic2DBConfig{" +
"defaultAppId='" + defaultAppId + '\'' +
", clusterId=" + clusterId +
", addAuthority=" + addAuthority +
'}';
}
}
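For illustration only (not part of the commit): a minimal sketch of one config entry and the toString output it produces, assuming the class above is on the classpath. The app id "km_default" and cluster id 1 are made-up values; ConfigService is expected to hydrate a list of these objects from a stored JSON array.
import com.xiaojukeji.kafka.manager.task.config.SyncTopic2DBConfig;
public class SyncTopic2DBConfigDemo {
public static void main(String[] args) {
SyncTopic2DBConfig config = new SyncTopic2DBConfig();
config.setClusterId(1L);               // physical cluster whose Topics get synced
config.setDefaultAppId("km_default");  // App that adopts ownerless Topics
config.setAddAuthority(false);         // default: no authority records are written
System.out.println(config);
// SyncTopic2DBConfig{defaultAppId='km_default', clusterId=1, addAuthority=false}
}
}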

View File

@@ -2,7 +2,7 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect;
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroup;
import com.xiaojukeji.kafka.manager.common.entity.metrics.ConsumerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.events.ConsumerMetricsCollectedEvent;
@@ -105,7 +105,7 @@ public class CollectAndPublishCGData extends AbstractScheduledTask<ClusterDO> {
private List<ConsumerMetrics> getTopicConsumerMetrics(ClusterDO clusterDO,
String topicName,
long startTimeUnitMs) {
List<ConsumerGroupDTO> consumerGroupDTOList = consumerService.getConsumerGroupList(clusterDO.getId(), topicName);
List<ConsumerGroup> consumerGroupDTOList = consumerService.getConsumerGroupList(clusterDO.getId(), topicName);
if (ValidateUtils.isEmptyList(consumerGroupDTOList)) {
// Retry once
consumerGroupDTOList = consumerService.getConsumerGroupList(clusterDO.getId(), topicName);
@@ -131,7 +131,7 @@ public class CollectAndPublishCGData extends AbstractScheduledTask<ClusterDO> {
partitionOffsetMap.put(entry.getKey().partition(), entry.getValue());
}
for (ConsumerGroupDTO consumerGroupDTO: consumerGroupDTOList) {
for (ConsumerGroup consumerGroupDTO: consumerGroupDTOList) {
try {
ConsumerMetrics consumerMetrics =
getTopicConsumerMetrics(clusterDO, topicName, consumerGroupDTO, partitionOffsetMap, startTimeUnitMs);
@@ -150,20 +150,20 @@ public class CollectAndPublishCGData extends AbstractScheduledTask<ClusterDO> {
private ConsumerMetrics getTopicConsumerMetrics(ClusterDO clusterDO,
String topicName,
ConsumerGroupDTO consumerGroupDTO,
ConsumerGroup consumerGroup,
Map<Integer, Long> partitionOffsetMap,
long startTimeUnitMs) {
Map<Integer, Long> consumerOffsetMap =
consumerService.getConsumerOffset(clusterDO, topicName, consumerGroupDTO);
consumerService.getConsumerOffset(clusterDO, topicName, consumerGroup);
if (ValidateUtils.isEmptyMap(consumerOffsetMap)) {
return null;
}
ConsumerMetrics metrics = new ConsumerMetrics();
metrics.setClusterId(clusterDO.getId());
metrics.setTopicName(topicName);
metrics.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
metrics.setLocation(consumerGroupDTO.getOffsetStoreLocation().location);
metrics.setConsumerGroup(consumerGroup.getConsumerGroup());
metrics.setLocation(consumerGroup.getOffsetStoreLocation().location);
metrics.setPartitionOffsetMap(partitionOffsetMap);
metrics.setConsumeOffsetMap(consumerOffsetMap);
metrics.setTimestampUnitMs(startTimeUnitMs);

View File

@@ -15,6 +15,7 @@ import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.*;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
@@ -109,6 +110,11 @@ public class AutoHandleTopicOrder extends AbstractScheduledTask<EmptyEntry> {
return false;
}
if (PhysicalClusterMetadataManager.isTopicExist(physicalClusterId, dto.getTopicName())) {
rejectForRepeatedTopicName(orderDO);
return false;
}
if (ValidateUtils.isNull(dto.isPhysicalClusterId()) || !dto.isPhysicalClusterId()) {
return handleApplyTopicOrderByLogicalClusterId(clusterDO, orderDO, dto, createConfig);
}
@@ -117,6 +123,13 @@ public class AutoHandleTopicOrder extends AbstractScheduledTask<EmptyEntry> {
return handleApplyTopicOrderByPhysicalClusterId(clusterDO, orderDO, dto, createConfig);
}
private void rejectForRepeatedTopicName(OrderDO orderDO) {
orderDO.setApplicant(Constant.AUTO_HANDLE_USER_NAME);
orderDO.setStatus(OrderStatusEnum.REFUSED.getCode());
orderDO.setOpinion("驳回:该 Topic 已被别人申请并生效");
orderService.updateOrderById(orderDO);
}
/**
* Apply-Topic order for a logical cluster
*/

View File

@@ -0,0 +1,163 @@
package com.xiaojukeji.kafka.manager.task.dispatch.op;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import com.xiaojukeji.kafka.manager.task.config.SyncTopic2DBConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.stream.Collectors;
/**
* Periodically syncs Topics not yet persisted to the DB; only the mapping relation is stored, and authority info is added only when explicitly configured (off by default)
* @author zengqiao
* @date 19/12/29
*/
@Component
@CustomScheduled(name = "syncTopic2DB", cron = "0 0/2 * * * ?", threadNum = 1)
@ConditionalOnProperty(prefix = "task.op", name = "sync-topic-enabled", havingValue = "true", matchIfMissing = false)
public class SyncTopic2DB extends AbstractScheduledTask<EmptyEntry> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
private static final String SYNC_TOPIC_2_DB_CONFIG_KEY = "SYNC_TOPIC_2_DB_CONFIG_KEY";
@Autowired
private AppService appService;
@Autowired
private ConfigService configService;
@Autowired
private ClusterService clusterService;
@Autowired
private AuthorityService authorityService;
@Autowired
private TopicManagerService topicManagerService;
@Override
public List<EmptyEntry> listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
emptyEntry.setId(System.currentTimeMillis() / 1000);
return Arrays.asList(emptyEntry);
}
@Override
public void processTask(EmptyEntry emptyEntry) {
Map<Long, SyncTopic2DBConfig> clusterIdConfigMap = getConfig();
if (ValidateUtils.isEmptyMap(clusterIdConfigMap)) {
LOGGER.warn("class=SyncTopic2DB||method=processTask||msg=without config or config illegal");
return;
}
LOGGER.info("class=SyncTopic2DB||method=processTask||data={}||msg=start sync", JsonUtils.toJSONString(clusterIdConfigMap));
List<ClusterDO> clusterDOList = clusterService.list();
if (ValidateUtils.isEmptyList(clusterDOList)) {
return;
}
for (ClusterDO clusterDO: clusterDOList) {
if (!clusterIdConfigMap.containsKey(clusterDO.getId())) {
continue;
}
try {
syncTopic2DB(clusterDO.getId(), clusterIdConfigMap.get(clusterDO.getId()));
} catch (Exception e) {
LOGGER.error("class=SyncTopic2DB||method=processTask||clusterId={}||errMsg={}||msg=sync failed", clusterDO.getId(), e.getMessage());
}
}
}
private void syncTopic2DB(Long clusterId, SyncTopic2DBConfig syncTopic2DBConfig) {
List<TopicDO> doList = topicManagerService.getByClusterId(clusterId);
if (ValidateUtils.isNull(doList)) {
doList = new ArrayList<>();
}
Set<String> existedTopicNameSet = doList.stream().map(TopicDO::getTopicName).collect(Collectors.toSet());
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
if (existedTopicNameSet.contains(topicName)
|| KafkaConstant.COORDINATOR_TOPIC_NAME.equals(topicName)
|| KafkaConstant.TRANSACTION_TOPIC_NAME.equals(topicName)) {
continue;
}
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (ValidateUtils.isNull(topicMetadata)) {
continue;
}
// Skip Topics created within the last 10 minutes, so that Topics just created on the KM platform (which already have an owning app) are not synced by mistake
if (System.currentTimeMillis() - topicMetadata.getCreateTime() < 10 * 60 * 1000) {
continue;
}
TopicDO topicDO = new TopicDO();
topicDO.setAppId(syncTopic2DBConfig.getDefaultAppId());
topicDO.setClusterId(clusterId);
topicDO.setTopicName(topicName);
topicDO.setDescription("定期同步至DB中的无主Topic");
topicDO.setPeakBytesIn(TopicCreationConstant.DEFAULT_QUOTA);
topicManagerService.addTopic(topicDO);
// No authority info configured for this cluster: skip the authority step for this
// Topic and continue with the next one (isAddAuthority() returns a primitive
// boolean, so a null check is unnecessary, and a plain return here would wrongly
// abort the sync of all remaining Topics)
if (!syncTopic2DBConfig.isAddAuthority()) {
continue;
}
// TODO: adding the Topic and adding the Authority are not transactional; an exception in between leaves the data inconsistent. To be improved later.
AuthorityDO authorityDO = new AuthorityDO();
authorityDO.setAppId(syncTopic2DBConfig.getDefaultAppId());
authorityDO.setClusterId(clusterId);
authorityDO.setTopicName(topicName);
authorityDO.setAccess(TopicAuthorityEnum.READ_WRITE.getCode());
authorityService.addAuthority(authorityDO);
}
}
private Map<Long, SyncTopic2DBConfig> getConfig() {
List<SyncTopic2DBConfig> configList = configService.getArrayByKey(SYNC_TOPIC_2_DB_CONFIG_KEY, SyncTopic2DBConfig.class);
if (ValidateUtils.isEmptyList(configList)) {
return Collections.emptyMap();
}
Map<Long, SyncTopic2DBConfig> clusterIdConfigMap = new HashMap<>();
for (SyncTopic2DBConfig syncTopic2DBConfig: configList) {
if (ValidateUtils.isNullOrLessThanZero(syncTopic2DBConfig.getClusterId())
|| ValidateUtils.isBlank(syncTopic2DBConfig.getDefaultAppId())) {
continue;
}
AppDO appDO = appService.getByAppId(syncTopic2DBConfig.getDefaultAppId());
if (ValidateUtils.isNull(appDO)) {
continue;
}
clusterIdConfigMap.put(syncTopic2DBConfig.getClusterId(), syncTopic2DBConfig);
}
return clusterIdConfigMap;
}
}
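Two usage notes on the task above. Per the @CustomScheduled annotation, the cron "0 0/2 * * * ?" fires every two minutes, and the @ConditionalOnProperty means the task only runs when task.op.sync-topic-enabled=true is set. getConfig() asks ConfigService to deserialize the value stored under SYNC_TOPIC_2_DB_CONFIG_KEY into SyncTopic2DBConfig objects; judging purely by the field names (the exact storage format is owned by ConfigService), a plausible stored value would be a JSON array such as this sketch:
[
{"clusterId": 1, "defaultAppId": "km_default", "addAuthority": false},
{"clusterId": 2, "defaultAppId": "km_default", "addAuthority": true}
]
Entries with a missing or negative clusterId, a blank defaultAppId, or an appId unknown to AppService are silently skipped, so a cluster that never syncs usually points to a bad entry here.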

View File

@@ -50,8 +50,7 @@ public class FlushBKConsumerGroupMetadata {
private void flush(Long clusterId) {
// Fetch the consumer group list
Set<String> consumerGroupSet = new HashSet<>();
Map<String, List<String>> consumerGroupAppIdMap = new HashMap<>();
collectAndSaveConsumerGroup(clusterId, consumerGroupSet, consumerGroupAppIdMap);
collectAndSaveConsumerGroup(clusterId, consumerGroupSet);
// Fetch the consumer group summary info
Map<String, Set<String>> topicNameConsumerGroupMap = new HashMap<>();
@@ -67,15 +66,12 @@ public class FlushBKConsumerGroupMetadata {
new ConsumerMetadata(
consumerGroupSet,
topicNameConsumerGroupMap,
consumerGroupSummary,
consumerGroupAppIdMap
consumerGroupSummary
)
);
}
private void collectAndSaveConsumerGroup(Long clusterId,
Set<String> consumerGroupSet,
Map<String, List<String>> consumerGroupAppIdMap) {
private void collectAndSaveConsumerGroup(Long clusterId, Set<String> consumerGroupSet) {
try {
AdminClient adminClient = KafkaClientPool.getAdminClient(clusterId);
@@ -83,20 +79,14 @@ public class FlushBKConsumerGroupMetadata {
for (scala.collection.immutable.List<kafka.coordinator.GroupOverview> brokerGroup : JavaConversions.asJavaMap(brokerGroupMap).values()) {
List<kafka.coordinator.GroupOverview> lists = JavaConversions.asJavaList(brokerGroup);
for (kafka.coordinator.GroupOverview groupOverview : lists) {
String consumerGroup = groupOverview.groupId();
List<String> appIdList = new ArrayList<>();
if (consumerGroup != null && consumerGroup.contains("#")) {
String[] splitArray = consumerGroup.split("#");
consumerGroup = splitArray[splitArray.length - 1];
appIdList = Arrays.asList(splitArray).subList(0, splitArray.length - 1);
}
consumerGroupAppIdMap.put(consumerGroup, appIdList);
consumerGroupSet.add(consumerGroup);
}
}
return;
} catch (Exception e) {
LOGGER.error("collect consumerGroup failed, clusterId:{}.", clusterId, e);
}
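For context, a self-contained sketch of the bookkeeping this hunk removes: group ids reported in the form "appId1#...#appIdN#group" had their leading appIds split off into consumerGroupAppIdMap. The group id "app_001#my-group" below is hypothetical; the split logic mirrors the removed lines above.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class GroupIdSplitDemo {
public static void main(String[] args) {
String consumerGroup = "app_001#my-group";  // hypothetical gateway-style group id
List<String> appIdList = new ArrayList<>();
if (consumerGroup.contains("#")) {
String[] splitArray = consumerGroup.split("#");
consumerGroup = splitArray[splitArray.length - 1];                       // "my-group"
appIdList = Arrays.asList(splitArray).subList(0, splitArray.length - 1); // ["app_001"]
}
System.out.println(consumerGroup + " " + appIdList);  // prints: my-group [app_001]
}
}
After this commit only the raw group id is recorded, and ConsumerMetadata is constructed without the appId map, both here and in the FlushZKConsumerGroupMetadata hunk below.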

View File

@@ -55,7 +55,7 @@ public class FlushZKConsumerGroupMetadata {
collectTopicAndConsumerGroupMap(clusterId, new ArrayList<>(consumerGroupSet));
ConsumerMetadataCache.putConsumerMetadataInZK(
clusterId,
new ConsumerMetadata(consumerGroupSet, topicNameConsumerGroupMap, new HashMap<>(0), new HashMap<>(0))
new ConsumerMetadata(consumerGroupSet, topicNameConsumerGroupMap, new HashMap<>(0))
);
}