diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java index ce6d13a5..790a7c47 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java @@ -30,10 +30,14 @@ public interface GroupService { int replaceDBData(GroupMemberPO groupMemberPO); + void batchReplace(List newGroupMemberList); + GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName); List listGroupByTopic(Long clusterPhyId, String topicName); + List listGroup(Long clusterPhyId); + PaginationResult pagingGroupMembers(Long clusterPhyId, String topicName, String groupName, diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java index b029330c..4cf29d2a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java @@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_SEARCH_GROUP; @@ -120,6 +121,38 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group return groupMemberDAO.replace(groupMemberPO); } + @Override + public void batchReplace(List newGroupMemberList) { + if (newGroupMemberList == null || newGroupMemberList.isEmpty()) { + return; + } + + Long clusterPhyId = newGroupMemberList.get(0).getClusterPhyId(); + if (clusterPhyId == null) { + return; + } + + List dbGroupMemberList = listGroup(clusterPhyId); + + + Map dbGroupMemberMap = dbGroupMemberList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity())); + for (GroupMemberPO groupMemberPO : newGroupMemberList) { + GroupMemberPO po = dbGroupMemberMap.remove(groupMemberPO.getGroupName() + groupMemberPO.getTopicName()); + try { + if (po != null) { + groupMemberPO.setId(po.getId()); + groupMemberDAO.updateById(groupMemberPO); + } else { + groupMemberDAO.insert(groupMemberPO); + } + } catch (Exception e) { + log.error("method=batchReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, groupMemberPO.getGroupName(), e); + } + + } + + } + @Override public GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); @@ -143,6 +176,14 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group return groupMemberDAO.selectList(lambdaQueryWrapper); } + @Override + public List listGroup(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); + + return groupMemberDAO.selectList(lambdaQueryWrapper); + } + @Override public PaginationResult pagingGroupMembers(Long clusterPhyId, String topicName, diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java index 65b64b96..5951d3c0 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java @@ -15,6 +15,7 @@ import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; @@ -34,14 +35,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { @Override public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - TaskResult tr = TaskResult.SUCCESS; List groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId()); - for (String groupName: groupNameList) { - if (!TaskResult.SUCCESS.equals(this.updateGroupMembersTask(clusterPhy, groupName, triggerTimeUnitMs))) { - tr = TaskResult.FAIL; - } - } + TaskResult tr = updateGroupMembersTask(clusterPhy, groupNameList, triggerTimeUnitMs); if (!TaskResult.SUCCESS.equals(tr)) { return tr; @@ -53,19 +49,21 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { return tr; } - private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, String groupName, long triggerTimeUnitMs) { - try { - List poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs)); - for (GroupMemberPO po: poList) { - groupService.replaceDBData(po); + + private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List groupNameList, long triggerTimeUnitMs) { + List groupMemberPOList = new ArrayList<>(); + TaskResult tr = TaskResult.SUCCESS; + for (String groupName : groupNameList) { + try { + List poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs)); + groupMemberPOList.addAll(poList); + } catch (Exception e) { + log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e); + tr = TaskResult.FAIL; } - } catch (Exception e) { - log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg={}", clusterPhy.getId(), groupName, e.getMessage()); - - return TaskResult.FAIL; } - - return TaskResult.SUCCESS; + groupService.batchReplace(groupMemberPOList); + return tr; } private List getGroupMembers(Long clusterPhyId, String groupName, Date updateTime) throws NotExistException, AdminOperateException {