mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
修复Group指标防重复不生效问题
This commit is contained in:
@@ -90,23 +90,31 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<List<GroupMetrics>> collectGroupMetricsFromKafka(Long clusterId, String groupName, List<String> metrics) {
|
public Result<List<GroupMetrics>> collectGroupMetricsFromKafka(Long clusterId, String groupName, List<String> metrics) {
|
||||||
List<GroupMetrics> allGroupMetrics = new ArrayList<>();
|
List<GroupMetrics> allGroupMetrics = new ArrayList<>();
|
||||||
Map<String, GroupMetrics> topicPartitionGroupMap = new HashMap<>();
|
Map<String, GroupMetrics> topicPartitionGroupMap = new HashMap<>();
|
||||||
|
|
||||||
GroupMetrics groupMetrics = new GroupMetrics(clusterId, groupName, true);
|
GroupMetrics groupMetrics = new GroupMetrics(clusterId, groupName, true);
|
||||||
for(String metric : metrics){
|
Set<String> existMetricSet = new HashSet<>();
|
||||||
if(null != groupMetrics.getMetrics().get(metric)){continue;}
|
for (String metric : metrics) {
|
||||||
|
if (existMetricSet.contains(metric)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
Result<List<GroupMetrics>> ret = collectGroupMetricsFromKafka(clusterId, groupName, metric);
|
Result<List<GroupMetrics>> ret = collectGroupMetricsFromKafka(clusterId, groupName, metric);
|
||||||
if(null != ret && ret.successful()){
|
if (null != ret && ret.successful()) {
|
||||||
List<GroupMetrics> groupMetricsList = ret.getData();
|
List<GroupMetrics> groupMetricsList = ret.getData();
|
||||||
for(GroupMetrics gm : groupMetricsList){
|
|
||||||
if(gm.isBGroupMetric()){
|
for (GroupMetrics gm : groupMetricsList) {
|
||||||
|
|
||||||
|
//记录已存在的指标
|
||||||
|
existMetricSet.addAll(gm.getMetrics().keySet());
|
||||||
|
|
||||||
|
if (gm.isBGroupMetric()) {
|
||||||
groupMetrics.getMetrics().putAll(gm.getMetrics());
|
groupMetrics.getMetrics().putAll(gm.getMetrics());
|
||||||
}else {
|
} else {
|
||||||
GroupMetrics topicGroupMetric = topicPartitionGroupMap.getOrDefault(
|
GroupMetrics topicGroupMetric = topicPartitionGroupMap.getOrDefault(
|
||||||
gm.getTopic() + gm.getPartitionId(),
|
gm.getTopic() + gm.getPartitionId(),
|
||||||
new GroupMetrics(clusterId, groupName, false));
|
new GroupMetrics(clusterId, gm.getPartitionId(), gm.getTopic(), groupName, false));
|
||||||
|
|
||||||
topicGroupMetric.getMetrics().putAll(gm.getMetrics());
|
topicGroupMetric.getMetrics().putAll(gm.getMetrics());
|
||||||
topicPartitionGroupMap.put(gm.getTopic() + gm.getPartitionId(), topicGroupMetric);
|
topicPartitionGroupMap.put(gm.getTopic() + gm.getPartitionId(), topicGroupMetric);
|
||||||
|
|||||||
Reference in New Issue
Block a user