mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
指标发送ES类按照指标类别拆分
This commit is contained in:
@@ -1,121 +0,0 @@
|
|||||||
package com.xiaojukeji.know.streaming.km.collector.metric;
|
|
||||||
|
|
||||||
import com.didiglobal.logi.log.ILog;
|
|
||||||
import com.didiglobal.logi.log.LogFactory;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
|
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
|
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
|
||||||
import org.springframework.context.ApplicationListener;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
|
|
||||||
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
|
||||||
|
|
||||||
private static final int THRESHOLD = 100;
|
|
||||||
|
|
||||||
private ThreadPoolExecutor esExecutor = new ThreadPoolExecutor(10, 20, 6000, TimeUnit.MILLISECONDS,
|
|
||||||
new LinkedBlockingDeque<>(1000),
|
|
||||||
new NamedThreadFactory("KM-Collect-MetricESSender-ES"),
|
|
||||||
(r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount()));
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init(){
|
|
||||||
LOGGER.info("class=MetricESSender||method=init||msg=init finished");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onApplicationEvent(BaseMetricEvent event) {
|
|
||||||
if(event instanceof BrokerMetricEvent) {
|
|
||||||
BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event;
|
|
||||||
send2es(BROKER_INDEX,
|
|
||||||
ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class)
|
|
||||||
);
|
|
||||||
|
|
||||||
} else if(event instanceof ClusterMetricEvent) {
|
|
||||||
ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event;
|
|
||||||
send2es(CLUSTER_INDEX,
|
|
||||||
ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class)
|
|
||||||
);
|
|
||||||
|
|
||||||
} else if(event instanceof TopicMetricEvent) {
|
|
||||||
TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event;
|
|
||||||
send2es(TOPIC_INDEX,
|
|
||||||
ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class)
|
|
||||||
);
|
|
||||||
|
|
||||||
} else if(event instanceof PartitionMetricEvent) {
|
|
||||||
PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event;
|
|
||||||
send2es(PARTITION_INDEX,
|
|
||||||
ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class)
|
|
||||||
);
|
|
||||||
|
|
||||||
} else if(event instanceof GroupMetricEvent) {
|
|
||||||
GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event;
|
|
||||||
send2es(GROUP_INDEX,
|
|
||||||
ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class)
|
|
||||||
);
|
|
||||||
|
|
||||||
} else if(event instanceof ReplicaMetricEvent) {
|
|
||||||
ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event;
|
|
||||||
send2es(REPLICATION_INDEX,
|
|
||||||
ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据不同监控维度来发送
|
|
||||||
*/
|
|
||||||
private boolean send2es(String index, List<? extends BaseESPO> statsList){
|
|
||||||
if (CollectionUtils.isEmpty(statsList)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!EnvUtil.isOnline()) {
|
|
||||||
LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
|
|
||||||
index, statsList.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
|
|
||||||
if (Objects.isNull( baseMetricESDao )) {
|
|
||||||
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
int size = statsList.size();
|
|
||||||
int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1);
|
|
||||||
|
|
||||||
if (size < THRESHOLD) {
|
|
||||||
esExecutor.execute(
|
|
||||||
() -> baseMetricESDao.batchInsertStats(statsList)
|
|
||||||
);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 1; i < num + 1; i++) {
|
|
||||||
int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD);
|
|
||||||
int start = (i - 1) * THRESHOLD;
|
|
||||||
|
|
||||||
esExecutor.execute(
|
|
||||||
() -> baseMetricESDao.batchInsertStats(statsList.subList(start, end))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,72 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public abstract class AbstractMetricESSender {
|
||||||
|
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||||
|
|
||||||
|
private static final int THRESHOLD = 100;
|
||||||
|
|
||||||
|
private static final ThreadPoolExecutor esExecutor = new ThreadPoolExecutor(
|
||||||
|
10,
|
||||||
|
20,
|
||||||
|
6000,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
new LinkedBlockingDeque<>(1000),
|
||||||
|
new NamedThreadFactory("KM-Collect-MetricESSender-ES"),
|
||||||
|
(r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount())
|
||||||
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据不同监控维度来发送
|
||||||
|
*/
|
||||||
|
protected boolean send2es(String index, List<? extends BaseESPO> statsList){
|
||||||
|
if (CollectionUtils.isEmpty(statsList)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!EnvUtil.isOnline()) {
|
||||||
|
LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
|
||||||
|
index, statsList.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
|
||||||
|
if (Objects.isNull( baseMetricESDao )) {
|
||||||
|
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int size = statsList.size();
|
||||||
|
int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1);
|
||||||
|
|
||||||
|
if (size < THRESHOLD) {
|
||||||
|
esExecutor.execute(
|
||||||
|
() -> baseMetricESDao.batchInsertStats(statsList)
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 1; i < num + 1; i++) {
|
||||||
|
int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD);
|
||||||
|
int start = (i - 1) * THRESHOLD;
|
||||||
|
|
||||||
|
esExecutor.execute(
|
||||||
|
() -> baseMetricESDao.batchInsertStats(statsList.subList(start, end))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,28 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.BROKER_INDEX;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class BrokerMetricESSender extends AbstractMetricESSender implements ApplicationListener<BrokerMetricEvent> {
|
||||||
|
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
LOGGER.info("class=BrokerMetricESSender||method=init||msg=init finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(BrokerMetricEvent event) {
|
||||||
|
send2es(BROKER_INDEX, ConvertUtil.list2List(event.getBrokerMetrics(), BrokerMetricPO.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ClusterMetricEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.CLUSTER_INDEX;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class ClusterMetricESSender extends AbstractMetricESSender implements ApplicationListener<ClusterMetricEvent> {
|
||||||
|
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
LOGGER.info("class=ClusterMetricESSender||method=init||msg=init finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(ClusterMetricEvent event) {
|
||||||
|
send2es(CLUSTER_INDEX, ConvertUtil.list2List(event.getClusterMetrics(), ClusterMetricPO.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.GroupMetricEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.GroupMetricPO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.GROUP_INDEX;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class GroupMetricESSender extends AbstractMetricESSender implements ApplicationListener<GroupMetricEvent> {
|
||||||
|
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(GroupMetricEvent event) {
|
||||||
|
send2es(GROUP_INDEX, ConvertUtil.list2List(event.getGroupMetrics(), GroupMetricPO.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,28 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.PartitionMetricPO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.PARTITION_INDEX;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class PartitionMetricESSender extends AbstractMetricESSender implements ApplicationListener<PartitionMetricEvent> {
|
||||||
|
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
LOGGER.info("class=PartitionMetricESSender||method=init||msg=init finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(PartitionMetricEvent event) {
|
||||||
|
send2es(PARTITION_INDEX, ConvertUtil.list2List(event.getPartitionMetrics(), PartitionMetricPO.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,28 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.REPLICATION_INDEX;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener<ReplicaMetricEvent> {
|
||||||
|
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(ReplicaMetricEvent event) {
|
||||||
|
send2es(REPLICATION_INDEX, ConvertUtil.list2List(event.getReplicationMetrics(), ReplicationMetricPO.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
|
||||||
|
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.TOPIC_INDEX;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class TopicMetricESSender extends AbstractMetricESSender implements ApplicationListener<TopicMetricEvent> {
|
||||||
|
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
LOGGER.info("class=TopicMetricESSender||method=init||msg=init finished");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(TopicMetricEvent event) {
|
||||||
|
send2es(TOPIC_INDEX, ConvertUtil.list2List(event.getTopicMetrics(), TopicMetricPO.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user