mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Optimize]Group列表的maxLag指标调整为实时获取 (#1074)
1、增加调用的超时时间,在前端需要的超时时间内返回; 2、将Group列表的maxLag指标调整为实时获取;
This commit is contained in:
@@ -42,5 +42,7 @@ public interface GroupManager {
|
||||
|
||||
Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception;
|
||||
|
||||
@Deprecated
|
||||
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList);
|
||||
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList, Integer timeoutUnitMs);
|
||||
}
|
||||
|
||||
@@ -45,12 +45,14 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems;
|
||||
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
|
||||
import org.apache.kafka.common.ConsumerGroupState;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
@@ -58,6 +60,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum.CONNECT_CLUSTER_PROTOCOL_TYPE;
|
||||
@@ -87,6 +90,9 @@ public class GroupManagerImpl implements GroupManager {
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Autowired
|
||||
private KSConfigUtils ksConfigUtils;
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupTopicOverviewVO> pagingGroupMembers(Long clusterPhyId,
|
||||
String topicName,
|
||||
@@ -94,19 +100,27 @@ public class GroupManagerImpl implements GroupManager {
|
||||
String searchTopicKeyword,
|
||||
String searchGroupKeyword,
|
||||
PaginationBaseDTO dto) {
|
||||
long startTimeUnitMs = System.currentTimeMillis();
|
||||
|
||||
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto);
|
||||
|
||||
if (!paginationResult.hasData()) {
|
||||
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
|
||||
}
|
||||
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(
|
||||
clusterPhyId,
|
||||
paginationResult.getData().getBizData(),
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
|
||||
);
|
||||
|
||||
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) {
|
||||
long startTimeUnitMs = System.currentTimeMillis();
|
||||
|
||||
Group group = groupService.getGroupFromDB(clusterPhyId, groupName);
|
||||
|
||||
//没有topicMember则直接返回
|
||||
@@ -122,7 +136,14 @@ public class GroupManagerImpl implements GroupManager {
|
||||
|
||||
List<GroupMemberPO> groupMemberPOList = paginationResult.getData().getBizData().stream().map(elem -> new GroupMemberPO(clusterPhyId, elem.getTopicName(), groupName, group.getState().getState(), elem.getMemberCount())).collect(Collectors.toList());
|
||||
|
||||
return PaginationResult.buildSuc(this.getGroupTopicOverviewVOList(clusterPhyId, groupMemberPOList), paginationResult);
|
||||
return PaginationResult.buildSuc(
|
||||
this.getGroupTopicOverviewVOList(
|
||||
clusterPhyId,
|
||||
groupMemberPOList,
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
|
||||
),
|
||||
paginationResult
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -317,6 +338,49 @@ public class GroupManagerImpl implements GroupManager {
|
||||
return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> poList, Integer timeoutUnitMs) {
|
||||
Set<String> requestedGroupSet = new HashSet<>();
|
||||
|
||||
// 获取指标
|
||||
Map<String, Map<String, Float>> groupTopicLagMap = new ConcurrentHashMap<>();
|
||||
poList.forEach(elem -> {
|
||||
if (requestedGroupSet.contains(elem.getGroupName())) {
|
||||
// 该Group已经处理过
|
||||
return;
|
||||
}
|
||||
|
||||
requestedGroupSet.add(elem.getGroupName());
|
||||
ApiCallThreadPoolService.runnableTask(
|
||||
String.format("clusterPhyId=%d||groupName=%s||msg=getGroupTopicLag", clusterPhyId, elem.getGroupName()),
|
||||
timeoutUnitMs,
|
||||
() -> {
|
||||
Result<List<GroupMetrics>> listResult = groupMetricService.collectGroupMetricsFromKafka(clusterPhyId, elem.getGroupName(), GroupMetricVersionItems.GROUP_METRIC_LAG);
|
||||
if (listResult == null || !listResult.hasData()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Float> lagMetricMap = new HashMap<>();
|
||||
listResult.getData().forEach(item -> {
|
||||
Float newLag = item.getMetric(GroupMetricVersionItems.GROUP_METRIC_LAG);
|
||||
if (newLag == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Float oldLag = lagMetricMap.getOrDefault(item.getTopic(), newLag);
|
||||
lagMetricMap.put(item.getTopic(), Math.max(oldLag, newLag));
|
||||
});
|
||||
|
||||
groupTopicLagMap.put(elem.getGroupName(), lagMetricMap);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
ApiCallThreadPoolService.waitResult();
|
||||
|
||||
return this.convert2GroupTopicOverviewVOList(poList, groupTopicLagMap);
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
@@ -370,13 +434,22 @@ public class GroupManagerImpl implements GroupManager {
|
||||
metricsList = new ArrayList<>();
|
||||
}
|
||||
|
||||
// <GroupName, <TopicName, GroupMetrics>>
|
||||
Map<String, Map<String, GroupMetrics>> metricsMap = new HashMap<>();
|
||||
// <GroupName, <TopicName, lag>>
|
||||
Map<String, Map<String, Float>> metricsMap = new HashMap<>();
|
||||
metricsList.stream().forEach(elem -> {
|
||||
Float metricValue = elem.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG);
|
||||
if (metricValue == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
metricsMap.putIfAbsent(elem.getGroup(), new HashMap<>());
|
||||
metricsMap.get(elem.getGroup()).put(elem.getTopic(), elem);
|
||||
metricsMap.get(elem.getGroup()).put(elem.getTopic(), metricValue);
|
||||
});
|
||||
|
||||
return this.convert2GroupTopicOverviewVOList(poList, metricsMap);
|
||||
}
|
||||
|
||||
private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(List<GroupMemberPO> poList, Map<String, Map<String, Float>> metricsMap) {
|
||||
List<GroupTopicOverviewVO> voList = new ArrayList<>();
|
||||
for (GroupMemberPO po: poList) {
|
||||
GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class);
|
||||
@@ -384,9 +457,9 @@ public class GroupManagerImpl implements GroupManager {
|
||||
continue;
|
||||
}
|
||||
|
||||
GroupMetrics metrics = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName());
|
||||
if (metrics != null) {
|
||||
vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG)));
|
||||
Float metricValue = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName());
|
||||
if (metricValue != null) {
|
||||
vo.setMaxLag(ConvertUtil.Float2Long(metricValue));
|
||||
}
|
||||
|
||||
voList.add(vo);
|
||||
|
||||
@@ -38,6 +38,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||
@@ -89,6 +90,9 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
@Autowired
|
||||
private GroupManager groupManager;
|
||||
|
||||
@Autowired
|
||||
private KSConfigUtils ksConfigUtils;
|
||||
|
||||
@Override
|
||||
public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException {
|
||||
Topic topic = topicService.getTopic(clusterPhyId, topicName);
|
||||
@@ -178,9 +182,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
|
||||
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
||||
partitionList.forEach(topicPartition -> {
|
||||
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
|
||||
});
|
||||
partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()));
|
||||
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
|
||||
}
|
||||
|
||||
@@ -360,13 +362,19 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
|
||||
@Override
|
||||
public PaginationResult<GroupTopicOverviewVO> pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto) {
|
||||
long startTimeUnitMs = System.currentTimeMillis();
|
||||
|
||||
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, "", "", searchGroupName, dto);
|
||||
|
||||
if (!paginationResult.hasData()) {
|
||||
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
|
||||
}
|
||||
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
|
||||
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(
|
||||
clusterPhyId,
|
||||
paginationResult.getData().getBizData(),
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
|
||||
);
|
||||
|
||||
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user