diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java index 7992ac17..432f061c 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java @@ -7,6 +7,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.util.Date; +import java.util.Objects; @Data @NoArgsConstructor @@ -37,4 +38,16 @@ public class GroupMemberPO extends BasePO { this.memberCount = memberCount; this.updateTime = updateTime; } + + public boolean equal2GroupMemberPO(GroupMemberPO that) { + if (that == null) { + return false; + } + + return Objects.equals(clusterPhyId, that.clusterPhyId) + && Objects.equals(topicName, that.topicName) + && Objects.equals(groupName, that.groupName) + && Objects.equals(state, that.state) + && Objects.equals(memberCount, that.memberCount); + } } \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java index 49ac5bf3..53b925d4 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java @@ -9,6 +9,8 @@ import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.Objects; + @Data @NoArgsConstructor @@ -58,4 +60,18 @@ public class GroupPO extends BasePO { */ private int coordinatorId; + public boolean equal2GroupPO(GroupPO groupPO) { + if (groupPO == null) { + return false; + } + + return coordinatorId == groupPO.coordinatorId + && Objects.equals(clusterPhyId, groupPO.clusterPhyId) + && Objects.equals(type, groupPO.type) + && Objects.equals(name, groupPO.name) + && Objects.equals(state, groupPO.state) + && Objects.equals(memberCount, groupPO.memberCount) + && Objects.equals(topicMembers, groupPO.topicMembers) + && Objects.equals(partitionAssignor, groupPO.partitionAssignor); + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java index 131bd243..c203b3df 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java @@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import java.util.ArrayList; +import java.util.Date; import java.util.stream.Collectors; /** @@ -57,6 +58,7 @@ public class GroupConverter { po.setTopicMembers(ConvertUtil.obj2Json(group.getTopicMembers())); po.setType(group.getType().getCode()); po.setState(group.getState().getState()); + po.setUpdateTime(new Date()); return po; } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java index 47317c80..c2cb7180 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java @@ -12,9 +12,9 @@ import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import org.apache.kafka.common.TopicPartition; -import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; public interface GroupService { /** @@ -35,10 +35,11 @@ public interface GroupService { /** * 批量更新DB + * @param clusterPhyId 集群ID + * @param newGroupList 新的group列表 + * @param getFailedGroupSet 元信息获取失败的group列表 */ - void batchReplaceGroupsAndMembers(Long clusterPhyId, List newGroupList, long updateTime); - - int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime); + void batchReplaceGroupsAndMembers(Long clusterPhyId, List newGroupList, Set getFailedGroupSet); /** * DB-Group相关接口 diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java index 15dc2108..21511a96 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java @@ -49,7 +49,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT @Service public class GroupServiceImpl extends BaseKafkaVersionControlService implements GroupService { - private static final ILog log = LogFactory.getLog(GroupServiceImpl.class); + private static final ILog LOGGER = LogFactory.getLog(GroupServiceImpl.class); @Autowired private GroupDAO groupDAO; @@ -92,7 +92,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements return groupNameList; } catch (Exception e) { - log.error("method=listGroupsFromKafka||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + LOGGER.error("method=listGroupsFromKafka||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED); } finally { @@ -142,7 +142,8 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements member.setMemberCount(member.getMemberCount() + 1); } } - group.setTopicMembers(memberMap.values().stream().collect(Collectors.toList())); + + group.setTopicMembers(new ArrayList<>(memberMap.values())); return group; } @@ -161,7 +162,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements return offsetMap; } catch (Exception e) { - log.error("method=getGroupOffset||clusterPhyId={}|groupName={}||errMsg=exception!", clusterPhyId, groupName, e); + LOGGER.error("method=getGroupOffset||clusterPhyId={}|groupName={}||errMsg=exception!", clusterPhyId, groupName, e); throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED); } @@ -187,7 +188,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements return describeGroupsResult.all().get().get(groupName); } catch(Exception e){ - log.error("method=getGroupDescription||clusterPhyId={}|groupName={}||errMsg=exception!", clusterPhy.getId(), groupName, e); + LOGGER.error("method=getGroupDescription||clusterPhyId={}|groupName={}||errMsg=exception!", clusterPhy.getId(), groupName, e); throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED); } finally { @@ -202,12 +203,12 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements } @Override - public void batchReplaceGroupsAndMembers(Long clusterPhyId, List newGroupList, long updateTime) { + public void batchReplaceGroupsAndMembers(Long clusterPhyId, List newGroupList, Set getFailedGroupSet) { // 更新Group信息 - this.batchReplaceGroups(clusterPhyId, newGroupList, updateTime); + this.batchReplaceGroups(clusterPhyId, newGroupList, getFailedGroupSet); // 更新Group-Topic信息 - this.batchReplaceGroupMembers(clusterPhyId, newGroupList, updateTime); + this.batchReplaceGroupMembers(clusterPhyId, newGroupList, getFailedGroupSet); } @Override @@ -283,21 +284,6 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements return groupDAO.selectList(lambdaQueryWrapper).stream().map(elem -> GroupConverter.convert2Group(elem)).collect(Collectors.toList()); } - @Override - public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) { - // 删除过期Group信息 - LambdaQueryWrapper groupPOLambdaQueryWrapper = new LambdaQueryWrapper<>(); - groupPOLambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); - groupPOLambdaQueryWrapper.le(GroupPO::getUpdateTime, beforeTime); - groupDAO.delete(groupPOLambdaQueryWrapper); - - // 删除过期GroupMember信息 - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); - queryWrapper.le(GroupMemberPO::getUpdateTime, beforeTime); - return groupMemberDAO.delete(queryWrapper); - } - @Override public List getGroupsFromDB(Long clusterPhyId) { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); @@ -368,7 +354,7 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements return Result.buildSuc(); } catch(Exception e){ - log.error("method=resetGroupOffsets||clusterPhyId={}|groupName={}||errMsg=exception!", clusterPhyId, groupName, e); + LOGGER.error("method=resetGroupOffsets||clusterPhyId={}|groupName={}||errMsg=exception!", clusterPhyId, groupName, e); throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED); } @@ -378,62 +364,96 @@ public class GroupServiceImpl extends BaseKafkaVersionControlService implements /**************************************************** private method ****************************************************/ - private void batchReplaceGroupMembers(Long clusterPhyId, List newGroupList, long updateTime) { - if (ValidateUtils.isEmptyList(newGroupList)) { - return; - } - - List dbPOList = this.listClusterGroupsMemberPO(clusterPhyId); - Map dbPOMap = dbPOList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity())); + private void batchReplaceGroupMembers(Long clusterPhyId, List newGroupList, Set getFailedGroupSet) { + // DB 中的数据 + Map dbPOMap = this.listClusterGroupsMemberPO(clusterPhyId) + .stream() + .collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity())); + // 进行数据的更新 for (Group group: newGroupList) { for (GroupTopicMember member : group.getTopicMembers()) { try { - GroupMemberPO newPO = new GroupMemberPO(clusterPhyId, member.getTopicName(), group.getName(), group.getState().getState(), member.getMemberCount(), new Date(updateTime)); + GroupMemberPO newPO = new GroupMemberPO(clusterPhyId, member.getTopicName(), group.getName(), group.getState().getState(), member.getMemberCount(), new Date()); GroupMemberPO dbPO = dbPOMap.remove(newPO.getGroupName() + newPO.getTopicName()); - if (dbPO != null) { + if (dbPO == null) { + // 数据不存在则直接写入 + groupMemberDAO.insert(newPO); + } else if (!dbPO.equal2GroupMemberPO(newPO)) { + // 数据发生了变化则进行更新 newPO.setId(dbPO.getId()); groupMemberDAO.updateById(newPO); - continue; } - - groupMemberDAO.insert(newPO); } catch (Exception e) { - log.error( + LOGGER.error( "method=batchReplaceGroupMembers||clusterPhyId={}||groupName={}||topicName={}||errMsg=exception", clusterPhyId, group.getName(), member.getTopicName(), e ); } } } + + // 删除剩余不存在的 + dbPOMap.values().forEach(elem -> { + try { + if (getFailedGroupSet.contains(elem.getGroupName())) { + // 该group信息获取失败,所以忽略对该数据的删除 + return; + } + + groupDAO.deleteById(elem.getId()); + } catch (Exception e) { + LOGGER.error( + "method=batchReplaceGroupMembers||clusterPhyId={}||groupName={}||topicName={}||msg=delete expired group data in db failed||errMsg=exception", + clusterPhyId, elem.getGroupName(), elem.getTopicName(), e + ); + } + }); } - private void batchReplaceGroups(Long clusterPhyId, List newGroupList, long updateTime) { - if (ValidateUtils.isEmptyList(newGroupList)) { - return; - } - - List dbGroupList = this.listClusterGroupsPO(clusterPhyId); - Map dbGroupMap = dbGroupList.stream().collect(Collectors.toMap(elem -> elem.getName(), Function.identity())); + private void batchReplaceGroups(Long clusterPhyId, List newGroupList, Set getFailedGroupSet) { + // 获取 DB 中的数据 + Map dbGroupMap = this.listClusterGroupsPO(clusterPhyId) + .stream() + .collect(Collectors.toMap(elem -> elem.getName(), Function.identity())); + // 进行数据的更新 for (Group newGroup: newGroupList) { try { - GroupPO newPO = GroupConverter.convert2GroupPO(newGroup); - newPO.setUpdateTime(new Date(updateTime)); - GroupPO dbPO = dbGroupMap.remove(newGroup.getName()); - if (dbPO != null) { - newPO.setId(dbPO.getId()); - groupDAO.updateById(newPO); + if (dbPO == null) { + // 一条新的数据,则直接insert + groupDAO.insert(GroupConverter.convert2GroupPO(newGroup)); continue; } - groupDAO.insert(newPO); + GroupPO newPO = GroupConverter.convert2GroupPO(newGroup); + if (!newPO.equal2GroupPO(dbPO)) { + // 如果不相等,则直接更新 + newPO.setId(dbPO.getId()); + groupDAO.updateById(newPO); + } + + // 其他情况,则不需要进行任何操作 } catch (Exception e) { - log.error("method=batchGroupReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, newGroup.getName(), e); + LOGGER.error("method=batchReplaceGroups||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, newGroup.getName(), e); } } + + // 删除剩余不存在的 + dbGroupMap.values().forEach(elem -> { + try { + if (getFailedGroupSet.contains(elem.getName())) { + // 该group信息获取失败,所以忽略对该数据的删除 + return; + } + + groupDAO.deleteById(elem.getId()); + } catch (Exception e) { + LOGGER.error("method=batchReplaceGroups||clusterPhyId={}||groupName={}||msg=delete expired group data in db failed||errMsg=exception", clusterPhyId, elem.getName(), e); + } + }); } private List listClusterGroupsPO(Long clusterPhyId) { diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaGroupTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaGroupTask.java index 521e1f84..9358993e 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaGroupTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaGroupTask.java @@ -36,7 +36,7 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { // 获取集群的Group列表 List groupNameList = groupService.listGroupsFromKafka(clusterPhy); - TaskResult allSuccess = TaskResult.SUCCESS; + Set getFailedGroupSet = new HashSet<>(); // 获取Group详细信息 List groupList = new ArrayList<>(); @@ -44,13 +44,16 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { try { Group group = groupService.getGroupFromKafka(clusterPhy, groupName); if (group == null) { + // 获取到为空的 group 信息,直接忽略不要 continue; } groupList.add(group); } catch (Exception e) { log.error("method=processClusterTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e); - allSuccess = TaskResult.FAIL; + + // 记录获取失败的 group 信息 + getFailedGroupSet.add(groupName); } } @@ -58,17 +61,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { this.filterTopicIfTopicNotExist(clusterPhy.getId(), groupList); // 更新DB中的Group信息 - groupService.batchReplaceGroupsAndMembers(clusterPhy.getId(), groupList, triggerTimeUnitMs); + groupService.batchReplaceGroupsAndMembers(clusterPhy.getId(), groupList, getFailedGroupSet); - // 如果存在错误,则直接返回 - if (!TaskResult.SUCCESS.equals(allSuccess)) { - return allSuccess; - } - - // 删除历史的Group - groupService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 5 * 60 * 1000)); - - return allSuccess; + return getFailedGroupSet.isEmpty()? TaskResult.SUCCESS: TaskResult.FAIL; } private void filterTopicIfTopicNotExist(Long clusterPhyId, List groupList) {