diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java deleted file mode 100644 index a94a377d..00000000 --- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/MetricESSender.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.xiaojukeji.know.streaming.km.collector.metric; - -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*; -import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; -import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*; -import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; -import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; -import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory; -import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO; -import org.apache.commons.collections.CollectionUtils; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*; - -@Component -public class MetricESSender implements ApplicationListener { - protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); - - private static final int THRESHOLD = 100; - - private ThreadPoolExecutor esExecutor = new ThreadPoolExecutor(10, 20, 6000, TimeUnit.MILLISECONDS, - new LinkedBlockingDeque<>(1000), - new NamedThreadFactory("KM-Collect-MetricESSender-ES"), - (r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount())); - - @PostConstruct - public void init(){ - LOGGER.info("class=MetricESSender||method=init||msg=init finished"); - } - - @Override - public void onApplicationEvent(BaseMetricEvent event) { - if(event instanceof BrokerMetricEvent) { - BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event; - send2es(BROKER_INDEX, - ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class) - ); - - } else if(event instanceof ClusterMetricEvent) { - ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event; - send2es(CLUSTER_INDEX, - ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class) - ); - - } else if(event instanceof TopicMetricEvent) { - TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event; - send2es(TOPIC_INDEX, - ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class) - ); - - } else if(event instanceof PartitionMetricEvent) { - PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event; - send2es(PARTITION_INDEX, - ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class) - ); - - } else if(event instanceof GroupMetricEvent) { - GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event; - send2es(GROUP_INDEX, - ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class) - ); - - } else if(event instanceof ReplicaMetricEvent) { - ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event; - send2es(REPLICATION_INDEX, - ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class) - ); - } - } - - /** - * 根据不同监控维度来发送 - */ - private boolean send2es(String index, List statsList){ - if (CollectionUtils.isEmpty(statsList)) { - return true; - } - - if (!EnvUtil.isOnline()) { - LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}", - index, statsList.size()); - } - - BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index); - if (Objects.isNull( baseMetricESDao )) { - LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index); - return false; - } - - int size = statsList.size(); - int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1); - - if (size < THRESHOLD) { - esExecutor.execute( - () -> baseMetricESDao.batchInsertStats(statsList) - ); - return true; - } - - for (int i = 1; i < num + 1; i++) { - int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD); - int start = (i - 1) * THRESHOLD; - - esExecutor.execute( - () -> baseMetricESDao.batchInsertStats(statsList.subList(start, end)) - ); - } - - return true; - } -} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/AbstractMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/AbstractMetricESSender.java new file mode 100644 index 00000000..d3192f1f --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/AbstractMetricESSender.java @@ -0,0 +1,72 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; +import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; +import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory; +import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO; +import org.apache.commons.collections.CollectionUtils; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractMetricESSender { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + private static final int THRESHOLD = 100; + + private static final ThreadPoolExecutor esExecutor = new ThreadPoolExecutor( + 10, + 20, + 6000, + TimeUnit.MILLISECONDS, + new LinkedBlockingDeque<>(1000), + new NamedThreadFactory("KM-Collect-MetricESSender-ES"), + (r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount()) + ); + + /** + * 根据不同监控维度来发送 + */ + protected boolean send2es(String index, List statsList){ + if (CollectionUtils.isEmpty(statsList)) { + return true; + } + + if (!EnvUtil.isOnline()) { + LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}", + index, statsList.size()); + } + + BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index); + if (Objects.isNull( baseMetricESDao )) { + LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index); + return false; + } + + int size = statsList.size(); + int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1); + + if (size < THRESHOLD) { + esExecutor.execute( + () -> baseMetricESDao.batchInsertStats(statsList) + ); + return true; + } + + for (int i = 1; i < num + 1; i++) { + int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD); + int start = (i - 1) * THRESHOLD; + + esExecutor.execute( + () -> baseMetricESDao.batchInsertStats(statsList.subList(start, end)) + ); + } + + return true; + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/BrokerMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/BrokerMetricESSender.java new file mode 100644 index 00000000..6708ba38 --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/BrokerMetricESSender.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.BROKER_INDEX; + +@Component +public class BrokerMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=BrokerMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(BrokerMetricEvent event) { + send2es(BROKER_INDEX, ConvertUtil.list2List(event.getBrokerMetrics(), BrokerMetricPO.class)); + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ClusterMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ClusterMetricESSender.java new file mode 100644 index 00000000..94091748 --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ClusterMetricESSender.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ClusterMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.CLUSTER_INDEX; + +@Component +public class ClusterMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=ClusterMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(ClusterMetricEvent event) { + send2es(CLUSTER_INDEX, ConvertUtil.list2List(event.getClusterMetrics(), ClusterMetricPO.class)); + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/GroupMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/GroupMetricESSender.java new file mode 100644 index 00000000..cd7a2242 --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/GroupMetricESSender.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.GroupMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.GroupMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.GROUP_INDEX; + +@Component +public class GroupMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(GroupMetricEvent event) { + send2es(GROUP_INDEX, ConvertUtil.list2List(event.getGroupMetrics(), GroupMetricPO.class)); + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/PartitionMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/PartitionMetricESSender.java new file mode 100644 index 00000000..ce108835 --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/PartitionMetricESSender.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.PartitionMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.PARTITION_INDEX; + +@Component +public class PartitionMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=PartitionMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(PartitionMetricEvent event) { + send2es(PARTITION_INDEX, ConvertUtil.list2List(event.getPartitionMetrics(), PartitionMetricPO.class)); + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ReplicaMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ReplicaMetricESSender.java new file mode 100644 index 00000000..76b2aa2a --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ReplicaMetricESSender.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.REPLICATION_INDEX; + +@Component +public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(ReplicaMetricEvent event) { + send2es(REPLICATION_INDEX, ConvertUtil.list2List(event.getReplicationMetrics(), ReplicationMetricPO.class)); + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/TopicMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/TopicMetricESSender.java new file mode 100644 index 00000000..eebd82aa --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/TopicMetricESSender.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.TOPIC_INDEX; + +@Component +public class TopicMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=TopicMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(TopicMetricEvent event) { + send2es(TOPIC_INDEX, ConvertUtil.list2List(event.getTopicMetrics(), TopicMetricPO.class)); + } +}