From 63fbe728c45bf976e11999a56c33f85539107634 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 12 Oct 2022 11:11:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ZK=E6=8C=87=E6=A0=87=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5=E6=99=AE=E7=BD=97=E7=B1=B3=E4=BF=AE=E6=96=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../component/AbstractMonitorSinkService.java | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java index b4fd1986..b2ca9283 100644 --- a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java +++ b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java @@ -37,29 +37,32 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< @Override public void onApplicationEvent(BaseMetricEvent event) { executor.execute( () -> { - if(event instanceof BrokerMetricEvent){ + if (event instanceof BrokerMetricEvent) { BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event; sinkMetrics(brokerMetric2SinkPoint(brokerMetricEvent.getBrokerMetrics())); - }else if(event instanceof ClusterMetricEvent){ + } else if(event instanceof ClusterMetricEvent) { ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event; sinkMetrics(clusterMetric2SinkPoint(clusterMetricEvent.getClusterMetrics())); - }else if(event instanceof TopicMetricEvent){ + } else if(event instanceof TopicMetricEvent) { TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event; sinkMetrics(topicMetric2SinkPoint(topicMetricEvent.getTopicMetrics())); - }else if(event instanceof PartitionMetricEvent){ + } else if(event instanceof PartitionMetricEvent) { PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event; sinkMetrics(partitionMetric2SinkPoint(partitionMetricEvent.getPartitionMetrics())); - }else if(event instanceof GroupMetricEvent){ + } else if(event instanceof GroupMetricEvent) { GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event; sinkMetrics(groupMetric2SinkPoint(groupMetricEvent.getGroupMetrics())); - }else if(event instanceof ReplicaMetricEvent){ + } else if(event instanceof ReplicaMetricEvent) { ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event; sinkMetrics(replicationMetric2SinkPoint(replicaMetricEvent.getReplicationMetrics())); + } else if(event instanceof ZookeeperMetricEvent) { + ZookeeperMetricEvent zookeeperMetricEvent = (ZookeeperMetricEvent)event; + sinkMetrics(zookeeperMetric2SinkPoint(zookeeperMetricEvent.getZookeeperMetrics())); } } ); } @@ -72,6 +75,7 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< public abstract Boolean sinkMetrics(List pointList); /**************************************************** private method ****************************************************/ + private List brokerMetric2SinkPoint(List brokerMetrics){ List pointList = new ArrayList<>(); @@ -161,8 +165,23 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< return pointList; } - private List genSinkPoint(String metricPre, Map metrics, - long timeStamp, Map tagsMap){ + private List zookeeperMetric2SinkPoint(List zookeeperMetricsList){ + List pointList = new ArrayList<>(); + + for(ZookeeperMetrics z : zookeeperMetricsList){ + Map tagsMap = new HashMap<>(); + tagsMap.put(CLUSTER_ID.getName(), z.getClusterPhyId()); + + pointList.addAll(genSinkPoint("Zookeeper", z.getMetrics(), z.getTimestamp(), tagsMap)); + } + + return pointList; + } + + private List genSinkPoint(String metricPre, + Map metrics, + long timeStamp, + Map tagsMap) { List pointList = new ArrayList<>(); for(String metricName : metrics.keySet()){