diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java index ea324888..427edc2c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java @@ -90,23 +90,31 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe @Override public Result> collectGroupMetricsFromKafka(Long clusterId, String groupName, List metrics) { - List allGroupMetrics = new ArrayList<>(); - Map topicPartitionGroupMap = new HashMap<>(); + List allGroupMetrics = new ArrayList<>(); + Map topicPartitionGroupMap = new HashMap<>(); GroupMetrics groupMetrics = new GroupMetrics(clusterId, groupName, true); - for(String metric : metrics){ - if(null != groupMetrics.getMetrics().get(metric)){continue;} + Set existMetricSet = new HashSet<>(); + for (String metric : metrics) { + if (existMetricSet.contains(metric)) { + continue; + } Result> ret = collectGroupMetricsFromKafka(clusterId, groupName, metric); - if(null != ret && ret.successful()){ + if (null != ret && ret.successful()) { List groupMetricsList = ret.getData(); - for(GroupMetrics gm : groupMetricsList){ - if(gm.isBGroupMetric()){ + + for (GroupMetrics gm : groupMetricsList) { + + //记录已存在的指标 + existMetricSet.addAll(gm.getMetrics().keySet()); + + if (gm.isBGroupMetric()) { groupMetrics.getMetrics().putAll(gm.getMetrics()); - }else { + } else { GroupMetrics topicGroupMetric = topicPartitionGroupMap.getOrDefault( gm.getTopic() + gm.getPartitionId(), - new GroupMetrics(clusterId, groupName, false)); + new GroupMetrics(clusterId, gm.getPartitionId(), gm.getTopic(), groupName, false)); topicGroupMetric.getMetrics().putAll(gm.getMetrics()); topicPartitionGroupMap.put(gm.getTopic() + gm.getPartitionId(), topicGroupMetric);