diff --git a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java index b4fd1986..b2ca9283 100644 --- a/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java +++ b/km-extends/km-monitor/src/main/java/com/xiaojukeji/know/streaming/km/monitor/component/AbstractMonitorSinkService.java @@ -37,29 +37,32 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< @Override public void onApplicationEvent(BaseMetricEvent event) { executor.execute( () -> { - if(event instanceof BrokerMetricEvent){ + if (event instanceof BrokerMetricEvent) { BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event; sinkMetrics(brokerMetric2SinkPoint(brokerMetricEvent.getBrokerMetrics())); - }else if(event instanceof ClusterMetricEvent){ + } else if(event instanceof ClusterMetricEvent) { ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event; sinkMetrics(clusterMetric2SinkPoint(clusterMetricEvent.getClusterMetrics())); - }else if(event instanceof TopicMetricEvent){ + } else if(event instanceof TopicMetricEvent) { TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event; sinkMetrics(topicMetric2SinkPoint(topicMetricEvent.getTopicMetrics())); - }else if(event instanceof PartitionMetricEvent){ + } else if(event instanceof PartitionMetricEvent) { PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event; sinkMetrics(partitionMetric2SinkPoint(partitionMetricEvent.getPartitionMetrics())); - }else if(event instanceof GroupMetricEvent){ + } else if(event instanceof GroupMetricEvent) { GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event; sinkMetrics(groupMetric2SinkPoint(groupMetricEvent.getGroupMetrics())); - }else if(event instanceof ReplicaMetricEvent){ + } else if(event instanceof ReplicaMetricEvent) { ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event; sinkMetrics(replicationMetric2SinkPoint(replicaMetricEvent.getReplicationMetrics())); + } else if(event instanceof ZookeeperMetricEvent) { + ZookeeperMetricEvent zookeeperMetricEvent = (ZookeeperMetricEvent)event; + sinkMetrics(zookeeperMetric2SinkPoint(zookeeperMetricEvent.getZookeeperMetrics())); } } ); } @@ -72,6 +75,7 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< public abstract Boolean sinkMetrics(List pointList); /**************************************************** private method ****************************************************/ + private List brokerMetric2SinkPoint(List brokerMetrics){ List pointList = new ArrayList<>(); @@ -161,8 +165,23 @@ public abstract class AbstractMonitorSinkService implements ApplicationListener< return pointList; } - private List genSinkPoint(String metricPre, Map metrics, - long timeStamp, Map tagsMap){ + private List zookeeperMetric2SinkPoint(List zookeeperMetricsList){ + List pointList = new ArrayList<>(); + + for(ZookeeperMetrics z : zookeeperMetricsList){ + Map tagsMap = new HashMap<>(); + tagsMap.put(CLUSTER_ID.getName(), z.getClusterPhyId()); + + pointList.addAll(genSinkPoint("Zookeeper", z.getMetrics(), z.getTimestamp(), tagsMap)); + } + + return pointList; + } + + private List genSinkPoint(String metricPre, + Map metrics, + long timeStamp, + Map tagsMap) { List pointList = new ArrayList<>(); for(String metricName : metrics.keySet()){