From b4d44ef8c7333d7c6c1198151216d920f6b95582 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Fri, 23 Sep 2022 17:02:25 +0800 Subject: [PATCH] =?UTF-8?q?DB=E4=B8=ADGroup=E4=BF=A1=E6=81=AF=E7=9A=84?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=96=B9=E5=BC=8F=EF=BC=8C=E7=94=B1replace?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=B8=BAinsert=E6=88=96update?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/core/service/group/GroupService.java | 4 ++ .../service/group/impl/GroupServiceImpl.java | 41 +++++++++++++++++++ .../km/task/metadata/SyncKafkaGroupTask.java | 32 +++++++-------- 3 files changed, 60 insertions(+), 17 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java index ce6d13a5..790a7c47 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java @@ -30,10 +30,14 @@ public interface GroupService { int replaceDBData(GroupMemberPO groupMemberPO); + void batchReplace(List newGroupMemberList); + GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName); List listGroupByTopic(Long clusterPhyId, String topicName); + List listGroup(Long clusterPhyId); + PaginationResult pagingGroupMembers(Long clusterPhyId, String topicName, String groupName, diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java index b029330c..4cf29d2a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java @@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_SEARCH_GROUP; @@ -120,6 +121,38 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group return groupMemberDAO.replace(groupMemberPO); } + @Override + public void batchReplace(List newGroupMemberList) { + if (newGroupMemberList == null || newGroupMemberList.isEmpty()) { + return; + } + + Long clusterPhyId = newGroupMemberList.get(0).getClusterPhyId(); + if (clusterPhyId == null) { + return; + } + + List dbGroupMemberList = listGroup(clusterPhyId); + + + Map dbGroupMemberMap = dbGroupMemberList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity())); + for (GroupMemberPO groupMemberPO : newGroupMemberList) { + GroupMemberPO po = dbGroupMemberMap.remove(groupMemberPO.getGroupName() + groupMemberPO.getTopicName()); + try { + if (po != null) { + groupMemberPO.setId(po.getId()); + groupMemberDAO.updateById(groupMemberPO); + } else { + groupMemberDAO.insert(groupMemberPO); + } + } catch (Exception e) { + log.error("method=batchReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, groupMemberPO.getGroupName(), e); + } + + } + + } + @Override public GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); @@ -143,6 +176,14 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group return groupMemberDAO.selectList(lambdaQueryWrapper); } + @Override + public List listGroup(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); + + return groupMemberDAO.selectList(lambdaQueryWrapper); + } + @Override public PaginationResult pagingGroupMembers(Long clusterPhyId, String topicName, diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java index 65b64b96..5951d3c0 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java @@ -15,6 +15,7 @@ import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; @@ -34,14 +35,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { @Override public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - TaskResult tr = TaskResult.SUCCESS; List groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId()); - for (String groupName: groupNameList) { - if (!TaskResult.SUCCESS.equals(this.updateGroupMembersTask(clusterPhy, groupName, triggerTimeUnitMs))) { - tr = TaskResult.FAIL; - } - } + TaskResult tr = updateGroupMembersTask(clusterPhy, groupNameList, triggerTimeUnitMs); if (!TaskResult.SUCCESS.equals(tr)) { return tr; @@ -53,19 +49,21 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { return tr; } - private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, String groupName, long triggerTimeUnitMs) { - try { - List poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs)); - for (GroupMemberPO po: poList) { - groupService.replaceDBData(po); + + private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List groupNameList, long triggerTimeUnitMs) { + List groupMemberPOList = new ArrayList<>(); + TaskResult tr = TaskResult.SUCCESS; + for (String groupName : groupNameList) { + try { + List poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs)); + groupMemberPOList.addAll(poList); + } catch (Exception e) { + log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e); + tr = TaskResult.FAIL; } - } catch (Exception e) { - log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg={}", clusterPhy.getId(), groupName, e.getMessage()); - - return TaskResult.FAIL; } - - return TaskResult.SUCCESS; + groupService.batchReplace(groupMemberPOList); + return tr; } private List getGroupMembers(Long clusterPhyId, String groupName, Date updateTime) throws NotExistException, AdminOperateException {