[Bugfix] Fix the issue where, right after a Topic is created, viewing the Topic-Messages page reports that the Topic does not exist (#697)

renxiangde
2022-10-24 23:28:47 +08:00
committed by EricZeng
parent 127b5be651
commit 88b5833f77
3 changed files with 73 additions and 19 deletions

OpTopicManagerImpl.java

@@ -10,14 +10,18 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import kafka.admin.AdminUtils;
@@ -52,6 +56,9 @@ public class OpTopicManagerImpl implements OpTopicManager {
@Autowired
private ClusterPhyService clusterPhyService;
@Autowired
private PartitionService partitionService;
@Override
public Result<Void> createTopic(TopicCreateDTO dto, String operator) {
log.info("method=createTopic||param={}||operator={}.", dto, operator);
@@ -80,7 +87,7 @@ public class OpTopicManagerImpl implements OpTopicManager {
);
// Create the Topic
return opTopicService.createTopic(
Result<Void> createTopicRes = opTopicService.createTopic(
new TopicCreateParam(
dto.getClusterId(),
dto.getTopicName(),
@@ -90,6 +97,21 @@ public class OpTopicManagerImpl implements OpTopicManager {
),
operator
);
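// If creation succeeded, asynchronously back off ~3s so the new Topic's metadata can take effect,
// then pull its partitions from Kafka and write them to the DB, so the Topic-Messages page can
// find the new Topic right away instead of waiting for the scheduled partition-sync task.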
if (createTopicRes.successful()) {
try {
FutureUtil.quickStartupFutureUtil.submitTask(() -> {
BackoffUtils.backoff(3000);
Result<List<Partition>> partitionsResult = partitionService.listPartitionsFromKafka(clusterPhy, dto.getTopicName());
if (partitionsResult.successful()) {
partitionService.updatePartitions(clusterPhy.getId(), dto.getTopicName(), partitionsResult.getData(), new ArrayList<>());
}
});
} catch (Exception e) {
log.error("method=createTopic||param={}||operator={}||msg=add partition to db failed||errMsg=exception", dto, operator, e);
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, "Topic created successfully, but recording the partitions to the DB failed; waiting for the scheduled task to sync the partition info");
}
}
return createTopicRes;
}
@Override

PartitionService.java

@@ -14,6 +14,8 @@ import java.util.Set;
public interface PartitionService {
Result<Map<String, List<Partition>>> listPartitionsFromKafka(ClusterPhy clusterPhy);
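// List the partitions of a single Topic directly from Kafka (ZK or AdminClient, depending on the cluster run state)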
Result<List<Partition>> listPartitionsFromKafka(ClusterPhy clusterPhy, String topicName);
List<Partition> listPartitionByCluster(Long clusterPhyId);
List<PartitionPO> listPartitionPOByCluster(Long clusterPhyId);

PartitionServiceImpl.java

@@ -98,6 +98,15 @@ public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
return this.getPartitionsFromAdminClient(clusterPhy);
}
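// Single-Topic variant: read partition info from ZK for ZK-mode clusters, otherwise via the AdminClient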
@Override
public Result<List<Partition>> listPartitionsFromKafka(ClusterPhy clusterPhy, String topicName) {
if (clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) {
return this.getPartitionsFromZKClientByClusterTopicName(clusterPhy, topicName);
}
return this.getPartitionsFromAdminClientByClusterTopicName(clusterPhy, topicName);
}
@Override
public List<Partition> listPartitionByCluster(Long clusterPhyId) {
LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
@@ -392,14 +401,12 @@ public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
// 获取Topic列表
ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).listInternal(true));
for (String topicName: listTopicsResult.names().get()) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(
Arrays.asList(topicName),
new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);
TopicDescription description = describeTopicsResult.all().get().get(topicName);
partitionMap.put(topicName, PartitionConverter.convert2PartitionList(clusterPhy.getId(), description));
Result<List<Partition>> partitionListRes = this.getPartitionsFromAdminClientByClusterTopicName(clusterPhy, topicName);
if (partitionListRes.successful()) {
partitionMap.put(topicName, partitionListRes.getData());
} else {
return Result.buildFromIgnoreData(partitionListRes);
}
}
return Result.buildSuc(partitionMap);
@@ -416,13 +423,42 @@ public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
try {
List<String> topicNameList = kafkaZKDAO.getChildren(clusterPhy.getId(), TopicsZNode.path(), false);
for (String topicName: topicNameList) {
PartitionMap zkPartitionMap = kafkaZKDAO.getData(clusterPhy.getId(), TopicZNode.path(topicName), PartitionMap.class);
Result<List<Partition>> partitionListRes = this.getPartitionsFromZKClientByClusterTopicName(clusterPhy, topicName);
if (partitionListRes.successful()) {
partitionMap.put(topicName, partitionListRes.getData());
}
}
return Result.buildSuc(partitionMap);
} catch (Exception e) {
log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
}
}
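// Describe one Topic through the Kafka AdminClient and convert the result into a Partition list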
private Result<List<Partition>> getPartitionsFromAdminClientByClusterTopicName(ClusterPhy clusterPhy, String topicName) {
try {
AdminClient adminClient = kafkaAdminClient.getClient(clusterPhy.getId());
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(
Arrays.asList(topicName),
new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);
TopicDescription description = describeTopicsResult.all().get().get(topicName);
return Result.buildSuc(PartitionConverter.convert2PartitionList(clusterPhy.getId(), description));
} catch (Exception e) {
log.error("class=PartitionServiceImpl||method=getPartitionsFromAdminClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(), topicName, e);
return Result.buildFailure(ResultStatus.KAFKA_OPERATE_FAILED);
}
}
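// Assemble one Topic's Partition list from ZooKeeper: the assignment znode plus each partition's state znode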
private Result<List<Partition>> getPartitionsFromZKClientByClusterTopicName(ClusterPhy clusterPhy, String topicName) {
try {
PartitionMap zkPartitionMap = kafkaZKDAO.getData(clusterPhy.getId(), TopicZNode.path(topicName), PartitionMap.class);
List<Partition> partitionList = new ArrayList<>();
List<String> partitionIdList = kafkaZKDAO.getChildren(clusterPhy.getId(), TopicPartitionsZNode.path(topicName), false);
for (String partitionId: partitionIdList) {
PartitionState partitionState = kafkaZKDAO.getData(clusterPhy.getId(), TopicPartitionStateZNode.path(new TopicPartition(topicName, Integer.valueOf(partitionId))), PartitionState.class);
Partition partition = new Partition();
partition.setClusterPhyId(clusterPhy.getId());
partition.setTopicName(topicName);
@@ -430,17 +466,11 @@ public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
partition.setLeaderBrokerId(partitionState.getLeader());
partition.setInSyncReplicaList(partitionState.getIsr());
partition.setAssignReplicaList(zkPartitionMap.getPartitionAssignReplicas(Integer.valueOf(partitionId)));
partitionList.add(partition);
}
partitionMap.put(topicName, partitionList);
}
return Result.buildSuc(partitionMap);
return Result.buildSuc(partitionList);
} catch (Exception e) {
log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e);
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
}
}