From d68a19679e19f81135abe17ad209acff6cf95703 Mon Sep 17 00:00:00 2001 From: EricZeng Date: Mon, 3 Jul 2023 14:37:35 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]Group=E5=88=97=E8=A1=A8=E7=9A=84maxLa?= =?UTF-8?q?g=E6=8C=87=E6=A0=87=E8=B0=83=E6=95=B4=E4=B8=BA=E5=AE=9E?= =?UTF-8?q?=E6=97=B6=E8=8E=B7=E5=8F=96=20(#1074)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、增加调用的超时时间,在前端需要的超时时间内返回; 2、将Group列表的maxLag指标调整为实时获取; --- docs/install_guide/版本升级手册.md | 9 ++ .../streaming/km/biz/group/GroupManager.java | 2 + .../km/biz/group/impl/GroupManagerImpl.java | 89 +++++++++++++++++-- .../biz/topic/impl/TopicStateManagerImpl.java | 16 +++- .../km/core/service/config/ConfigUtils.java | 16 ---- .../km/core/service/config/KSConfigUtils.java | 24 +++++ .../group/impl/GroupMetricServiceImpl.java | 30 +++++-- .../core/utils/ApiCallThreadPoolService.java | 9 ++ km-rest/src/main/resources/application.yml | 4 + 9 files changed, 162 insertions(+), 37 deletions(-) delete mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/ConfigUtils.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/KSConfigUtils.java diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md index 0a192658..ba8fa8d8 100644 --- a/docs/install_guide/版本升级手册.md +++ b/docs/install_guide/版本升级手册.md @@ -6,6 +6,15 @@ ### 升级至 `master` 版本 +**配置变更** + +```yaml +# 新增的配置 +request: # 请求相关的配置 + api-call: # api调用 + timeout-unit-ms: 8000 # 超时时间,默认8000毫秒 +``` + **SQL 变更** ```sql -- 多集群管理权限2023-06-27新增 diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java index a6b41eef..ea6465a3 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java @@ -42,5 +42,7 @@ public interface GroupManager { Result deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception; + @Deprecated List getGroupTopicOverviewVOList(Long clusterPhyId, List groupMemberPOList); + List getGroupTopicOverviewVOList(Long clusterPhyId, List groupMemberPOList, Integer timeoutUnitMs); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index c80c4020..1f37c368 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -45,12 +45,14 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils; import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems; +import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService; import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.TopicPartition; @@ -58,6 +60,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum.CONNECT_CLUSTER_PROTOCOL_TYPE; @@ -87,6 +90,9 @@ public class GroupManagerImpl implements GroupManager { @Autowired private ClusterPhyService clusterPhyService; + @Autowired + private KSConfigUtils ksConfigUtils; + @Override public PaginationResult pagingGroupMembers(Long clusterPhyId, String topicName, @@ -94,19 +100,27 @@ public class GroupManagerImpl implements GroupManager { String searchTopicKeyword, String searchGroupKeyword, PaginationBaseDTO dto) { + long startTimeUnitMs = System.currentTimeMillis(); + PaginationResult paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto); if (!paginationResult.hasData()) { return PaginationResult.buildSuc(new ArrayList<>(), paginationResult); } - List groupTopicVOList = this.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData()); + List groupTopicVOList = this.getGroupTopicOverviewVOList( + clusterPhyId, + paginationResult.getData().getBizData(), + ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间 + ); return PaginationResult.buildSuc(groupTopicVOList, paginationResult); } @Override public PaginationResult pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) { + long startTimeUnitMs = System.currentTimeMillis(); + Group group = groupService.getGroupFromDB(clusterPhyId, groupName); //没有topicMember则直接返回 @@ -122,7 +136,14 @@ public class GroupManagerImpl implements GroupManager { List groupMemberPOList = paginationResult.getData().getBizData().stream().map(elem -> new GroupMemberPO(clusterPhyId, elem.getTopicName(), groupName, group.getState().getState(), elem.getMemberCount())).collect(Collectors.toList()); - return PaginationResult.buildSuc(this.getGroupTopicOverviewVOList(clusterPhyId, groupMemberPOList), paginationResult); + return PaginationResult.buildSuc( + this.getGroupTopicOverviewVOList( + clusterPhyId, + groupMemberPOList, + ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间 + ), + paginationResult + ); } @Override @@ -317,6 +338,49 @@ public class GroupManagerImpl implements GroupManager { return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData()); } + @Override + public List getGroupTopicOverviewVOList(Long clusterPhyId, List poList, Integer timeoutUnitMs) { + Set requestedGroupSet = new HashSet<>(); + + // 获取指标 + Map> groupTopicLagMap = new ConcurrentHashMap<>(); + poList.forEach(elem -> { + if (requestedGroupSet.contains(elem.getGroupName())) { + // 该Group已经处理过 + return; + } + + requestedGroupSet.add(elem.getGroupName()); + ApiCallThreadPoolService.runnableTask( + String.format("clusterPhyId=%d||groupName=%s||msg=getGroupTopicLag", clusterPhyId, elem.getGroupName()), + timeoutUnitMs, + () -> { + Result> listResult = groupMetricService.collectGroupMetricsFromKafka(clusterPhyId, elem.getGroupName(), GroupMetricVersionItems.GROUP_METRIC_LAG); + if (listResult == null || !listResult.hasData()) { + return; + } + + Map lagMetricMap = new HashMap<>(); + listResult.getData().forEach(item -> { + Float newLag = item.getMetric(GroupMetricVersionItems.GROUP_METRIC_LAG); + if (newLag == null) { + return; + } + + Float oldLag = lagMetricMap.getOrDefault(item.getTopic(), newLag); + lagMetricMap.put(item.getTopic(), Math.max(oldLag, newLag)); + }); + + groupTopicLagMap.put(elem.getGroupName(), lagMetricMap); + } + ); + }); + + ApiCallThreadPoolService.waitResult(); + + return this.convert2GroupTopicOverviewVOList(poList, groupTopicLagMap); + } + /**************************************************** private method ****************************************************/ @@ -370,13 +434,22 @@ public class GroupManagerImpl implements GroupManager { metricsList = new ArrayList<>(); } - // > - Map> metricsMap = new HashMap<>(); + // > + Map> metricsMap = new HashMap<>(); metricsList.stream().forEach(elem -> { + Float metricValue = elem.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG); + if (metricValue == null) { + return; + } + metricsMap.putIfAbsent(elem.getGroup(), new HashMap<>()); - metricsMap.get(elem.getGroup()).put(elem.getTopic(), elem); + metricsMap.get(elem.getGroup()).put(elem.getTopic(), metricValue); }); + return this.convert2GroupTopicOverviewVOList(poList, metricsMap); + } + + private List convert2GroupTopicOverviewVOList(List poList, Map> metricsMap) { List voList = new ArrayList<>(); for (GroupMemberPO po: poList) { GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class); @@ -384,9 +457,9 @@ public class GroupManagerImpl implements GroupManager { continue; } - GroupMetrics metrics = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName()); - if (metrics != null) { - vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG))); + Float metricValue = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName()); + if (metricValue != null) { + vo.setMaxLag(ConvertUtil.Float2Long(metricValue)); } voList.add(vo); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index cd970528..3b4b5b5f 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -38,6 +38,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; @@ -89,6 +90,9 @@ public class TopicStateManagerImpl implements TopicStateManager { @Autowired private GroupManager groupManager; + @Autowired + private KSConfigUtils ksConfigUtils; + @Override public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException { Topic topic = topicService.getTopic(clusterPhyId, topicName); @@ -178,9 +182,7 @@ public class TopicStateManagerImpl implements TopicStateManager { // 获取指定时间每个分区的offset(按指定开始时间查询消息时) if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { Map timestampsToSearch = new HashMap<>(); - partitionList.forEach(topicPartition -> { - timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()); - }); + partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs())); partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); } @@ -360,13 +362,19 @@ public class TopicStateManagerImpl implements TopicStateManager { @Override public PaginationResult pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto) { + long startTimeUnitMs = System.currentTimeMillis(); + PaginationResult paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, "", "", searchGroupName, dto); if (!paginationResult.hasData()) { return PaginationResult.buildSuc(new ArrayList<>(), paginationResult); } - List groupTopicVOList = groupManager.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData()); + List groupTopicVOList = groupManager.getGroupTopicOverviewVOList( + clusterPhyId, + paginationResult.getData().getBizData(), + ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间 + ); return PaginationResult.buildSuc(groupTopicVOList, paginationResult); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/ConfigUtils.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/ConfigUtils.java deleted file mode 100644 index 58c26b69..00000000 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/ConfigUtils.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.xiaojukeji.know.streaming.km.core.service.config; - -import lombok.Getter; -import org.springframework.stereotype.Service; - - -/** - * @author zengqiao - * @date 22/6/14 - */ -@Getter -@Service -public class ConfigUtils { - private ConfigUtils() { - } -} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/KSConfigUtils.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/KSConfigUtils.java new file mode 100644 index 00000000..55cdf051 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/KSConfigUtils.java @@ -0,0 +1,24 @@ +package com.xiaojukeji.know.streaming.km.core.service.config; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + + +/** + * @author zengqiao + * @date 22/6/14 + */ +@Getter +@Service +public class KSConfigUtils { + private KSConfigUtils() { + } + + @Value(value = "${request.api-call.timeout-unit-ms:8000}") + private Integer apiCallTimeoutUnitMs; + + public Integer getApiCallLeftTimeUnitMs(Long costedUnitMs) { + return Math.max(1000, (int)(apiCallTimeoutUnitMs - costedUnitMs)); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java index c9d65468..1303c2ae 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java @@ -39,7 +39,7 @@ import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafk */ @Service("groupMetricService") public class GroupMetricServiceImpl extends BaseMetricService implements GroupMetricService { - private static final ILog LOGGER = LogFactory.getLog( GroupMetricServiceImpl.class); + private static final ILog LOGGER = LogFactory.getLog(GroupMetricServiceImpl.class); public static final String GROUP_METHOD_GET_JUST_FRO_TEST = "getMetricJustForTest"; public static final String GROUP_METHOD_GET_HEALTH_SCORE = "getMetricHealthScore"; @@ -54,7 +54,7 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe @Override protected void initRegisterVCHandler(){ registerVCHandler( GROUP_METHOD_GET_JUST_FRO_TEST, this::getMetricJustForTest); - registerVCHandler( GROUP_METHOD_GET_LAG_RELEVANT_FROM_ADMIN_CLIENT, this::getLagRelevantFromAdminClient ); + registerVCHandler( GROUP_METHOD_GET_LAG_RELEVANT_FROM_ADMIN_CLIENT, this::getLagRelevantFromAdminClient); registerVCHandler( GROUP_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore); registerVCHandler( GROUP_METHOD_GET_STATE, this::getGroupState); } @@ -129,8 +129,14 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe @Override public Result> listGroupMetricsFromES(Long clusterId, MetricGroupPartitionDTO dto) { Table> retTable = groupMetricESDAO.listGroupMetrics( - clusterId, dto.getGroup(), dto.getGroupTopics(), dto.getMetricsNames(), - dto.getAggType(), dto.getStartTime(), dto.getEndTime()); + clusterId, + dto.getGroup(), + dto.getGroupTopics(), + dto.getMetricsNames(), + dto.getAggType(), + dto.getStartTime(), + dto.getEndTime() + ); List multiLinesVOS = metricMap2VO(clusterId, retTable.rowMap()); return Result.buildSuc(multiLinesVOS); @@ -140,7 +146,11 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe public Result> listLatestMetricsAggByGroupTopicFromES(Long clusterPhyId, List groupTopicList, List metricNames, AggTypeEnum aggType) { List groupMetricPOS = groupMetricESDAO.listLatestMetricsAggByGroupTopic( - clusterPhyId, groupTopicList, metricNames, aggType); + clusterPhyId, + groupTopicList, + metricNames, + aggType + ); return Result.buildSuc( ConvertUtil.list2List(groupMetricPOS, GroupMetrics.class)); } @@ -149,7 +159,11 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe public Result> listPartitionLatestMetricsFromES(Long clusterPhyId, String groupName, String topicName, List metricNames) { List groupMetricPOS = groupMetricESDAO.listPartitionLatestMetrics( - clusterPhyId, groupName, topicName, metricNames); + clusterPhyId, + groupName, + topicName, + metricNames + ); return Result.buildSuc( ConvertUtil.list2List(groupMetricPOS, GroupMetrics.class)); } @@ -158,9 +172,7 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe public Result countMetricValueOccurrencesFromES(Long clusterPhyId, String groupName, SearchTerm term, Long startTime, Long endTime) { setQueryMetricFlag(term); - int count = groupMetricESDAO.countMetricValue(clusterPhyId, groupName, - term, startTime, endTime); - + int count = groupMetricESDAO.countMetricValue(clusterPhyId, groupName, term, startTime, endTime); if(count < 0){ return Result.buildFail(); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java index e66b4aa5..cce34c6f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java @@ -37,7 +37,16 @@ public class ApiCallThreadPoolService { apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable); } + public static void runnableTask(String taskName, Integer timeoutUnisMs, Runnable runnable) { + apiFutureUtil.runnableTask(taskName, timeoutUnisMs, runnable); + } + + @Deprecated public static void waitResult(Integer stepWaitTimeUnitMs) { apiFutureUtil.waitResult(stepWaitTimeUnitMs); } + + public static void waitResult() { + apiFutureUtil.waitResult(0); + } } \ No newline at end of file diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index b8849dbf..40152cdd 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -95,6 +95,10 @@ es: index: expire: 15 # 索引过期天数,15表示超过15天的索引会被KS过期删除 +request: # 请求相关的配置 + api-call: # api调用 + timeout-unit-ms: 8000 # 超时时间,默认8000毫秒 + # 普罗米修斯指标导出相关配置 management: endpoints: