From 88b5833f77c6c99bf7c48b86665bec2e4c846071 Mon Sep 17 00:00:00 2001 From: renxiangde <1010478766@qq.com> Date: Mon, 24 Oct 2022 23:28:47 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=20=E4=BF=AE=E5=A4=8D=E6=96=B0=E5=BB=BA?= =?UTF-8?q?Topic=E5=90=8E=EF=BC=8C=E7=AB=8B=E5=8D=B3=E6=9F=A5=E7=9C=8BTopi?= =?UTF-8?q?c-Messages=E4=BF=A1=E6=81=AF=E4=BC=9A=E6=8F=90=E7=A4=BATopic?= =?UTF-8?q?=E4=B8=8D=E5=AD=98=E5=9C=A8=E7=9A=84=E9=97=AE=E9=A2=98=20(#697)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/topic/impl/OpTopicManagerImpl.java | 24 ++++++- .../service/partition/PartitionService.java | 2 + .../partition/impl/PartitionServiceImpl.java | 66 ++++++++++++++----- 3 files changed, 73 insertions(+), 19 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java index 3b2d526f..5d27ed74 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java @@ -10,14 +10,18 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import kafka.admin.AdminUtils; @@ -52,6 +56,9 @@ public class OpTopicManagerImpl implements OpTopicManager { @Autowired private ClusterPhyService clusterPhyService; + @Autowired + private PartitionService partitionService; + @Override public Result createTopic(TopicCreateDTO dto, String operator) { log.info("method=createTopic||param={}||operator={}.", dto, operator); @@ -80,7 +87,7 @@ public class OpTopicManagerImpl implements OpTopicManager { ); // 创建Topic - return opTopicService.createTopic( + Result createTopicRes = opTopicService.createTopic( new TopicCreateParam( dto.getClusterId(), dto.getTopicName(), @@ -90,6 +97,21 @@ public class OpTopicManagerImpl implements OpTopicManager { ), operator ); + if (createTopicRes.successful()){ + try{ + FutureUtil.quickStartupFutureUtil.submitTask(() -> { + BackoffUtils.backoff(3000); + Result> partitionsResult = partitionService.listPartitionsFromKafka(clusterPhy, dto.getTopicName()); + if (partitionsResult.successful()){ + partitionService.updatePartitions(clusterPhy.getId(), dto.getTopicName(), partitionsResult.getData(), new ArrayList<>()); + } + }); + }catch (Exception e) { + log.error("method=createTopic||param={}||operator={}||msg=add partition to db failed||errMsg=exception", dto, operator, e); + return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, "Topic创建成功,但记录Partition到DB中失败,等待定时任务同步partition信息"); + } + } + return createTopicRes; } @Override diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java index 129bc3bc..ae68dccf 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java @@ -14,6 +14,8 @@ import java.util.Set; public interface PartitionService { Result>> listPartitionsFromKafka(ClusterPhy clusterPhy); + Result> listPartitionsFromKafka(ClusterPhy clusterPhy, String topicName); + List listPartitionByCluster(Long clusterPhyId); List listPartitionPOByCluster(Long clusterPhyId); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java index 1795e4d4..62d24feb 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java @@ -98,6 +98,15 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P return this.getPartitionsFromAdminClient(clusterPhy); } + @Override + public Result> listPartitionsFromKafka(ClusterPhy clusterPhy, String topicName) { + if (clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) { + return this.getPartitionsFromZKClientByClusterTopicName(clusterPhy,topicName); + } + return this.getPartitionsFromAdminClientByClusterTopicName(clusterPhy,topicName); + + } + @Override public List listPartitionByCluster(Long clusterPhyId) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); @@ -392,14 +401,12 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P // 获取Topic列表 ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).listInternal(true)); for (String topicName: listTopicsResult.names().get()) { - DescribeTopicsResult describeTopicsResult = adminClient.describeTopics( - Arrays.asList(topicName), - new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) - ); - - TopicDescription description = describeTopicsResult.all().get().get(topicName); - - partitionMap.put(topicName, PartitionConverter.convert2PartitionList(clusterPhy.getId(), description)); + Result> partitionListRes = this.getPartitionsFromAdminClientByClusterTopicName(clusterPhy, topicName); + if (partitionListRes.successful()){ + partitionMap.put(topicName, partitionListRes.getData()); + }else { + return Result.buildFromIgnoreData(partitionListRes); + } } return Result.buildSuc(partitionMap); @@ -416,13 +423,42 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P try { List topicNameList = kafkaZKDAO.getChildren(clusterPhy.getId(), TopicsZNode.path(), false); for (String topicName: topicNameList) { - PartitionMap zkPartitionMap = kafkaZKDAO.getData(clusterPhy.getId(), TopicZNode.path(topicName), PartitionMap.class); + Result> partitionListRes = this.getPartitionsFromZKClientByClusterTopicName(clusterPhy, topicName); + if (partitionListRes.successful()){ + partitionMap.put(topicName, partitionListRes.getData()); + } + } + return Result.buildSuc(partitionMap); + } catch (Exception e) { + log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); + } + } + + private Result> getPartitionsFromAdminClientByClusterTopicName(ClusterPhy clusterPhy, String topicName) { + + try { + AdminClient adminClient = kafkaAdminClient.getClient(clusterPhy.getId()); + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics( + Arrays.asList(topicName), + new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + TopicDescription description = describeTopicsResult.all().get().get(topicName); + return Result.buildSuc(PartitionConverter.convert2PartitionList(clusterPhy.getId(), description)); + }catch (Exception e) { + log.error("class=PartitionServiceImpl||method=getPartitionsFromAdminClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); + return Result.buildFailure(ResultStatus.KAFKA_OPERATE_FAILED); + } + } + + private Result> getPartitionsFromZKClientByClusterTopicName(ClusterPhy clusterPhy, String topicName) { + try { + PartitionMap zkPartitionMap = kafkaZKDAO.getData(clusterPhy.getId(), TopicZNode.path(topicName), PartitionMap.class); List partitionList = new ArrayList<>(); List partitionIdList = kafkaZKDAO.getChildren(clusterPhy.getId(), TopicPartitionsZNode.path(topicName), false); for (String partitionId: partitionIdList) { PartitionState partitionState = kafkaZKDAO.getData(clusterPhy.getId(), TopicPartitionStateZNode.path(new TopicPartition(topicName, Integer.valueOf(partitionId))), PartitionState.class); - Partition partition = new Partition(); partition.setClusterPhyId(clusterPhy.getId()); partition.setTopicName(topicName); @@ -430,17 +466,11 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P partition.setLeaderBrokerId(partitionState.getLeader()); partition.setInSyncReplicaList(partitionState.getIsr()); partition.setAssignReplicaList(zkPartitionMap.getPartitionAssignReplicas(Integer.valueOf(partitionId))); - partitionList.add(partition); } - - partitionMap.put(topicName, partitionList); - } - - return Result.buildSuc(partitionMap); + return Result.buildSuc(partitionList); } catch (Exception e) { - log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); - + log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); } }