From 9d33c725ad52045b20fc61d934841e62e9c2b686 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 28 Sep 2022 10:39:33 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE-624]=E8=BF=87=E6=BB=A4=E6=8E=89=E4=B8=8D?= =?UTF-8?q?=E5=AD=98=E5=9C=A8=E7=9A=84Topic(#624)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 同步Group元信息时,如果Topic已经不存在了,则过滤掉该Group+Topic信息 --- .../km/task/metadata/SyncKafkaGroupTask.java | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java index 5951d3c0..e2f749fe 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java @@ -12,10 +12,10 @@ import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; @@ -33,6 +33,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { @Autowired private GroupService groupService; + @Autowired + private TopicService topicService; + @Override public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { @@ -53,6 +56,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List groupNameList, long triggerTimeUnitMs) { List groupMemberPOList = new ArrayList<>(); TaskResult tr = TaskResult.SUCCESS; + for (String groupName : groupNameList) { try { List poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs)); @@ -62,7 +66,10 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { tr = TaskResult.FAIL; } } + + groupMemberPOList = this.filterGroupIfTopicNotExist(clusterPhy.getId(), groupMemberPOList); groupService.batchReplace(groupMemberPOList); + return tr; } @@ -71,7 +78,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { // 获取消费组消费过哪些Topic Map offsetMap = groupService.getGroupOffset(clusterPhyId, groupName); - for (TopicPartition topicPartition: offsetMap.keySet()) { + for (TopicPartition topicPartition : offsetMap.keySet()) { GroupMemberPO po = groupMap.get(topicPartition.topic()); if (po == null) { po = new GroupMemberPO(clusterPhyId, topicPartition.topic(), groupName, updateTime); @@ -94,7 +101,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { } Set topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet()); - for (String topicName: topicNameSet) { + for (String topicName : topicNameSet) { groupMap.putIfAbsent(topicName, new GroupMemberPO(clusterPhyId, topicName, groupName, updateTime)); GroupMemberPO po = groupMap.get(topicName); @@ -112,4 +119,17 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { return new ArrayList<>(groupMap.values()); } + + private List filterGroupIfTopicNotExist(Long clusterPhyId, List poList) { + if (poList.isEmpty()) { + return poList; + } + + // 集群Topic集合 + Set dbTopicSet = topicService.listTopicsFromDB(clusterPhyId).stream().map(elem -> elem.getTopicName()).collect(Collectors.toSet()); + dbTopicSet.add(""); //兼容没有消费Topic的group + + // 过滤Topic不存在的消费组 + return poList.stream().filter(elem -> dbTopicSet.contains(elem.getTopicName())).collect(Collectors.toList()); + } }