[Feature] 集群Group列表按照Group维度进行展示 (#580)

This commit is contained in:
zengqiao
2022-10-20 11:48:27 +08:00
committed by EricZeng
parent 586b37caa0
commit 05c52cd672
22 changed files with 744 additions and 206 deletions

View File

@@ -1,11 +1,14 @@
package com.xiaojukeji.know.streaming.km.biz.group; package com.xiaojukeji.know.streaming.km.biz.group;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
@@ -22,6 +25,10 @@ public interface GroupManager {
String searchGroupKeyword, String searchGroupKeyword,
PaginationBaseDTO dto); PaginationBaseDTO dto);
PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto);
PaginationResult<GroupOverviewVO> pagingClusterGroupsOverview(Long clusterPhyId, ClusterGroupSummaryDTO dto);
PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetrics(Long clusterPhyId, PaginationResult<GroupTopicConsumedDetailVO> pagingGroupTopicConsumedMetrics(Long clusterPhyId,
String topicName, String topicName,
String groupName, String groupName,
@@ -31,4 +38,6 @@ public interface GroupManager {
Result<Set<TopicPartitionKS>> listClusterPhyGroupPartitions(Long clusterPhyId, String groupName, Long startTime, Long endTime); Result<Set<TopicPartitionKS>> listClusterPhyGroupPartitions(Long clusterPhyId, String groupName, Long startTime, Long endTime);
Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception; Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception;
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList (Long clusterPhyId, List<GroupMemberPO> groupMemberPOList);
} }

View File

@@ -3,11 +3,14 @@ package com.xiaojukeji.know.streaming.km.biz.group.impl;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager; import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic; import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -15,11 +18,15 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedDetailVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.constant.PaginationConstant;
import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter;
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
@@ -71,30 +78,60 @@ public class GroupManagerImpl implements GroupManager {
String searchGroupKeyword, String searchGroupKeyword,
PaginationBaseDTO dto) { PaginationBaseDTO dto) {
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto); PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto);
if (paginationResult.failed()) {
return PaginationResult.buildFailure(paginationResult, dto);
}
if (!paginationResult.hasData()) { if (!paginationResult.hasData()) {
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult); return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
} }
// 获取指标 List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
Result<List<GroupMetrics>> metricsListResult = groupMetricService.listLatestMetricsAggByGroupTopicFromES(
clusterPhyId, return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
paginationResult.getData().getBizData().stream().map(elem -> new GroupTopic(elem.getGroupName(), elem.getTopicName())).collect(Collectors.toList()), }
Arrays.asList(GroupMetricVersionItems.GROUP_METRIC_LAG),
AggTypeEnum.MAX @Override
); public PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) {
if (metricsListResult.failed()) { Group group = groupService.getGroupFromDB(clusterPhyId, groupName);
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
log.error("method=pagingGroupMembers||clusterPhyId={}||topicName={}||groupName={}||result={}||errMsg=search es failed", clusterPhyId, topicName, groupName, metricsListResult); //没有topicMember则直接返回
if (group == null || ValidateUtils.isEmptyList(group.getTopicMembers())) {
return PaginationResult.buildSuc(dto);
} }
return PaginationResult.buildSuc( //排序
this.convert2GroupTopicOverviewVOList(paginationResult.getData().getBizData(), metricsListResult.getData()), List<GroupTopicMember> groupTopicMembers = PaginationUtil.pageBySort(group.getTopicMembers(), PaginationConstant.DEFAULT_GROUP_TOPIC_SORTED_FIELD, SortTypeEnum.DESC.getSortType());
paginationResult
); //分页
PaginationResult<GroupTopicMember> paginationResult = PaginationUtil.pageBySubData(groupTopicMembers, dto);
List<GroupMemberPO> groupMemberPOList = paginationResult.getData().getBizData().stream().map(elem -> new GroupMemberPO(clusterPhyId, elem.getTopicName(), groupName, group.getState().getState(), elem.getMemberCount())).collect(Collectors.toList());
return PaginationResult.buildSuc(this.getGroupTopicOverviewVOList(clusterPhyId, groupMemberPOList), paginationResult);
}
@Override
public PaginationResult<GroupOverviewVO> pagingClusterGroupsOverview(Long clusterPhyId, ClusterGroupSummaryDTO dto) {
List<Group> groupList = groupService.listClusterGroups(clusterPhyId);
// 类型转化
List<GroupOverviewVO> voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList());
// 搜索groupName
voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name"));
//搜索topic
if (!ValidateUtils.isBlank(dto.getSearchTopicName())) {
voList = voList.stream().filter(elem -> {
for (String topicName : elem.getTopicNameList()) {
if (topicName.contains(dto.getSearchTopicName())) {
return true;
}
}
return false;
}).collect(Collectors.toList());
}
// 分页 后 返回
return PaginationUtil.pageBySubData(voList, dto);
} }
@Override @Override
@@ -104,7 +141,7 @@ public class GroupManagerImpl implements GroupManager {
List<String> latestMetricNames, List<String> latestMetricNames,
PaginationSortDTO dto) throws NotExistException, AdminOperateException { PaginationSortDTO dto) throws NotExistException, AdminOperateException {
// 获取消费组消费的TopicPartition列表 // 获取消费组消费的TopicPartition列表
Map<TopicPartition, Long> consumedOffsetMap = groupService.getGroupOffset(clusterPhyId, groupName); Map<TopicPartition, Long> consumedOffsetMap = groupService.getGroupOffsetFromKafka(clusterPhyId, groupName);
List<Integer> partitionList = consumedOffsetMap.keySet() List<Integer> partitionList = consumedOffsetMap.keySet()
.stream() .stream()
.filter(elem -> elem.topic().equals(topicName)) .filter(elem -> elem.topic().equals(topicName))
@@ -113,7 +150,7 @@ public class GroupManagerImpl implements GroupManager {
Collections.sort(partitionList); Collections.sort(partitionList);
// 获取消费组当前运行信息 // 获取消费组当前运行信息
ConsumerGroupDescription groupDescription = groupService.getGroupDescription(clusterPhyId, groupName); ConsumerGroupDescription groupDescription = groupService.getGroupDescriptionFromKafka(clusterPhyId, groupName);
// 转换存储格式 // 转换存储格式
Map<TopicPartition, MemberDescription> tpMemberMap = new HashMap<>(); Map<TopicPartition, MemberDescription> tpMemberMap = new HashMap<>();
@@ -166,13 +203,13 @@ public class GroupManagerImpl implements GroupManager {
return rv; return rv;
} }
ConsumerGroupDescription description = groupService.getGroupDescription(dto.getClusterId(), dto.getGroupName()); ConsumerGroupDescription description = groupService.getGroupDescriptionFromKafka(dto.getClusterId(), dto.getGroupName());
if (ConsumerGroupState.DEAD.equals(description.state()) && !dto.isCreateIfNotExist()) { if (ConsumerGroupState.DEAD.equals(description.state()) && !dto.isCreateIfNotExist()) {
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, "group不存在, 重置失败"); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, "group不存在, 重置失败");
} }
if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) { if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) {
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty | Dead 情况可重置)", GroupStateEnum.getByRawState(description.state()).getState())); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", GroupStateEnum.getByRawState(description.state()).getState()));
} }
// 获取offset // 获取offset
@@ -185,6 +222,22 @@ public class GroupManagerImpl implements GroupManager {
return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator); return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator);
} }
@Override
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList) {
// 获取指标
Result<List<GroupMetrics>> metricsListResult = groupMetricService.listLatestMetricsAggByGroupTopicFromES(
clusterPhyId,
groupMemberPOList.stream().map(elem -> new GroupTopic(elem.getGroupName(), elem.getTopicName())).collect(Collectors.toList()),
Arrays.asList(GroupMetricVersionItems.GROUP_METRIC_LAG),
AggTypeEnum.MAX
);
if (metricsListResult.failed()) {
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
}
return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData());
}
/**************************************************** private method ****************************************************/ /**************************************************** private method ****************************************************/
@@ -293,4 +346,31 @@ public class GroupManagerImpl implements GroupManager {
); );
} }
private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(String groupName, String state, List<GroupTopicMember> groupTopicList, List<GroupMetrics> metricsList) {
if (metricsList == null) {
metricsList = new ArrayList<>();
}
// <TopicName, GroupMetrics>
Map<String, GroupMetrics> metricsMap = new HashMap<>();
for (GroupMetrics metrics : metricsList) {
if (!groupName.equals(metrics.getGroup())) continue;
metricsMap.put(metrics.getTopic(), metrics);
}
List<GroupTopicOverviewVO> voList = new ArrayList<>();
for (GroupTopicMember po : groupTopicList) {
GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class);
vo.setGroupName(groupName);
vo.setState(state);
GroupMetrics metrics = metricsMap.get(po.getTopicName());
if (metrics != null) {
vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG)));
}
voList.add(vo);
}
return voList;
}
} }

View File

@@ -1,8 +1,10 @@
package com.xiaojukeji.know.streaming.km.biz.topic; package com.xiaojukeji.know.streaming.km.biz.topic;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO;
@@ -23,4 +25,6 @@ public interface TopicStateManager {
Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, String topicName, List<String> metricsNames); Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, String topicName, List<String> metricsNames);
Result<TopicBrokersPartitionsSummaryVO> getTopicBrokersPartitionsSummary(Long clusterPhyId, String topicName); Result<TopicBrokersPartitionsSummaryVO> getTopicBrokersPartitionsSummary(Long clusterPhyId, String topicName);
PaginationResult<GroupTopicOverviewVO> pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto);
} }

View File

@@ -2,17 +2,22 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.broker.BrokerReplicaSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.broker.BrokerReplicaSummaryVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicRecordVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO;
@@ -32,6 +37,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
@@ -77,6 +83,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
@Autowired @Autowired
private TopicConfigService topicConfigService; private TopicConfigService topicConfigService;
@Autowired
private GroupService groupService;
@Autowired
private GroupManager groupManager;
@Override @Override
public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException { public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException {
Topic topic = topicService.getTopic(clusterPhyId, topicName); Topic topic = topicService.getTopic(clusterPhyId, topicName);
@@ -346,6 +358,19 @@ public class TopicStateManagerImpl implements TopicStateManager {
return Result.buildSuc(vo); return Result.buildSuc(vo);
} }
@Override
public PaginationResult<GroupTopicOverviewVO> pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto) {
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, "", "", searchGroupName, dto);
if (!paginationResult.hasData()) {
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
}
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
}
/**************************************************** private method ****************************************************/ /**************************************************** private method ****************************************************/
private boolean checkIfIgnore(ConsumerRecord<String, String> consumerRecord, String filterKey, String filterValue) { private boolean checkIfIgnore(ConsumerRecord<String, String> consumerRecord, String filterKey, String filterValue) {

View File

@@ -0,0 +1,18 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author wyb
* @date 2022/10/17
*/
@Data
public class ClusterGroupSummaryDTO extends PaginationBaseDTO {
@ApiModelProperty("查找该Topic")
private String searchTopicName;
@ApiModelProperty("查找该Group")
private String searchGroupName;
}

View File

@@ -0,0 +1,74 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.group;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import java.util.ArrayList;
import java.util.List;
/**
* @author wyb
* @date 2022/10/10
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Group {
/**
* 集群id
*/
private Long clusterPhyId;
/**
* group类型
* @see GroupTypeEnum
*/
private GroupTypeEnum type;
/**
* group名称
*/
private String name;
/**
* group状态
* @see GroupStateEnum
*/
private GroupStateEnum state;
/**
* group成员数量
*/
private Integer memberCount;
/**
* group消费的topic列表
*/
private List<GroupTopicMember> topicMembers;
/**
* group分配策略
*/
private String partitionAssignor;
/**
* group协调器brokerId
*/
private int coordinatorId;
public Group(Long clusterPhyId, String groupName, ConsumerGroupDescription groupDescription) {
this.clusterPhyId = clusterPhyId;
this.type = groupDescription.isSimpleConsumerGroup()? GroupTypeEnum.CONSUMER: GroupTypeEnum.CONNECTOR;
this.name = groupName;
this.state = GroupStateEnum.getByRawState(groupDescription.state());
this.memberCount = groupDescription.members() == null? 0: groupDescription.members().size();
this.topicMembers = new ArrayList<>();
this.partitionAssignor = groupDescription.partitionAssignor();
this.coordinatorId = groupDescription.coordinator() == null? Constant.INVALID_CODE: groupDescription.coordinator().id();
}
}

View File

@@ -0,0 +1,27 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.group;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author wyb
* @date 2022/10/10
*/
@Data
@NoArgsConstructor
public class GroupTopicMember {
/**
* Topic名称
*/
private String topicName;
/**
* 消费此Topic的成员数量
*/
private Integer memberCount;
public GroupTopicMember(String topicName, Integer memberCount) {
this.topicName = topicName;
this.memberCount = memberCount;
}
}

View File

@@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.po.group;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@@ -23,12 +22,19 @@ public class GroupMemberPO extends BasePO {
private Integer memberCount; private Integer memberCount;
public GroupMemberPO(Long clusterPhyId, String topicName, String groupName, Date updateTime) { public GroupMemberPO(Long clusterPhyId, String topicName, String groupName, String state, Integer memberCount) {
this.clusterPhyId = clusterPhyId; this.clusterPhyId = clusterPhyId;
this.topicName = topicName; this.topicName = topicName;
this.groupName = groupName; this.groupName = groupName;
this.state = GroupStateEnum.UNKNOWN.getState(); this.state = state;
this.memberCount = 0; this.memberCount = memberCount;
}
public GroupMemberPO(Long clusterPhyId, String topicName, String groupName, String state, Integer memberCount, Date updateTime) {
this.clusterPhyId = clusterPhyId;
this.topicName = topicName;
this.groupName = groupName;
this.state = state;
this.memberCount = memberCount;
this.updateTime = updateTime; this.updateTime = updateTime;
} }
} }

View File

@@ -0,0 +1,61 @@
package com.xiaojukeji.know.streaming.km.common.bean.po.group;
import com.baomidou.mybatisplus.annotation.TableName;
import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "group")
public class GroupPO extends BasePO {
/**
* 集群id
*/
private Long clusterPhyId;
/**
* group类型
*
* @see GroupTypeEnum
*/
private Integer type;
/**
* group名称
*/
private String name;
/**
* group状态
*
* @see GroupStateEnum
*/
private String state;
/**
* group成员数量
*/
private Integer memberCount;
/**
* group消费的topic列表
*/
private String topicMembers;
/**
* group分配策略
*/
private String partitionAssignor;
/**
* group协调器brokerId
*/
private int coordinatorId;
}

View File

@@ -0,0 +1,27 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.group;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
/**
* @author wyb
* @date 2022/10/9
*/
@Data
@ApiModel(value = "Group信息")
public class GroupOverviewVO {
@ApiModelProperty(value = "Group名称", example = "group-know-streaming-test")
private String name;
@ApiModelProperty(value = "Group状态", example = "Empty")
private String state;
@ApiModelProperty(value = "group的成员数", example = "12")
private Integer memberCount;
@ApiModelProperty(value = "Topic列表", example = "[topic1,topic2]")
private List<String> topicNameList;
}

View File

@@ -10,7 +10,7 @@ import lombok.Data;
*/ */
@Data @Data
@ApiModel(value = "GroupTopic信息") @ApiModel(value = "GroupTopic信息")
public class GroupTopicOverviewVO extends GroupTopicBasicVO{ public class GroupTopicOverviewVO extends GroupTopicBasicVO {
@ApiModelProperty(value = "最大Lag", example = "12345678") @ApiModelProperty(value = "最大Lag", example = "12345678")
private Long maxLag; private Long maxLag;
} }

View File

@@ -18,4 +18,14 @@ public class PaginationConstant {
* 默认页大小 * 默认页大小
*/ */
public static final Integer DEFAULT_PAGE_SIZE = 10; public static final Integer DEFAULT_PAGE_SIZE = 10;
/**
* group列表的默认排序规则
*/
public static final String DEFAULT_GROUP_SORTED_FIELD = "name";
/**
* groupTopic列表的默认排序规则
*/
public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName";
} }

View File

@@ -0,0 +1,62 @@
package com.xiaojukeji.know.streaming.km.common.converter;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import java.util.ArrayList;
import java.util.stream.Collectors;
/**
* @author wyb
* @date 2022/10/10
*/
public class GroupConverter {
private GroupConverter() {
}
public static GroupOverviewVO convert2GroupOverviewVO(Group group) {
GroupOverviewVO vo = ConvertUtil.obj2Obj(group, GroupOverviewVO.class);
vo.setState(group.getState().getState());
vo.setTopicNameList(group.getTopicMembers().stream().map(elem -> elem.getTopicName()).collect(Collectors.toList()));
return vo;
}
public static Group convert2Group(GroupPO po) {
if (po == null) {
return null;
}
Group group = ConvertUtil.obj2Obj(po, Group.class);
if (!ValidateUtils.isBlank(po.getTopicMembers())) {
group.setTopicMembers(ConvertUtil.str2ObjArrayByJson(po.getTopicMembers(), GroupTopicMember.class));
} else {
group.setTopicMembers(new ArrayList<>());
}
group.setType(GroupTypeEnum.getTypeByCode(po.getType()));
group.setState(GroupStateEnum.getByState(po.getState()));
return group;
}
public static GroupPO convert2GroupPO(Group group) {
if (group == null) {
return null;
}
GroupPO po = ConvertUtil.obj2Obj(group, GroupPO.class);
po.setTopicMembers(ConvertUtil.obj2Json(group.getTopicMembers()));
po.setType(group.getType().getCode());
po.setState(group.getState().getState());
return po;
}
}

View File

@@ -0,0 +1,36 @@
package com.xiaojukeji.know.streaming.km.common.enums.group;
import lombok.Getter;
/**
* @author wyb
* @date 2022/10/11
*/
@Getter
public enum GroupTypeEnum {
UNKNOWN(-1, "Unknown"),
CONSUMER(0, "Consumer客户端的消费组"),
CONNECTOR(1, "Connector的消费组");
private final Integer code;
private final String msg;
GroupTypeEnum(Integer code, String msg) {
this.code = code;
this.msg = msg;
}
public static GroupTypeEnum getTypeByCode(Integer code) {
if (code == null) return UNKNOWN;
for (GroupTypeEnum groupTypeEnum : GroupTypeEnum.values()) {
if (groupTypeEnum.code.equals(code)) {
return groupTypeEnum;
}
}
return UNKNOWN;
}
}

View File

@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.core.service.group; package com.xiaojukeji.know.streaming.km.core.service.group;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
@@ -16,27 +17,47 @@ import java.util.Map;
public interface GroupService { public interface GroupService {
/** /**
* 从Kafka中获取消费组 * 从Kafka中获取消费组名称列表
* @param clusterPhyId 集群ID
* @return
* @throws NotExistException
* @throws AdminOperateException
*/ */
List<String> listGroupsFromKafka(Long clusterPhyId) throws NotExistException, AdminOperateException; List<String> listGroupsFromKafka(Long clusterPhyId) throws NotExistException, AdminOperateException;
Map<TopicPartition, Long> getGroupOffset(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException; /**
* 从Kafka中获取消费组详细信息
*/
Group getGroupFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
ConsumerGroupDescription getGroupDescription(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException; Map<TopicPartition, Long> getGroupOffsetFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
int replaceDBData(GroupMemberPO groupMemberPO); ConsumerGroupDescription getGroupDescriptionFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
void batchReplace(List<GroupMemberPO> newGroupMemberList); Result<Void> resetGroupOffsets(Long clusterPhyId, String groupName, Map<TopicPartition, Long> offsetMap, String operator) throws NotExistException, AdminOperateException;
/**
* 批量更新DB
*/
void batchReplaceGroupsAndMembers(Long clusterPhyId, List<Group> newGroupList, long updateTime);
int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime);
/**
* DB-Group相关接口
*/
GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName); GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName);
List<GroupMemberPO> listGroupByTopic(Long clusterPhyId, String topicName); Group getGroupFromDB(Long clusterPhyId, String groupName);
List<GroupMemberPO> listGroup(Long clusterPhyId); List<Group> listClusterGroups(Long clusterPhyId);
List<String> getGroupsFromDB(Long clusterPhyId);
Integer calGroupCount(Long clusterPhyId);
Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum);
/**
* DB-GroupTopic相关接口
*/
List<GroupMemberPO> listGroupByTopic(Long clusterPhyId, String topicName);
PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId, PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
String topicName, String topicName,
@@ -45,15 +66,5 @@ public interface GroupService {
String searchGroupKeyword, String searchGroupKeyword,
PaginationBaseDTO dto); PaginationBaseDTO dto);
int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime); GroupMemberPO getGroupTopicFromDB(Long clusterPhyId, String groupName, String topicName);
}
List<String> getGroupsFromDB(Long clusterPhyId);
GroupMemberPO getGroupFromDB(Long clusterPhyId, String groupName, String topicName);
Integer calGroupCount(Long clusterPhyId);
Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum);
Result<Void> resetGroupOffsets(Long clusterPhyId, String groupName, Map<TopicPartition, Long> offsetMap, String operator) throws NotExistException, AdminOperateException;
}

View File

@@ -24,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreSer
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO; import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -183,7 +182,7 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe
List<GroupMetrics> metricsList = new ArrayList<>(); List<GroupMetrics> metricsList = new ArrayList<>();
try { try {
Map<TopicPartition, Long> groupOffsetMap = groupService.getGroupOffset(clusterId, groupName); Map<TopicPartition, Long> groupOffsetMap = groupService.getGroupOffsetFromKafka(clusterId, groupName);
// 组织 GROUP_METRIC_OFFSET_CONSUMED 指标 // 组织 GROUP_METRIC_OFFSET_CONSUMED 指标
for (Map.Entry<TopicPartition, Long> entry: groupOffsetMap.entrySet()) { for (Map.Entry<TopicPartition, Long> entry: groupOffsetMap.entrySet()) {

View File

@@ -7,11 +7,15 @@ import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
@@ -24,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupDAO;
import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupMemberDAO; import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupMemberDAO;
import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -41,6 +46,9 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
public class GroupServiceImpl extends BaseVersionControlService implements GroupService { public class GroupServiceImpl extends BaseVersionControlService implements GroupService {
private static final ILog log = LogFactory.getLog(GroupServiceImpl.class); private static final ILog log = LogFactory.getLog(GroupServiceImpl.class);
@Autowired
private GroupDAO groupDAO;
@Autowired @Autowired
private GroupMemberDAO groupMemberDAO; private GroupMemberDAO groupMemberDAO;
@@ -79,7 +87,43 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
} }
@Override @Override
public Map<TopicPartition, Long> getGroupOffset(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException { public Group getGroupFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
// 获取消费组的详细信息
ConsumerGroupDescription groupDescription = this.getGroupDescriptionFromKafka(clusterPhyId, groupName);
if (groupDescription == null) {
return null;
}
Group group = new Group(clusterPhyId, groupName, groupDescription);
// 获取消费组消费过哪些Topic
Map<String, GroupTopicMember> memberMap = new HashMap<>();
for (TopicPartition tp : this.getGroupOffsetFromKafka(clusterPhyId, groupName).keySet()) {
memberMap.putIfAbsent(tp.topic(), new GroupTopicMember(tp.topic(), 0));
}
// 记录成员信息
for (MemberDescription memberDescription : groupDescription.members()) {
Set<TopicPartition> partitionList = new HashSet<>();
if (!ValidateUtils.isNull(memberDescription.assignment().topicPartitions())) {
partitionList = memberDescription.assignment().topicPartitions();
}
Set<String> topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet());
for (String topicName : topicNameSet) {
memberMap.putIfAbsent(topicName, new GroupTopicMember(topicName, 0));
GroupTopicMember member = memberMap.get(topicName);
member.setMemberCount(member.getMemberCount() + 1);
}
}
group.setTopicMembers(memberMap.values().stream().collect(Collectors.toList()));
return group;
}
@Override
public Map<TopicPartition, Long> getGroupOffsetFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId); AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId);
Map<TopicPartition, Long> offsetMap = new HashMap<>(); Map<TopicPartition, Long> offsetMap = new HashMap<>();
@@ -99,12 +143,12 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
} }
@Override @Override
public ConsumerGroupDescription getGroupDescription(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException { public ConsumerGroupDescription getGroupDescriptionFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId); AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId);
try { try {
DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups( DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups(
Collections.singletonList(groupName), Arrays.asList(groupName),
new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(false) new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(false)
); );
@@ -117,40 +161,12 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
} }
@Override @Override
public int replaceDBData(GroupMemberPO groupMemberPO) { public void batchReplaceGroupsAndMembers(Long clusterPhyId, List<Group> newGroupList, long updateTime) {
return groupMemberDAO.replace(groupMemberPO); // 更新Group信息
} this.batchReplaceGroups(clusterPhyId, newGroupList, updateTime);
@Override
public void batchReplace(List<GroupMemberPO> newGroupMemberList) {
if (newGroupMemberList == null || newGroupMemberList.isEmpty()) {
return;
}
Long clusterPhyId = newGroupMemberList.get(0).getClusterPhyId();
if (clusterPhyId == null) {
return;
}
List<GroupMemberPO> dbGroupMemberList = listGroup(clusterPhyId);
Map<String, GroupMemberPO> dbGroupMemberMap = dbGroupMemberList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity()));
for (GroupMemberPO groupMemberPO : newGroupMemberList) {
GroupMemberPO po = dbGroupMemberMap.remove(groupMemberPO.getGroupName() + groupMemberPO.getTopicName());
try {
if (po != null) {
groupMemberPO.setId(po.getId());
groupMemberDAO.updateById(groupMemberPO);
} else {
groupMemberDAO.insert(groupMemberPO);
}
} catch (Exception e) {
log.error("method=batchReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, groupMemberPO.getGroupName(), e);
}
}
// 更新Group-Topic信息
this.batchReplaceGroupMembers(clusterPhyId, newGroupList, updateTime);
} }
@Override @Override
@@ -176,14 +192,6 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
return groupMemberDAO.selectList(lambdaQueryWrapper); return groupMemberDAO.selectList(lambdaQueryWrapper);
} }
@Override
public List<GroupMemberPO> listGroup(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
return groupMemberDAO.selectList(lambdaQueryWrapper);
}
@Override @Override
public PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId, public PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
String topicName, String topicName,
@@ -208,8 +216,33 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
return PaginationResult.buildSuc(iPage.getRecords(), iPage); return PaginationResult.buildSuc(iPage.getRecords(), iPage);
} }
@Override
public Group getGroupFromDB(Long clusterPhyId, String groupName) {
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(GroupPO::getName, groupName);
GroupPO groupPO = groupDAO.selectOne(lambdaQueryWrapper);
return GroupConverter.convert2Group(groupPO);
}
@Override
public List<Group> listClusterGroups(Long clusterPhyId) {
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
return groupDAO.selectList(lambdaQueryWrapper).stream().map(elem -> GroupConverter.convert2Group(elem)).collect(Collectors.toList());
}
@Override @Override
public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) { public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) {
// 删除过期Group信息
LambdaQueryWrapper<GroupPO> groupPOLambdaQueryWrapper = new LambdaQueryWrapper<>();
groupPOLambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
groupPOLambdaQueryWrapper.le(GroupPO::getUpdateTime, beforeTime);
groupDAO.delete(groupPOLambdaQueryWrapper);
// 删除过期GroupMember信息
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
queryWrapper.le(GroupMemberPO::getUpdateTime, beforeTime); queryWrapper.le(GroupMemberPO::getUpdateTime, beforeTime);
@@ -218,17 +251,19 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
@Override @Override
public List<String> getGroupsFromDB(Long clusterPhyId) { public List<String> getGroupsFromDB(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
List<GroupMemberPO> poList = groupMemberDAO.selectList(queryWrapper);
List<GroupPO> poList = groupDAO.selectList(lambdaQueryWrapper);
if (poList == null) { if (poList == null) {
poList = new ArrayList<>(); poList = new ArrayList<>();
} }
return new ArrayList<>(poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()));
return new ArrayList<>(poList.stream().map(elem -> elem.getName()).collect(Collectors.toSet()));
} }
@Override @Override
public GroupMemberPO getGroupFromDB(Long clusterPhyId, String groupName, String topicName) { public GroupMemberPO getGroupTopicFromDB(Long clusterPhyId, String groupName, String topicName) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(GroupMemberPO::getTopicName, topicName); queryWrapper.eq(GroupMemberPO::getTopicName, topicName);
@@ -239,28 +274,19 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
@Override @Override
public Integer calGroupCount(Long clusterPhyId) { public Integer calGroupCount(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
List<GroupMemberPO> poList = groupMemberDAO.selectList(queryWrapper);
if (poList == null) {
poList = new ArrayList<>();
}
return poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()).size(); return groupDAO.selectCount(lambdaQueryWrapper);
} }
@Override @Override
public Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum) { public Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(GroupMemberPO::getState, stateEnum.getState()); lambdaQueryWrapper.eq(GroupPO::getState, stateEnum.getState());
List<GroupMemberPO> poList = groupMemberDAO.selectList(queryWrapper); return groupDAO.selectCount(lambdaQueryWrapper);
if (poList == null) {
poList = new ArrayList<>();
}
return poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()).size();
} }
@Override @Override
@@ -303,4 +329,74 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
/**************************************************** private method ****************************************************/ /**************************************************** private method ****************************************************/
private void batchReplaceGroupMembers(Long clusterPhyId, List<Group> newGroupList, long updateTime) {
if (ValidateUtils.isEmptyList(newGroupList)) {
return;
}
List<GroupMemberPO> dbPOList = this.listClusterGroupsMemberPO(clusterPhyId);
Map<String, GroupMemberPO> dbPOMap = dbPOList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity()));
for (Group group: newGroupList) {
for (GroupTopicMember member : group.getTopicMembers()) {
try {
GroupMemberPO newPO = new GroupMemberPO(clusterPhyId, member.getTopicName(), group.getName(), group.getState().getState(), member.getMemberCount(), new Date(updateTime));
GroupMemberPO dbPO = dbPOMap.remove(newPO.getGroupName() + newPO.getTopicName());
if (dbPO != null) {
newPO.setId(dbPO.getId());
groupMemberDAO.updateById(newPO);
continue;
}
groupMemberDAO.insert(newPO);
} catch (Exception e) {
log.error(
"method=batchReplaceGroupMembers||clusterPhyId={}||groupName={}||topicName={}||errMsg=exception",
clusterPhyId, group.getName(), member.getTopicName(), e
);
}
}
}
}
private void batchReplaceGroups(Long clusterPhyId, List<Group> newGroupList, long updateTime) {
if (ValidateUtils.isEmptyList(newGroupList)) {
return;
}
List<GroupPO> dbGroupList = this.listClusterGroupsPO(clusterPhyId);
Map<String, GroupPO> dbGroupMap = dbGroupList.stream().collect(Collectors.toMap(elem -> elem.getName(), Function.identity()));
for (Group newGroup: newGroupList) {
try {
GroupPO newPO = GroupConverter.convert2GroupPO(newGroup);
newPO.setUpdateTime(new Date(updateTime));
GroupPO dbPO = dbGroupMap.remove(newGroup.getName());
if (dbPO != null) {
newPO.setId(dbPO.getId());
groupDAO.updateById(newPO);
continue;
}
groupDAO.insert(newPO);
} catch (Exception e) {
log.error("method=batchGroupReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, newGroup.getName(), e);
}
}
}
private List<GroupPO> listClusterGroupsPO(Long clusterPhyId) {
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
return groupDAO.selectList(lambdaQueryWrapper);
}
private List<GroupMemberPO> listClusterGroupsMemberPO(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
return groupMemberDAO.selectList(lambdaQueryWrapper);
}
} }

View File

@@ -0,0 +1,9 @@
package com.xiaojukeji.know.streaming.km.persistence.mysql.group;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO;
import org.springframework.stereotype.Repository;
@Repository
public interface GroupDAO extends BaseMapper<GroupPO> {
}

View File

@@ -1,12 +1,15 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster; package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster;
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager; import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupsOverviewDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupsOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricGroupPartitionDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricGroupPartitionDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.field.PaginationFuzzySearchFieldDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.field.PaginationFuzzySearchFieldDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicPartitionKS;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
@@ -37,7 +40,8 @@ public class ClusterGroupsController {
@Autowired @Autowired
private GroupMetricService groupMetricService; private GroupMetricService groupMetricService;
@ApiOperation(value = "集群Groups信息列表") @Deprecated
@ApiOperation(value = "集群Groups信息列表", notes = "废弃, 下一个版本删除")
@PostMapping(value = "clusters/{clusterPhyId}/groups-overview") @PostMapping(value = "clusters/{clusterPhyId}/groups-overview")
@ResponseBody @ResponseBody
public PaginationResult<GroupTopicOverviewVO> getClusterPhyGroupsOverview(@PathVariable Long clusterPhyId, public PaginationResult<GroupTopicOverviewVO> getClusterPhyGroupsOverview(@PathVariable Long clusterPhyId,
@@ -53,6 +57,13 @@ public class ClusterGroupsController {
); );
} }
@ApiOperation(value = "集群Groups信息列表")
@GetMapping(value = "clusters/{clusterPhyId}/groups-overview")
@ResponseBody
public PaginationResult<GroupOverviewVO> getGroupsOverview(@PathVariable Long clusterPhyId, ClusterGroupSummaryDTO dto) {
return groupManager.pagingClusterGroupsOverview(clusterPhyId, dto);
}
@ApiOperation(value = "集群Groups指标信息") @ApiOperation(value = "集群Groups指标信息")
@PostMapping(value = "clusters/{clusterPhyId}/group-metrics") @PostMapping(value = "clusters/{clusterPhyId}/group-metrics")
@ResponseBody @ResponseBody
@@ -70,8 +81,17 @@ public class ClusterGroupsController {
return groupManager.listClusterPhyGroupPartitions(clusterPhyId, groupName, startTime, endTime); return groupManager.listClusterPhyGroupPartitions(clusterPhyId, groupName, startTime, endTime);
} }
@ApiOperation(value = "Group的Topic列表")
@GetMapping(value = "clusters/{clusterPhyId}/groups/{groupName}/topics-overview")
public PaginationResult<GroupTopicOverviewVO> getGroupTopicsOverview(@PathVariable Long clusterPhyId,
@PathVariable String groupName,
PaginationBaseDTO dto) {
return groupManager.pagingGroupTopicMembers(clusterPhyId, groupName, dto);
}
/**************************************************** private method ****************************************************/ /**************************************************** private method ****************************************************/
@Deprecated
private Tuple<String, String> getSearchKeyWords(ClusterGroupsOverviewDTO dto) { private Tuple<String, String> getSearchKeyWords(ClusterGroupsOverviewDTO dto) {
if (ValidateUtils.isEmptyList(dto.getFuzzySearchDTOList())) { if (ValidateUtils.isEmptyList(dto.getFuzzySearchDTOList())) {
return new Tuple<>("", ""); return new Tuple<>("", "");

View File

@@ -55,7 +55,7 @@ public class GroupController {
public Result<GroupMetadataCombineExistVO> getGroupMetadataCombineExist(@PathVariable Long clusterPhyId, public Result<GroupMetadataCombineExistVO> getGroupMetadataCombineExist(@PathVariable Long clusterPhyId,
@PathVariable String groupName, @PathVariable String groupName,
@PathVariable String topicName) { @PathVariable String topicName) {
GroupMemberPO po = groupService.getGroupFromDB(clusterPhyId, groupName, topicName); GroupMemberPO po = groupService.getGroupTopicFromDB(clusterPhyId, groupName, topicName);
if (po == null) { if (po == null) {
return Result.buildSuc(new GroupMetadataCombineExistVO(clusterPhyId, groupName, topicName, false)); return Result.buildSuc(new GroupMetadataCombineExistVO(clusterPhyId, groupName, topicName, false));
} }

View File

@@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic;
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
@@ -11,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.acl.AclBindingVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.acl.AclBindingVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicBasicVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicBasicVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicStateVO;
@@ -136,8 +136,17 @@ public class TopicStateController {
@ApiOperation(value = "TopicGroups基本信息列表") @ApiOperation(value = "TopicGroups基本信息列表")
@GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/groups-basic") @GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/groups-basic")
@ResponseBody @ResponseBody
public Result<List<GroupTopicBasicVO>> getTopicGroupsBasic(@PathVariable Long clusterPhyId, public Result<List<GroupTopicBasicVO>> getTopicGroupsBasic(@PathVariable Long clusterPhyId, @PathVariable String topicName) {
@PathVariable String topicName) {
return Result.buildSuc(ConvertUtil.list2List(groupService.listGroupByTopic(clusterPhyId, topicName), GroupTopicBasicVO.class)); return Result.buildSuc(ConvertUtil.list2List(groupService.listGroupByTopic(clusterPhyId, topicName), GroupTopicBasicVO.class));
} }
@ApiOperation("Topic的Group列表")
@GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/groups-overview")
public PaginationResult<GroupTopicOverviewVO> getTopicGroupsOverview(@PathVariable Long clusterPhyId,
@PathVariable String topicName,
@RequestParam(required = false) String searchGroupName,
PaginationBaseDTO dto) {
return topicStateManager.pagingTopicGroupsOverview(clusterPhyId, topicName, searchGroupName, dto);
}
} }

View File

@@ -6,15 +6,10 @@ import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.util.*; import java.util.*;
@@ -38,98 +33,58 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
@Override @Override
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
// 获取集群的Group列表
List<String> groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId()); List<String> groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId());
TaskResult tr = updateGroupMembersTask(clusterPhy, groupNameList, triggerTimeUnitMs);
if (!TaskResult.SUCCESS.equals(tr)) { TaskResult allSuccess = TaskResult.SUCCESS;
return tr;
// 获取Group详细信息
List<Group> groupList = new ArrayList<>();
for (String groupName : groupNameList) {
try {
Group group = groupService.getGroupFromKafka(clusterPhy.getId(), groupName);
if (group == null) {
continue;
}
groupList.add(group);
} catch (Exception e) {
log.error("method=processClusterTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e);
allSuccess = TaskResult.FAIL;
}
}
// 过滤掉无效的Topic
this.filterTopicIfTopicNotExist(clusterPhy.getId(), groupList);
// 更新DB中的Group信息
groupService.batchReplaceGroupsAndMembers(clusterPhy.getId(), groupList, triggerTimeUnitMs);
// 如果存在错误,则直接返回
if (!TaskResult.SUCCESS.equals(allSuccess)) {
return allSuccess;
} }
// 删除历史的Group // 删除历史的Group
groupService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 5 * 60 * 1000)); groupService.deleteByUpdateTimeBeforeInDB(clusterPhy.getId(), new Date(triggerTimeUnitMs - 5 * 60 * 1000));
return tr; return allSuccess;
} }
private void filterTopicIfTopicNotExist(Long clusterPhyId, List<Group> groupList) {
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> groupNameList, long triggerTimeUnitMs) { if (ValidateUtils.isEmptyList(groupList)) {
List<GroupMemberPO> groupMemberPOList = new ArrayList<>(); return;
TaskResult tr = TaskResult.SUCCESS;
for (String groupName : groupNameList) {
try {
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
groupMemberPOList.addAll(poList);
} catch (Exception e) {
log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e);
tr = TaskResult.FAIL;
}
}
groupMemberPOList = this.filterGroupIfTopicNotExist(clusterPhy.getId(), groupMemberPOList);
groupService.batchReplace(groupMemberPOList);
return tr;
}
private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName, Date updateTime) throws NotExistException, AdminOperateException {
Map<String, GroupMemberPO> groupMap = new HashMap<>();
// 获取消费组消费过哪些Topic
Map<TopicPartition, Long> offsetMap = groupService.getGroupOffset(clusterPhyId, groupName);
for (TopicPartition topicPartition : offsetMap.keySet()) {
GroupMemberPO po = groupMap.get(topicPartition.topic());
if (po == null) {
po = new GroupMemberPO(clusterPhyId, topicPartition.topic(), groupName, updateTime);
}
groupMap.put(topicPartition.topic(), po);
}
// 在上面的基础上,补充消费组的详细信息
ConsumerGroupDescription consumerGroupDescription = groupService.getGroupDescription(clusterPhyId, groupName);
if (consumerGroupDescription == null) {
return new ArrayList<>(groupMap.values());
}
groupMap.forEach((key, val) -> val.setState(GroupStateEnum.getByRawState(consumerGroupDescription.state()).getState()));
for (MemberDescription memberDescription : consumerGroupDescription.members()) {
Set<TopicPartition> partitionList = new HashSet<>();
if (!ValidateUtils.isNull(memberDescription.assignment().topicPartitions())) {
partitionList = memberDescription.assignment().topicPartitions();
}
Set<String> topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet());
for (String topicName : topicNameSet) {
groupMap.putIfAbsent(topicName, new GroupMemberPO(clusterPhyId, topicName, groupName, updateTime));
GroupMemberPO po = groupMap.get(topicName);
po.setMemberCount(po.getMemberCount() + 1);
po.setState(GroupStateEnum.getByRawState(consumerGroupDescription.state()).getState());
}
}
// 如果该消费组没有正在消费任何Topic的特殊情况但是这个Group存在
if (groupMap.isEmpty()) {
GroupMemberPO po = new GroupMemberPO(clusterPhyId, "", groupName, updateTime);
po.setState(GroupStateEnum.getByRawState(consumerGroupDescription.state()).getState());
groupMap.put("", po);
}
return new ArrayList<>(groupMap.values());
}
private List<GroupMemberPO> filterGroupIfTopicNotExist(Long clusterPhyId, List<GroupMemberPO> poList) {
if (poList.isEmpty()) {
return poList;
} }
// 集群Topic集合 // 集群Topic集合
Set<String> dbTopicSet = topicService.listTopicsFromDB(clusterPhyId).stream().map(elem -> elem.getTopicName()).collect(Collectors.toSet()); Set<String> dbTopicSet = topicService.listTopicsFromDB(clusterPhyId).stream().map(elem -> elem.getTopicName()).collect(Collectors.toSet());
dbTopicSet.add(""); //兼容没有消费Topic的group dbTopicSet.add(""); //兼容没有消费Topic的group
// 过滤Topic不存在的消费组 // 过滤Topic不存在的消费组
return poList.stream().filter(elem -> dbTopicSet.contains(elem.getTopicName())).collect(Collectors.toList()); for (Group group: groupList) {
group.setTopicMembers(
group.getTopicMembers().stream().filter(elem -> dbTopicSet.contains(elem.getTopicName())).collect(Collectors.toList())
);
}
} }
} }