mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-05 21:12:13 +08:00
[ISSUE-624]过滤掉不存在的Topic(#624)
同步Group元信息时,如果Topic已经不存在了,则过滤掉该Group+Topic信息
This commit is contained in:
@@ -12,10 +12,10 @@ import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||||
import org.apache.kafka.clients.admin.*;
|
import org.apache.kafka.clients.admin.*;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -33,6 +33,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private GroupService groupService;
|
private GroupService groupService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TopicService topicService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||||
|
|
||||||
@@ -53,6 +56,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
|||||||
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> groupNameList, long triggerTimeUnitMs) {
|
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> groupNameList, long triggerTimeUnitMs) {
|
||||||
List<GroupMemberPO> groupMemberPOList = new ArrayList<>();
|
List<GroupMemberPO> groupMemberPOList = new ArrayList<>();
|
||||||
TaskResult tr = TaskResult.SUCCESS;
|
TaskResult tr = TaskResult.SUCCESS;
|
||||||
|
|
||||||
for (String groupName : groupNameList) {
|
for (String groupName : groupNameList) {
|
||||||
try {
|
try {
|
||||||
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
|
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
|
||||||
@@ -62,7 +66,10 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
|||||||
tr = TaskResult.FAIL;
|
tr = TaskResult.FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
groupMemberPOList = this.filterGroupIfTopicNotExist(clusterPhy.getId(), groupMemberPOList);
|
||||||
groupService.batchReplace(groupMemberPOList);
|
groupService.batchReplace(groupMemberPOList);
|
||||||
|
|
||||||
return tr;
|
return tr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,7 +78,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
|||||||
|
|
||||||
// 获取消费组消费过哪些Topic
|
// 获取消费组消费过哪些Topic
|
||||||
Map<TopicPartition, Long> offsetMap = groupService.getGroupOffset(clusterPhyId, groupName);
|
Map<TopicPartition, Long> offsetMap = groupService.getGroupOffset(clusterPhyId, groupName);
|
||||||
for (TopicPartition topicPartition: offsetMap.keySet()) {
|
for (TopicPartition topicPartition : offsetMap.keySet()) {
|
||||||
GroupMemberPO po = groupMap.get(topicPartition.topic());
|
GroupMemberPO po = groupMap.get(topicPartition.topic());
|
||||||
if (po == null) {
|
if (po == null) {
|
||||||
po = new GroupMemberPO(clusterPhyId, topicPartition.topic(), groupName, updateTime);
|
po = new GroupMemberPO(clusterPhyId, topicPartition.topic(), groupName, updateTime);
|
||||||
@@ -94,7 +101,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Set<String> topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet());
|
Set<String> topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet());
|
||||||
for (String topicName: topicNameSet) {
|
for (String topicName : topicNameSet) {
|
||||||
groupMap.putIfAbsent(topicName, new GroupMemberPO(clusterPhyId, topicName, groupName, updateTime));
|
groupMap.putIfAbsent(topicName, new GroupMemberPO(clusterPhyId, topicName, groupName, updateTime));
|
||||||
|
|
||||||
GroupMemberPO po = groupMap.get(topicName);
|
GroupMemberPO po = groupMap.get(topicName);
|
||||||
@@ -112,4 +119,17 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
|||||||
|
|
||||||
return new ArrayList<>(groupMap.values());
|
return new ArrayList<>(groupMap.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<GroupMemberPO> filterGroupIfTopicNotExist(Long clusterPhyId, List<GroupMemberPO> poList) {
|
||||||
|
if (poList.isEmpty()) {
|
||||||
|
return poList;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 集群Topic集合
|
||||||
|
Set<String> dbTopicSet = topicService.listTopicsFromDB(clusterPhyId).stream().map(elem -> elem.getTopicName()).collect(Collectors.toSet());
|
||||||
|
dbTopicSet.add(""); //兼容没有消费Topic的group
|
||||||
|
|
||||||
|
// 过滤Topic不存在的消费组
|
||||||
|
return poList.stream().filter(elem -> dbTopicSet.contains(elem.getTopicName())).collect(Collectors.toList());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user