[Feature] 集群Group列表按照Group维度进行展示 (#580)

This commit is contained in:
zengqiao
2022-10-20 11:48:27 +08:00
committed by EricZeng
parent ff26a8d46c
commit 79864955e1
22 changed files with 744 additions and 206 deletions

View File

@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.core.service.group;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
@@ -16,27 +17,47 @@ import java.util.Map;
public interface GroupService {
/**
* 从Kafka中获取消费组
* @param clusterPhyId 集群ID
* @return
* @throws NotExistException
* @throws AdminOperateException
* 从Kafka中获取消费组名称列表
*/
List<String> listGroupsFromKafka(Long clusterPhyId) throws NotExistException, AdminOperateException;
Map<TopicPartition, Long> getGroupOffset(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
/**
* 从Kafka中获取消费组详细信息
*/
Group getGroupFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
ConsumerGroupDescription getGroupDescription(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
Map<TopicPartition, Long> getGroupOffsetFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
int replaceDBData(GroupMemberPO groupMemberPO);
ConsumerGroupDescription getGroupDescriptionFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException;
void batchReplace(List<GroupMemberPO> newGroupMemberList);
Result<Void> resetGroupOffsets(Long clusterPhyId, String groupName, Map<TopicPartition, Long> offsetMap, String operator) throws NotExistException, AdminOperateException;
/**
* 批量更新DB
*/
void batchReplaceGroupsAndMembers(Long clusterPhyId, List<Group> newGroupList, long updateTime);
int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime);
/**
* DB-Group相关接口
*/
GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName);
List<GroupMemberPO> listGroupByTopic(Long clusterPhyId, String topicName);
Group getGroupFromDB(Long clusterPhyId, String groupName);
List<GroupMemberPO> listGroup(Long clusterPhyId);
List<Group> listClusterGroups(Long clusterPhyId);
List<String> getGroupsFromDB(Long clusterPhyId);
Integer calGroupCount(Long clusterPhyId);
Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum);
/**
* DB-GroupTopic相关接口
*/
List<GroupMemberPO> listGroupByTopic(Long clusterPhyId, String topicName);
PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
String topicName,
@@ -45,15 +66,5 @@ public interface GroupService {
String searchGroupKeyword,
PaginationBaseDTO dto);
int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime);
List<String> getGroupsFromDB(Long clusterPhyId);
GroupMemberPO getGroupFromDB(Long clusterPhyId, String groupName, String topicName);
Integer calGroupCount(Long clusterPhyId);
Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum);
Result<Void> resetGroupOffsets(Long clusterPhyId, String groupName, Map<TopicPartition, Long> offsetMap, String operator) throws NotExistException, AdminOperateException;
}
GroupMemberPO getGroupTopicFromDB(Long clusterPhyId, String groupName, String topicName);
}

View File

@@ -24,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.core.service.health.score.HealthScoreSer
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
@@ -183,7 +182,7 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe
List<GroupMetrics> metricsList = new ArrayList<>();
try {
Map<TopicPartition, Long> groupOffsetMap = groupService.getGroupOffset(clusterId, groupName);
Map<TopicPartition, Long> groupOffsetMap = groupService.getGroupOffsetFromKafka(clusterId, groupName);
// 组织 GROUP_METRIC_OFFSET_CONSUMED 指标
for (Map.Entry<TopicPartition, Long> entry: groupOffsetMap.entrySet()) {

View File

@@ -7,11 +7,15 @@ import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
@@ -24,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupDAO;
import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupMemberDAO;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -41,6 +46,9 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
public class GroupServiceImpl extends BaseVersionControlService implements GroupService {
private static final ILog log = LogFactory.getLog(GroupServiceImpl.class);
@Autowired
private GroupDAO groupDAO;
@Autowired
private GroupMemberDAO groupMemberDAO;
@@ -79,7 +87,43 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
}
@Override
public Map<TopicPartition, Long> getGroupOffset(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
public Group getGroupFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
// 获取消费组的详细信息
ConsumerGroupDescription groupDescription = this.getGroupDescriptionFromKafka(clusterPhyId, groupName);
if (groupDescription == null) {
return null;
}
Group group = new Group(clusterPhyId, groupName, groupDescription);
// 获取消费组消费过哪些Topic
Map<String, GroupTopicMember> memberMap = new HashMap<>();
for (TopicPartition tp : this.getGroupOffsetFromKafka(clusterPhyId, groupName).keySet()) {
memberMap.putIfAbsent(tp.topic(), new GroupTopicMember(tp.topic(), 0));
}
// 记录成员信息
for (MemberDescription memberDescription : groupDescription.members()) {
Set<TopicPartition> partitionList = new HashSet<>();
if (!ValidateUtils.isNull(memberDescription.assignment().topicPartitions())) {
partitionList = memberDescription.assignment().topicPartitions();
}
Set<String> topicNameSet = partitionList.stream().map(elem -> elem.topic()).collect(Collectors.toSet());
for (String topicName : topicNameSet) {
memberMap.putIfAbsent(topicName, new GroupTopicMember(topicName, 0));
GroupTopicMember member = memberMap.get(topicName);
member.setMemberCount(member.getMemberCount() + 1);
}
}
group.setTopicMembers(memberMap.values().stream().collect(Collectors.toList()));
return group;
}
@Override
public Map<TopicPartition, Long> getGroupOffsetFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId);
Map<TopicPartition, Long> offsetMap = new HashMap<>();
@@ -99,12 +143,12 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
}
@Override
public ConsumerGroupDescription getGroupDescription(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
public ConsumerGroupDescription getGroupDescriptionFromKafka(Long clusterPhyId, String groupName) throws NotExistException, AdminOperateException {
AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId);
try {
DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups(
Collections.singletonList(groupName),
Arrays.asList(groupName),
new DescribeConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS).includeAuthorizedOperations(false)
);
@@ -117,40 +161,12 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
}
@Override
public int replaceDBData(GroupMemberPO groupMemberPO) {
return groupMemberDAO.replace(groupMemberPO);
}
@Override
public void batchReplace(List<GroupMemberPO> newGroupMemberList) {
if (newGroupMemberList == null || newGroupMemberList.isEmpty()) {
return;
}
Long clusterPhyId = newGroupMemberList.get(0).getClusterPhyId();
if (clusterPhyId == null) {
return;
}
List<GroupMemberPO> dbGroupMemberList = listGroup(clusterPhyId);
Map<String, GroupMemberPO> dbGroupMemberMap = dbGroupMemberList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity()));
for (GroupMemberPO groupMemberPO : newGroupMemberList) {
GroupMemberPO po = dbGroupMemberMap.remove(groupMemberPO.getGroupName() + groupMemberPO.getTopicName());
try {
if (po != null) {
groupMemberPO.setId(po.getId());
groupMemberDAO.updateById(groupMemberPO);
} else {
groupMemberDAO.insert(groupMemberPO);
}
} catch (Exception e) {
log.error("method=batchReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, groupMemberPO.getGroupName(), e);
}
}
public void batchReplaceGroupsAndMembers(Long clusterPhyId, List<Group> newGroupList, long updateTime) {
// 更新Group信息
this.batchReplaceGroups(clusterPhyId, newGroupList, updateTime);
// 更新Group-Topic信息
this.batchReplaceGroupMembers(clusterPhyId, newGroupList, updateTime);
}
@Override
@@ -176,14 +192,6 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
return groupMemberDAO.selectList(lambdaQueryWrapper);
}
@Override
public List<GroupMemberPO> listGroup(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
return groupMemberDAO.selectList(lambdaQueryWrapper);
}
@Override
public PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
String topicName,
@@ -208,8 +216,33 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
return PaginationResult.buildSuc(iPage.getRecords(), iPage);
}
@Override
public Group getGroupFromDB(Long clusterPhyId, String groupName) {
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(GroupPO::getName, groupName);
GroupPO groupPO = groupDAO.selectOne(lambdaQueryWrapper);
return GroupConverter.convert2Group(groupPO);
}
@Override
public List<Group> listClusterGroups(Long clusterPhyId) {
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
return groupDAO.selectList(lambdaQueryWrapper).stream().map(elem -> GroupConverter.convert2Group(elem)).collect(Collectors.toList());
}
@Override
public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) {
// 删除过期Group信息
LambdaQueryWrapper<GroupPO> groupPOLambdaQueryWrapper = new LambdaQueryWrapper<>();
groupPOLambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
groupPOLambdaQueryWrapper.le(GroupPO::getUpdateTime, beforeTime);
groupDAO.delete(groupPOLambdaQueryWrapper);
// 删除过期GroupMember信息
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
queryWrapper.le(GroupMemberPO::getUpdateTime, beforeTime);
@@ -218,17 +251,19 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
@Override
public List<String> getGroupsFromDB(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
List<GroupMemberPO> poList = groupMemberDAO.selectList(queryWrapper);
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
List<GroupPO> poList = groupDAO.selectList(lambdaQueryWrapper);
if (poList == null) {
poList = new ArrayList<>();
}
return new ArrayList<>(poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()));
return new ArrayList<>(poList.stream().map(elem -> elem.getName()).collect(Collectors.toSet()));
}
@Override
public GroupMemberPO getGroupFromDB(Long clusterPhyId, String groupName, String topicName) {
public GroupMemberPO getGroupTopicFromDB(Long clusterPhyId, String groupName, String topicName) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(GroupMemberPO::getTopicName, topicName);
@@ -239,28 +274,19 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
@Override
public Integer calGroupCount(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
List<GroupMemberPO> poList = groupMemberDAO.selectList(queryWrapper);
if (poList == null) {
poList = new ArrayList<>();
}
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
return poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()).size();
return groupDAO.selectCount(lambdaQueryWrapper);
}
@Override
public Integer calGroupStatCount(Long clusterPhyId, GroupStateEnum stateEnum) {
LambdaQueryWrapper<GroupMemberPO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
queryWrapper.eq(GroupMemberPO::getState, stateEnum.getState());
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(GroupPO::getState, stateEnum.getState());
List<GroupMemberPO> poList = groupMemberDAO.selectList(queryWrapper);
if (poList == null) {
poList = new ArrayList<>();
}
return poList.stream().map(elem -> elem.getGroupName()).collect(Collectors.toSet()).size();
return groupDAO.selectCount(lambdaQueryWrapper);
}
@Override
@@ -303,4 +329,74 @@ public class GroupServiceImpl extends BaseVersionControlService implements Group
/**************************************************** private method ****************************************************/
private void batchReplaceGroupMembers(Long clusterPhyId, List<Group> newGroupList, long updateTime) {
if (ValidateUtils.isEmptyList(newGroupList)) {
return;
}
List<GroupMemberPO> dbPOList = this.listClusterGroupsMemberPO(clusterPhyId);
Map<String, GroupMemberPO> dbPOMap = dbPOList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity()));
for (Group group: newGroupList) {
for (GroupTopicMember member : group.getTopicMembers()) {
try {
GroupMemberPO newPO = new GroupMemberPO(clusterPhyId, member.getTopicName(), group.getName(), group.getState().getState(), member.getMemberCount(), new Date(updateTime));
GroupMemberPO dbPO = dbPOMap.remove(newPO.getGroupName() + newPO.getTopicName());
if (dbPO != null) {
newPO.setId(dbPO.getId());
groupMemberDAO.updateById(newPO);
continue;
}
groupMemberDAO.insert(newPO);
} catch (Exception e) {
log.error(
"method=batchReplaceGroupMembers||clusterPhyId={}||groupName={}||topicName={}||errMsg=exception",
clusterPhyId, group.getName(), member.getTopicName(), e
);
}
}
}
}
private void batchReplaceGroups(Long clusterPhyId, List<Group> newGroupList, long updateTime) {
if (ValidateUtils.isEmptyList(newGroupList)) {
return;
}
List<GroupPO> dbGroupList = this.listClusterGroupsPO(clusterPhyId);
Map<String, GroupPO> dbGroupMap = dbGroupList.stream().collect(Collectors.toMap(elem -> elem.getName(), Function.identity()));
for (Group newGroup: newGroupList) {
try {
GroupPO newPO = GroupConverter.convert2GroupPO(newGroup);
newPO.setUpdateTime(new Date(updateTime));
GroupPO dbPO = dbGroupMap.remove(newGroup.getName());
if (dbPO != null) {
newPO.setId(dbPO.getId());
groupDAO.updateById(newPO);
continue;
}
groupDAO.insert(newPO);
} catch (Exception e) {
log.error("method=batchGroupReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, newGroup.getName(), e);
}
}
}
private List<GroupPO> listClusterGroupsPO(Long clusterPhyId) {
LambdaQueryWrapper<GroupPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId);
return groupDAO.selectList(lambdaQueryWrapper);
}
private List<GroupMemberPO> listClusterGroupsMemberPO(Long clusterPhyId) {
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
return groupMemberDAO.selectList(lambdaQueryWrapper);
}
}