From 79864955e1f493a0a8aab2da47bd1ae64514e542 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 20 Oct 2022 11:48:27 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]=20=E9=9B=86=E7=BE=A4Group=E5=88=97?= =?UTF-8?q?=E8=A1=A8=E6=8C=89=E7=85=A7Group=E7=BB=B4=E5=BA=A6=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E5=B1=95=E7=A4=BA=20(#580)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../streaming/km/biz/group/GroupManager.java | 9 + .../km/biz/group/impl/GroupManagerImpl.java | 122 ++++++++-- .../km/biz/topic/TopicStateManager.java | 6 +- .../biz/topic/impl/TopicStateManagerImpl.java | 25 ++ .../dto/cluster/ClusterGroupSummaryDTO.java | 18 ++ .../km/common/bean/entity/group/Group.java | 74 ++++++ .../bean/entity/group/GroupTopicMember.java | 27 +++ .../common/bean/po/group/GroupMemberPO.java | 14 +- .../km/common/bean/po/group/GroupPO.java | 61 +++++ .../common/bean/vo/group/GroupOverviewVO.java | 27 +++ .../bean/vo/group/GroupTopicOverviewVO.java | 2 +- .../common/constant/PaginationConstant.java | 10 + .../km/common/converter/GroupConverter.java | 62 +++++ .../km/common/enums/group/GroupTypeEnum.java | 36 +++ .../km/core/service/group/GroupService.java | 57 +++-- .../group/impl/GroupMetricServiceImpl.java | 3 +- .../service/group/impl/GroupServiceImpl.java | 226 +++++++++++++----- .../km/persistence/mysql/group/GroupDAO.java | 9 + .../v3/cluster/ClusterGroupsController.java | 22 +- .../km/rest/api/v3/group/GroupController.java | 2 +- .../api/v3/topic/TopicStateController.java | 15 +- .../km/task/metadata/SyncKafkaGroupTask.java | 123 +++------- 22 files changed, 744 insertions(+), 206 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterGroupSummaryDTO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/Group.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/GroupTopicMember.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupOverviewVO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/group/GroupTypeEnum.java create mode 100644 km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/group/GroupDAO.java diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java index 5c1518ca..a3686c03 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java @@ -1,11 +1,14 @@ package com.xiaojukeji.know.streaming.km.biz.group; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS; +import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; @@ -22,6 +25,10 @@ public interface GroupManager { String searchGroupKeyword, PaginationBaseDTO dto); + PaginationResult pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto); + + PaginationResult pagingClusterGroupsOverview(Long clusterPhyId, ClusterGroupSummaryDTO dto); + PaginationResult pagingGroupTopicConsumedMetrics(Long clusterPhyId, String topicName, String groupName, @@ -31,4 +38,6 @@ public interface GroupManager { Result> listClusterPhyGroupPartitions(Long clusterPhyId, String groupName, Long startTime, Long endTime); Result resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception; + + List getGroupTopicOverviewVOList (Long clusterPhyId, List groupMemberPOList); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index ff1f476b..97d464ed 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -3,11 +3,14 @@ package com.xiaojukeji.know.streaming.km.biz.group.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.group.GroupManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group; import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; @@ -15,11 +18,15 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.constant.PaginationConstant; +import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; +import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; @@ -71,30 +78,60 @@ public class GroupManagerImpl implements GroupManager { String searchGroupKeyword, PaginationBaseDTO dto) { PaginationResult paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto); - if (paginationResult.failed()) { - return PaginationResult.buildFailure(paginationResult, dto); - } if (!paginationResult.hasData()) { return PaginationResult.buildSuc(new ArrayList<>(), paginationResult); } - // 获取指标 - Result> metricsListResult = groupMetricService.listLatestMetricsAggByGroupTopicFromES( - clusterPhyId, - paginationResult.getData().getBizData().stream().map(elem -> new GroupTopic(elem.getGroupName(), elem.getTopicName())).collect(Collectors.toList()), - Arrays.asList(GroupMetricVersionItems.GROUP_METRIC_LAG), - AggTypeEnum.MAX - ); - if (metricsListResult.failed()) { - // 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回 - log.error("method=pagingGroupMembers||clusterPhyId={}||topicName={}||groupName={}||result={}||errMsg=search es failed", clusterPhyId, topicName, groupName, metricsListResult); + List groupTopicVOList = this.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData()); + + return PaginationResult.buildSuc(groupTopicVOList, paginationResult); + } + + @Override + public PaginationResult pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) { + Group group = groupService.getGroupFromDB(clusterPhyId, groupName); + + //没有topicMember则直接返回 + if (group == null || ValidateUtils.isEmptyList(group.getTopicMembers())) { + return PaginationResult.buildSuc(dto); } - return PaginationResult.buildSuc( - this.convert2GroupTopicOverviewVOList(paginationResult.getData().getBizData(), metricsListResult.getData()), - paginationResult - ); + //排序 + List groupTopicMembers = PaginationUtil.pageBySort(group.getTopicMembers(), PaginationConstant.DEFAULT_GROUP_TOPIC_SORTED_FIELD, SortTypeEnum.DESC.getSortType()); + + //分页 + PaginationResult paginationResult = PaginationUtil.pageBySubData(groupTopicMembers, dto); + + List groupMemberPOList = paginationResult.getData().getBizData().stream().map(elem -> new GroupMemberPO(clusterPhyId, elem.getTopicName(), groupName, group.getState().getState(), elem.getMemberCount())).collect(Collectors.toList()); + + return PaginationResult.buildSuc(this.getGroupTopicOverviewVOList(clusterPhyId, groupMemberPOList), paginationResult); + } + + @Override + public PaginationResult pagingClusterGroupsOverview(Long clusterPhyId, ClusterGroupSummaryDTO dto) { + List groupList = groupService.listClusterGroups(clusterPhyId); + + // 类型转化 + List voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList()); + + // 搜索groupName + voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name")); + + //搜索topic + if (!ValidateUtils.isBlank(dto.getSearchTopicName())) { + voList = voList.stream().filter(elem -> { + for (String topicName : elem.getTopicNameList()) { + if (topicName.contains(dto.getSearchTopicName())) { + return true; + } + } + return false; + }).collect(Collectors.toList()); + } + + // 分页 后 返回 + return PaginationUtil.pageBySubData(voList, dto); } @Override @@ -104,7 +141,7 @@ public class GroupManagerImpl implements GroupManager { List latestMetricNames, PaginationSortDTO dto) throws NotExistException, AdminOperateException { // 获取消费组消费的TopicPartition列表 - Map consumedOffsetMap = groupService.getGroupOffset(clusterPhyId, groupName); + Map consumedOffsetMap = groupService.getGroupOffsetFromKafka(clusterPhyId, groupName); List partitionList = consumedOffsetMap.keySet() .stream() .filter(elem -> elem.topic().equals(topicName)) @@ -113,7 +150,7 @@ public class GroupManagerImpl implements GroupManager { Collections.sort(partitionList); // 获取消费组当前运行信息 - ConsumerGroupDescription groupDescription = groupService.getGroupDescription(clusterPhyId, groupName); + ConsumerGroupDescription groupDescription = groupService.getGroupDescriptionFromKafka(clusterPhyId, groupName); // 转换存储格式 Map tpMemberMap = new HashMap<>(); @@ -166,13 +203,13 @@ public class GroupManagerImpl implements GroupManager { return rv; } - ConsumerGroupDescription description = groupService.getGroupDescription(dto.getClusterId(), dto.getGroupName()); + ConsumerGroupDescription description = groupService.getGroupDescriptionFromKafka(dto.getClusterId(), dto.getGroupName()); if (ConsumerGroupState.DEAD.equals(description.state()) && !dto.isCreateIfNotExist()) { return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, "group不存在, 重置失败"); } if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) { - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty | Dead 情况可重置)", GroupStateEnum.getByRawState(description.state()).getState())); + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", GroupStateEnum.getByRawState(description.state()).getState())); } // 获取offset @@ -185,6 +222,22 @@ public class GroupManagerImpl implements GroupManager { return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator); } + @Override + public List getGroupTopicOverviewVOList(Long clusterPhyId, List groupMemberPOList) { + // 获取指标 + Result> metricsListResult = groupMetricService.listLatestMetricsAggByGroupTopicFromES( + clusterPhyId, + groupMemberPOList.stream().map(elem -> new GroupTopic(elem.getGroupName(), elem.getTopicName())).collect(Collectors.toList()), + Arrays.asList(GroupMetricVersionItems.GROUP_METRIC_LAG), + AggTypeEnum.MAX + ); + if (metricsListResult.failed()) { + // 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回 + log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult); + } + return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData()); + } + /**************************************************** private method ****************************************************/ @@ -293,4 +346,31 @@ public class GroupManagerImpl implements GroupManager { ); } + private List convert2GroupTopicOverviewVOList(String groupName, String state, List groupTopicList, List metricsList) { + if (metricsList == null) { + metricsList = new ArrayList<>(); + } + + // + Map metricsMap = new HashMap<>(); + for (GroupMetrics metrics : metricsList) { + if (!groupName.equals(metrics.getGroup())) continue; + metricsMap.put(metrics.getTopic(), metrics); + } + + List voList = new ArrayList<>(); + for (GroupTopicMember po : groupTopicList) { + GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class); + vo.setGroupName(groupName); + vo.setState(state); + GroupMetrics metrics = metricsMap.get(po.getTopicName()); + if (metrics != null) { + vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG))); + } + + voList.add(vo); + } + return voList; + } + } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java index ec3a3207..f2c05300 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java @@ -1,8 +1,10 @@ package com.xiaojukeji.know.streaming.km.biz.topic; -import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO; @@ -23,4 +25,6 @@ public interface TopicStateManager { Result> getTopicPartitions(Long clusterPhyId, String topicName, List metricsNames); Result getTopicBrokersPartitionsSummary(Long clusterPhyId, String topicName); + + PaginationResult pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 9c03737a..afc907da 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -2,17 +2,22 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.biz.group.GroupManager; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; +import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.broker.BrokerReplicaSummaryVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO; @@ -32,6 +37,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; @@ -77,6 +83,12 @@ public class TopicStateManagerImpl implements TopicStateManager { @Autowired private TopicConfigService topicConfigService; + @Autowired + private GroupService groupService; + + @Autowired + private GroupManager groupManager; + @Override public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException { Topic topic = topicService.getTopic(clusterPhyId, topicName); @@ -346,6 +358,19 @@ public class TopicStateManagerImpl implements TopicStateManager { return Result.buildSuc(vo); } + @Override + public PaginationResult pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto) { + PaginationResult paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, "", "", searchGroupName, dto); + + if (!paginationResult.hasData()) { + return PaginationResult.buildSuc(new ArrayList<>(), paginationResult); + } + + List groupTopicVOList = groupManager.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData()); + + return PaginationResult.buildSuc(groupTopicVOList, paginationResult); + } + /**************************************************** private method ****************************************************/ private boolean checkIfIgnore(ConsumerRecord consumerRecord, String filterKey, String filterValue) { diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterGroupSummaryDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterGroupSummaryDTO.java new file mode 100644 index 00000000..d199e0d8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterGroupSummaryDTO.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author wyb + * @date 2022/10/17 + */ +@Data +public class ClusterGroupSummaryDTO extends PaginationBaseDTO { + @ApiModelProperty("查找该Topic") + private String searchTopicName; + + @ApiModelProperty("查找该Group") + private String searchGroupName; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/Group.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/Group.java new file mode 100644 index 00000000..3b2e22e9 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/Group.java @@ -0,0 +1,74 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.group; + +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; +import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author wyb + * @date 2022/10/10 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Group { + /** + * 集群id + */ + private Long clusterPhyId; + + /** + * group类型 + * @see GroupTypeEnum + */ + private GroupTypeEnum type; + + /** + * group名称 + */ + private String name; + + /** + * group状态 + * @see GroupStateEnum + */ + private GroupStateEnum state; + + /** + * group成员数量 + */ + private Integer memberCount; + + /** + * group消费的topic列表 + */ + private List topicMembers; + + /** + * group分配策略 + */ + private String partitionAssignor; + + /** + * group协调器brokerId + */ + private int coordinatorId; + + public Group(Long clusterPhyId, String groupName, ConsumerGroupDescription groupDescription) { + this.clusterPhyId = clusterPhyId; + this.type = groupDescription.isSimpleConsumerGroup()? GroupTypeEnum.CONSUMER: GroupTypeEnum.CONNECTOR; + this.name = groupName; + this.state = GroupStateEnum.getByRawState(groupDescription.state()); + this.memberCount = groupDescription.members() == null? 0: groupDescription.members().size(); + this.topicMembers = new ArrayList<>(); + this.partitionAssignor = groupDescription.partitionAssignor(); + this.coordinatorId = groupDescription.coordinator() == null? Constant.INVALID_CODE: groupDescription.coordinator().id(); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/GroupTopicMember.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/GroupTopicMember.java new file mode 100644 index 00000000..5fe960b1 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/group/GroupTopicMember.java @@ -0,0 +1,27 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.group; + +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author wyb + * @date 2022/10/10 + */ +@Data +@NoArgsConstructor +public class GroupTopicMember { + /** + * Topic名称 + */ + private String topicName; + + /** + * 消费此Topic的成员数量 + */ + private Integer memberCount; + + public GroupTopicMember(String topicName, Integer memberCount) { + this.topicName = topicName; + this.memberCount = memberCount; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java index 3d999952..7992ac17 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupMemberPO.java @@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.po.group; import com.baomidou.mybatisplus.annotation.TableName; import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; import com.xiaojukeji.know.streaming.km.common.constant.Constant; -import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import lombok.Data; import lombok.NoArgsConstructor; @@ -23,12 +22,19 @@ public class GroupMemberPO extends BasePO { private Integer memberCount; - public GroupMemberPO(Long clusterPhyId, String topicName, String groupName, Date updateTime) { + public GroupMemberPO(Long clusterPhyId, String topicName, String groupName, String state, Integer memberCount) { this.clusterPhyId = clusterPhyId; this.topicName = topicName; this.groupName = groupName; - this.state = GroupStateEnum.UNKNOWN.getState(); - this.memberCount = 0; + this.state = state; + this.memberCount = memberCount; + } + public GroupMemberPO(Long clusterPhyId, String topicName, String groupName, String state, Integer memberCount, Date updateTime) { + this.clusterPhyId = clusterPhyId; + this.topicName = topicName; + this.groupName = groupName; + this.state = state; + this.memberCount = memberCount; this.updateTime = updateTime; } } \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java new file mode 100644 index 00000000..49ac5bf3 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/group/GroupPO.java @@ -0,0 +1,61 @@ +package com.xiaojukeji.know.streaming.km.common.bean.po.group; + + +import com.baomidou.mybatisplus.annotation.TableName; +import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; +import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "group") +public class GroupPO extends BasePO { + /** + * 集群id + */ + private Long clusterPhyId; + + /** + * group类型 + * + * @see GroupTypeEnum + */ + private Integer type; + + /** + * group名称 + */ + private String name; + + /** + * group状态 + * + * @see GroupStateEnum + */ + private String state; + + /** + * group成员数量 + */ + private Integer memberCount; + + /** + * group消费的topic列表 + */ + private String topicMembers; + + /** + * group分配策略 + */ + private String partitionAssignor; + + /** + * group协调器brokerId + */ + private int coordinatorId; + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupOverviewVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupOverviewVO.java new file mode 100644 index 00000000..df976643 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupOverviewVO.java @@ -0,0 +1,27 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.group; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.List; + +/** + * @author wyb + * @date 2022/10/9 + */ +@Data +@ApiModel(value = "Group信息") +public class GroupOverviewVO { + @ApiModelProperty(value = "Group名称", example = "group-know-streaming-test") + private String name; + + @ApiModelProperty(value = "Group状态", example = "Empty") + private String state; + + @ApiModelProperty(value = "group的成员数", example = "12") + private Integer memberCount; + + @ApiModelProperty(value = "Topic列表", example = "[topic1,topic2]") + private List topicNameList; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupTopicOverviewVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupTopicOverviewVO.java index 205fb923..40d2f652 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupTopicOverviewVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/group/GroupTopicOverviewVO.java @@ -10,7 +10,7 @@ import lombok.Data; */ @Data @ApiModel(value = "GroupTopic信息") -public class GroupTopicOverviewVO extends GroupTopicBasicVO{ +public class GroupTopicOverviewVO extends GroupTopicBasicVO { @ApiModelProperty(value = "最大Lag", example = "12345678") private Long maxLag; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java index 68dd9358..9b8def80 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java @@ -18,4 +18,14 @@ public class PaginationConstant { * 默认页大小 */ public static final Integer DEFAULT_PAGE_SIZE = 10; + + /** + * group列表的默认排序规则 + */ + public static final String DEFAULT_GROUP_SORTED_FIELD = "name"; + + /** + * groupTopic列表的默认排序规则 + */ + public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName"; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java new file mode 100644 index 00000000..131bd243 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/GroupConverter.java @@ -0,0 +1,62 @@ +package com.xiaojukeji.know.streaming.km.common.converter; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember; +import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO; +import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; +import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; + +import java.util.ArrayList; +import java.util.stream.Collectors; + +/** + * @author wyb + * @date 2022/10/10 + */ +public class GroupConverter { + + private GroupConverter() { + + } + + public static GroupOverviewVO convert2GroupOverviewVO(Group group) { + GroupOverviewVO vo = ConvertUtil.obj2Obj(group, GroupOverviewVO.class); + + vo.setState(group.getState().getState()); + vo.setTopicNameList(group.getTopicMembers().stream().map(elem -> elem.getTopicName()).collect(Collectors.toList())); + + return vo; + } + + public static Group convert2Group(GroupPO po) { + if (po == null) { + return null; + } + + Group group = ConvertUtil.obj2Obj(po, Group.class); + if (!ValidateUtils.isBlank(po.getTopicMembers())) { + group.setTopicMembers(ConvertUtil.str2ObjArrayByJson(po.getTopicMembers(), GroupTopicMember.class)); + } else { + group.setTopicMembers(new ArrayList<>()); + } + + group.setType(GroupTypeEnum.getTypeByCode(po.getType())); + group.setState(GroupStateEnum.getByState(po.getState())); + return group; + } + + public static GroupPO convert2GroupPO(Group group) { + if (group == null) { + return null; + } + + GroupPO po = ConvertUtil.obj2Obj(group, GroupPO.class); + po.setTopicMembers(ConvertUtil.obj2Json(group.getTopicMembers())); + po.setType(group.getType().getCode()); + po.setState(group.getState().getState()); + return po; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/group/GroupTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/group/GroupTypeEnum.java new file mode 100644 index 00000000..ebb91ea1 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/group/GroupTypeEnum.java @@ -0,0 +1,36 @@ +package com.xiaojukeji.know.streaming.km.common.enums.group; + +import lombok.Getter; + +/** + * @author wyb + * @date 2022/10/11 + */ +@Getter +public enum GroupTypeEnum { + + UNKNOWN(-1, "Unknown"), + + CONSUMER(0, "Consumer客户端的消费组"), + + CONNECTOR(1, "Connector的消费组"); + + private final Integer code; + + private final String msg; + + GroupTypeEnum(Integer code, String msg) { + this.code = code; + this.msg = msg; + } + + public static GroupTypeEnum getTypeByCode(Integer code) { + if (code == null) return UNKNOWN; + for (GroupTypeEnum groupTypeEnum : GroupTypeEnum.values()) { + if (groupTypeEnum.code.equals(code)) { + return groupTypeEnum; + } + } + return UNKNOWN; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java index 790a7c47..8dc1c535 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java @@ -1,6 +1,7 @@ package com.xiaojukeji.know.streaming.km.core.service.group; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; @@ -16,27 +17,47 @@ import java.util.Map; public interface GroupService { /** - * 从Kafka中获取消费组 - * @param clusterPhyId 集群ID - * @return - * @throws NotExistException - * @throws AdminOperateException + * 从Kafka中获取消费组名称列表 */ List listGroupsFromKafka(Long clusterPhyId) throws NotExistException, AdminOperateException; - Map getGroupOffset(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException; + /** + * 从Kafka中获取消费组详细信息 + */ + Group getGroupFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException; - ConsumerGroupDescription getGroupDescription(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException; + Map getGroupOffsetFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException; - int replaceDBData(GroupMemberPO groupMemberPO); + ConsumerGroupDescription getGroupDescriptionFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException; - void batchReplace(List newGroupMemberList); + Result resetGroupOffsets(Long clusterPhyId, String groupName, Map offsetMap, String operator) throws NotExistException, AdminOperateException; + /** + * 批量更新DB + */ + void batchReplaceGroupsAndMembers(Long clusterPhyId, List newGroupList, long updateTime); + + int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime); + + /** + * DB-Group相关接口 + */ GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName); - List listGroupByTopic(Long clusterPhyId, String topicName); + Group getGroupFromDB(Long clusterPhyId, String groupName); - List listGroup(Long clusterPhyId); + List listClusterGroups(Long clusterPhyId); + + List getGroupsFromDB(Long clusterPhyId); + + Integer calGroupCount(Long clusterPhyId); + + Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum); + + /** + * DB-GroupTopic相关接口 + */ + List listGroupByTopic(Long clusterPhyId, String topicName); PaginationResult pagingGroupMembers(Long clusterPhyId, String topicName, @@ -45,15 +66,5 @@ public interface GroupService { String searchGroupKeyword, PaginationBaseDTO dto); - int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime); - - List getGroupsFromDB(Long clusterPhyId); - - GroupMemberPO getGroupFromDB(Long clusterPhyId, String groupName, String topicName); - - Integer calGroupCount(Long clusterPhyId); - - Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum); - - Result resetGroupOffsets(Long clusterPhyId, String groupName, Map offsetMap, String operator) throws NotExistException, AdminOperateException; -} + GroupMemberPO getGroupTopicFromDB(Long clusterPhyId, String groupName, String topicName); +} \ No newline at end of file diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java index 427edc2c..936897a3 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java @@ -24,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreSer import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO; -import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; @@ -183,7 +182,7 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe List metricsList = new ArrayList<>(); try { - Map groupOffsetMap = groupService.getGroupOffset(clusterId, groupName); + Map groupOffsetMap = groupService.getGroupOffsetFromKafka(clusterId, groupName); // 组织 GROUP_METRIC_OFFSET_CONSUMED 指标 for (Map.Entry entry: groupOffsetMap.entrySet()) { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java index 1789671b..1a923f21 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java @@ -7,11 +7,15 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; +import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; @@ -24,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; +import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupDAO; import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupMemberDAO; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -41,6 +46,9 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT public class GroupServiceImpl extends BaseVersionControlService implements GroupService { private static final ILog log = LogFactory.getLog(GroupServiceImpl.class); + @Autowired + private GroupDAO groupDAO; + @Autowired private GroupMemberDAO groupMemberDAO; @@ -79,7 +87,43 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group } @Override - public Map getGroupOffset(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException { + public Group getGroupFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException { + // 获取消费组的详细信息 + ConsumerGroupDescription groupDescription = this.getGroupDescriptionFromKafka(clusterPhyId, groupName); + if (groupDescription == null) { + return null; + } + + Group group = new Group(clusterPhyId, groupName, groupDescription); + + // 获取消费组消费过哪些Topic + Map memberMap = new HashMap<>(); + for (TopicPartition tp : this.getGroupOffsetFromKafka(clusterPhyId, groupName).keySet()) { + memberMap.putIfAbsent(tp.topic(), new GroupTopicMember(tp.topic(), 0)); + } + + // 记录成员信息 + for (MemberDescription memberDescription : groupDescription.members()) { + Set partitionList = new HashSet<>(); + if (!ValidateUtils.isNull(memberDescription.assignment().topicPartitions())) { + partitionList = memberDescription.assignment().topicPartitions(); + } + + Set topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet()); + for (String topicName : topicNameSet) { + memberMap.putIfAbsent(topicName, new GroupTopicMember(topicName, 0)); + + GroupTopicMember member = memberMap.get(topicName); + member.setMemberCount(member.getMemberCount() + 1); + } + } + group.setTopicMembers(memberMap.values().stream().collect(Collectors.toList())); + + return group; + } + + @Override + public Map getGroupOffsetFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException { AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId); Map offsetMap = new HashMap<>(); @@ -99,12 +143,12 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group } @Override - public ConsumerGroupDescription getGroupDescription(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException { + public ConsumerGroupDescription getGroupDescriptionFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException { AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId); try { DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups( - Collections.singletonList(groupName), + Arrays.asList(groupName), new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(false) ); @@ -117,40 +161,12 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group } @Override - public int replaceDBData(GroupMemberPO groupMemberPO) { - return groupMemberDAO.replace(groupMemberPO); - } - - @Override - public void batchReplace(List newGroupMemberList) { - if (newGroupMemberList == null || newGroupMemberList.isEmpty()) { - return; - } - - Long clusterPhyId = newGroupMemberList.get(0).getClusterPhyId(); - if (clusterPhyId == null) { - return; - } - - List dbGroupMemberList = listGroup(clusterPhyId); - - - Map dbGroupMemberMap = dbGroupMemberList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity())); - for (GroupMemberPO groupMemberPO : newGroupMemberList) { - GroupMemberPO po = dbGroupMemberMap.remove(groupMemberPO.getGroupName() + groupMemberPO.getTopicName()); - try { - if (po != null) { - groupMemberPO.setId(po.getId()); - groupMemberDAO.updateById(groupMemberPO); - } else { - groupMemberDAO.insert(groupMemberPO); - } - } catch (Exception e) { - log.error("method=batchReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, groupMemberPO.getGroupName(), e); - } - - } + public void batchReplaceGroupsAndMembers(Long clusterPhyId, List newGroupList, long updateTime) { + // 更新Group信息 + this.batchReplaceGroups(clusterPhyId, newGroupList, updateTime); + // 更新Group-Topic信息 + this.batchReplaceGroupMembers(clusterPhyId, newGroupList, updateTime); } @Override @@ -176,14 +192,6 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group return groupMemberDAO.selectList(lambdaQueryWrapper); } - @Override - public List listGroup(Long clusterPhyId) { - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); - - return groupMemberDAO.selectList(lambdaQueryWrapper); - } - @Override public PaginationResult pagingGroupMembers(Long clusterPhyId, String topicName, @@ -208,8 +216,33 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group return PaginationResult.buildSuc(iPage.getRecords(), iPage); } + @Override + public Group getGroupFromDB(Long clusterPhyId, String groupName) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); + lambdaQueryWrapper.eq(GroupPO::getName, groupName); + + GroupPO groupPO = groupDAO.selectOne(lambdaQueryWrapper); + return GroupConverter.convert2Group(groupPO); + } + + @Override + public List listClusterGroups(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); + + return groupDAO.selectList(lambdaQueryWrapper).stream().map(elem -> GroupConverter.convert2Group(elem)).collect(Collectors.toList()); + } + @Override public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) { + // 删除过期Group信息 + LambdaQueryWrapper groupPOLambdaQueryWrapper = new LambdaQueryWrapper<>(); + groupPOLambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); + groupPOLambdaQueryWrapper.le(GroupPO::getUpdateTime, beforeTime); + groupDAO.delete(groupPOLambdaQueryWrapper); + + // 删除过期GroupMember信息 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); queryWrapper.le(GroupMemberPO::getUpdateTime, beforeTime); @@ -218,17 +251,19 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group @Override public List getGroupsFromDB(Long clusterPhyId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); - List poList = groupMemberDAO.selectList(queryWrapper); + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); + + List poList = groupDAO.selectList(lambdaQueryWrapper); if (poList == null) { poList = new ArrayList<>(); } - return new ArrayList<>(poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet())); + + return new ArrayList<>(poList.stream().map(elem -> elem.getName()).collect(Collectors.toSet())); } @Override - public GroupMemberPO getGroupFromDB(Long clusterPhyId, String groupName, String topicName) { + public GroupMemberPO getGroupTopicFromDB(Long clusterPhyId, String groupName, String topicName) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); queryWrapper.eq(GroupMemberPO::getTopicName, topicName); @@ -239,28 +274,19 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group @Override public Integer calGroupCount(Long clusterPhyId) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); - List poList = groupMemberDAO.selectList(queryWrapper); - if (poList == null) { - poList = new ArrayList<>(); - } + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); - return poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()).size(); + return groupDAO.selectCount(lambdaQueryWrapper); } @Override public Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); - queryWrapper.eq(GroupMemberPO::getState, stateEnum.getState()); + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); + lambdaQueryWrapper.eq(GroupPO::getState, stateEnum.getState()); - List poList = groupMemberDAO.selectList(queryWrapper); - if (poList == null) { - poList = new ArrayList<>(); - } - - return poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()).size(); + return groupDAO.selectCount(lambdaQueryWrapper); } @Override @@ -303,4 +329,74 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group /**************************************************** private method ****************************************************/ + private void batchReplaceGroupMembers(Long clusterPhyId, List newGroupList, long updateTime) { + if (ValidateUtils.isEmptyList(newGroupList)) { + return; + } + + List dbPOList = this.listClusterGroupsMemberPO(clusterPhyId); + Map dbPOMap = dbPOList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity())); + + for (Group group: newGroupList) { + for (GroupTopicMember member : group.getTopicMembers()) { + try { + GroupMemberPO newPO = new GroupMemberPO(clusterPhyId, member.getTopicName(), group.getName(), group.getState().getState(), member.getMemberCount(), new Date(updateTime)); + + GroupMemberPO dbPO = dbPOMap.remove(newPO.getGroupName() + newPO.getTopicName()); + if (dbPO != null) { + newPO.setId(dbPO.getId()); + groupMemberDAO.updateById(newPO); + continue; + } + + groupMemberDAO.insert(newPO); + } catch (Exception e) { + log.error( + "method=batchReplaceGroupMembers||clusterPhyId={}||groupName={}||topicName={}||errMsg=exception", + clusterPhyId, group.getName(), member.getTopicName(), e + ); + } + } + } + } + + private void batchReplaceGroups(Long clusterPhyId, List newGroupList, long updateTime) { + if (ValidateUtils.isEmptyList(newGroupList)) { + return; + } + + List dbGroupList = this.listClusterGroupsPO(clusterPhyId); + Map dbGroupMap = dbGroupList.stream().collect(Collectors.toMap(elem -> elem.getName(), Function.identity())); + + for (Group newGroup: newGroupList) { + try { + GroupPO newPO = GroupConverter.convert2GroupPO(newGroup); + newPO.setUpdateTime(new Date(updateTime)); + + GroupPO dbPO = dbGroupMap.remove(newGroup.getName()); + if (dbPO != null) { + newPO.setId(dbPO.getId()); + groupDAO.updateById(newPO); + continue; + } + + groupDAO.insert(newPO); + } catch (Exception e) { + log.error("method=batchGroupReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, newGroup.getName(), e); + } + } + } + + private List listClusterGroupsPO(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); + return groupDAO.selectList(lambdaQueryWrapper); + } + + private List listClusterGroupsMemberPO(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); + + return groupMemberDAO.selectList(lambdaQueryWrapper); + } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/group/GroupDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/group/GroupDAO.java new file mode 100644 index 00000000..eb4465c3 --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/group/GroupDAO.java @@ -0,0 +1,9 @@ +package com.xiaojukeji.know.streaming.km.persistence.mysql.group; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO; +import org.springframework.stereotype.Repository; + +@Repository +public interface GroupDAO extends BaseMapper { +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterGroupsController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterGroupsController.java index 4515d695..b035ea02 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterGroupsController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterGroupsController.java @@ -1,12 +1,15 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster; import com.xiaojukeji.know.streaming.km.biz.group.GroupManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupsOverviewDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricGroupPartitionDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.field.PaginationFuzzySearchFieldDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS; +import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; @@ -37,7 +40,8 @@ public class ClusterGroupsController { @Autowired private GroupMetricService groupMetricService; - @ApiOperation(value = "集群Groups信息列表") + @Deprecated + @ApiOperation(value = "集群Groups信息列表", notes = "废弃, 下一个版本删除") @PostMapping(value = "clusters/{clusterPhyId}/groups-overview") @ResponseBody public PaginationResult getClusterPhyGroupsOverview(@PathVariable Long clusterPhyId, @@ -53,6 +57,13 @@ public class ClusterGroupsController { ); } + @ApiOperation(value = "集群Groups信息列表") + @GetMapping(value = "clusters/{clusterPhyId}/groups-overview") + @ResponseBody + public PaginationResult getGroupsOverview(@PathVariable Long clusterPhyId, ClusterGroupSummaryDTO dto) { + return groupManager.pagingClusterGroupsOverview(clusterPhyId, dto); + } + @ApiOperation(value = "集群Groups指标信息") @PostMapping(value = "clusters/{clusterPhyId}/group-metrics") @ResponseBody @@ -70,8 +81,17 @@ public class ClusterGroupsController { return groupManager.listClusterPhyGroupPartitions(clusterPhyId, groupName, startTime, endTime); } + @ApiOperation(value = "Group的Topic列表") + @GetMapping(value = "clusters/{clusterPhyId}/groups/{groupName}/topics-overview") + public PaginationResult getGroupTopicsOverview(@PathVariable Long clusterPhyId, + @PathVariable String groupName, + PaginationBaseDTO dto) { + return groupManager.pagingGroupTopicMembers(clusterPhyId, groupName, dto); + } + /**************************************************** private method ****************************************************/ + @Deprecated private Tuple getSearchKeyWords(ClusterGroupsOverviewDTO dto) { if (ValidateUtils.isEmptyList(dto.getFuzzySearchDTOList())) { return new Tuple<>("", ""); diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java index 17986b16..55e7e778 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java @@ -55,7 +55,7 @@ public class GroupController { public Result getGroupMetadataCombineExist(@PathVariable Long clusterPhyId, @PathVariable String groupName, @PathVariable String topicName) { - GroupMemberPO po = groupService.getGroupFromDB(clusterPhyId, groupName, topicName); + GroupMemberPO po = groupService.getGroupTopicFromDB(clusterPhyId, groupName, topicName); if (po == null) { return Result.buildSuc(new GroupMetadataCombineExistVO(clusterPhyId, groupName, topicName, false)); } diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java index d1e09e66..b0371537 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java @@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; -import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; @@ -11,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.acl.AclBindingVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicBasicVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO; @@ -136,8 +136,17 @@ public class TopicStateController { @ApiOperation(value = "TopicGroups基本信息列表") @GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/groups-basic") @ResponseBody - public Result> getTopicGroupsBasic(@PathVariable Long clusterPhyId, - @PathVariable String topicName) { + public Result> getTopicGroupsBasic(@PathVariable Long clusterPhyId, @PathVariable String topicName) { return Result.buildSuc(ConvertUtil.list2List(groupService.listGroupByTopic(clusterPhyId, topicName), GroupTopicBasicVO.class)); } + + @ApiOperation("Topic的Group列表") + @GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/groups-overview") + public PaginationResult getTopicGroupsOverview(@PathVariable Long clusterPhyId, + @PathVariable String topicName, + @RequestParam(required = false) String searchGroupName, + PaginationBaseDTO dto) { + return topicStateManager.pagingTopicGroupsOverview(clusterPhyId, topicName, searchGroupName, dto); + } + } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java index e2f749fe..cbec5bd2 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java @@ -6,15 +6,10 @@ import com.didiglobal.logi.job.core.consensual.ConsensualEnum; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; -import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; -import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; -import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; +import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; -import org.apache.kafka.clients.admin.*; -import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import java.util.*; @@ -38,98 +33,58 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask { @Override public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { - + // 获取集群的Group列表 List groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId()); - TaskResult tr = updateGroupMembersTask(clusterPhy, groupNameList, triggerTimeUnitMs); - if (!TaskResult.SUCCESS.equals(tr)) { - return tr; + TaskResult allSuccess = TaskResult.SUCCESS; + + // 获取Group详细信息 + List groupList = new ArrayList<>(); + for (String groupName : groupNameList) { + try { + Group group = groupService.getGroupFromKafka(clusterPhy.getId(), groupName); + if (group == null) { + continue; + } + + groupList.add(group); + } catch (Exception e) { + log.error("method=processClusterTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e); + allSuccess = TaskResult.FAIL; + } + } + + // 过滤掉无效的Topic + this.filterTopicIfTopicNotExist(clusterPhy.getId(), groupList); + + // 更新DB中的Group信息 + groupService.batchReplaceGroupsAndMembers(clusterPhy.getId(), groupList, triggerTimeUnitMs); + + // 如果存在错误,则直接返回 + if (!TaskResult.SUCCESS.equals(allSuccess)) { + return allSuccess; } // 删除历史的Group groupService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 5 * 60 * 1000)); - return tr; + return allSuccess; } - - private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List groupNameList, long triggerTimeUnitMs) { - List groupMemberPOList = new ArrayList<>(); - TaskResult tr = TaskResult.SUCCESS; - - for (String groupName : groupNameList) { - try { - List poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs)); - groupMemberPOList.addAll(poList); - } catch (Exception e) { - log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e); - tr = TaskResult.FAIL; - } - } - - groupMemberPOList = this.filterGroupIfTopicNotExist(clusterPhy.getId(), groupMemberPOList); - groupService.batchReplace(groupMemberPOList); - - return tr; - } - - private List getGroupMembers(Long clusterPhyId, String groupName, Date updateTime) throws NotExistException, AdminOperateException { - Map groupMap = new HashMap<>(); - - // 获取消费组消费过哪些Topic - Map offsetMap = groupService.getGroupOffset(clusterPhyId, groupName); - for (TopicPartition topicPartition : offsetMap.keySet()) { - GroupMemberPO po = groupMap.get(topicPartition.topic()); - if (po == null) { - po = new GroupMemberPO(clusterPhyId, topicPartition.topic(), groupName, updateTime); - } - groupMap.put(topicPartition.topic(), po); - } - - // 在上面的基础上,补充消费组的详细信息 - ConsumerGroupDescription consumerGroupDescription = groupService.getGroupDescription(clusterPhyId, groupName); - if (consumerGroupDescription == null) { - return new ArrayList<>(groupMap.values()); - } - - groupMap.forEach((key, val) -> val.setState(GroupStateEnum.getByRawState(consumerGroupDescription.state()).getState())); - - for (MemberDescription memberDescription : consumerGroupDescription.members()) { - Set partitionList = new HashSet<>(); - if (!ValidateUtils.isNull(memberDescription.assignment().topicPartitions())) { - partitionList = memberDescription.assignment().topicPartitions(); - } - - Set topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet()); - for (String topicName : topicNameSet) { - groupMap.putIfAbsent(topicName, new GroupMemberPO(clusterPhyId, topicName, groupName, updateTime)); - - GroupMemberPO po = groupMap.get(topicName); - po.setMemberCount(po.getMemberCount() + 1); - po.setState(GroupStateEnum.getByRawState(consumerGroupDescription.state()).getState()); - } - } - - // 如果该消费组没有正在消费任何Topic的特殊情况,但是这个Group存在 - if (groupMap.isEmpty()) { - GroupMemberPO po = new GroupMemberPO(clusterPhyId, "", groupName, updateTime); - po.setState(GroupStateEnum.getByRawState(consumerGroupDescription.state()).getState()); - groupMap.put("", po); - } - - return new ArrayList<>(groupMap.values()); - } - - private List filterGroupIfTopicNotExist(Long clusterPhyId, List poList) { - if (poList.isEmpty()) { - return poList; + private void filterTopicIfTopicNotExist(Long clusterPhyId, List groupList) { + if (ValidateUtils.isEmptyList(groupList)) { + return; } // 集群Topic集合 Set dbTopicSet = topicService.listTopicsFromDB(clusterPhyId).stream().map(elem -> elem.getTopicName()).collect(Collectors.toSet()); dbTopicSet.add(""); //兼容没有消费Topic的group - + // 过滤Topic不存在的消费组 - return poList.stream().filter(elem -> dbTopicSet.contains(elem.getTopicName())).collect(Collectors.toList()); + for (Group group: groupList) { + group.setTopicMembers( + group.getTopicMembers().stream().filter(elem -> dbTopicSet.contains(elem.getTopicName())).collect(Collectors.toList()) + ); + } } }