Merge pull request #621 from didi/dev

指标发送ES类按照指标类别拆分
This commit is contained in:
EricZeng
2022-09-27 18:38:05 +08:00
committed by GitHub
8 changed files with 243 additions and 121 deletions

View File

@@ -1,121 +0,0 @@
package com.xiaojukeji.know.streaming.km.collector.metric;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.*;
@Component
public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
private static final int THRESHOLD = 100;
private ThreadPoolExecutor esExecutor = new ThreadPoolExecutor(10, 20, 6000, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(1000),
new NamedThreadFactory("KM-Collect-MetricESSender-ES"),
(r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount()));
@PostConstruct
public void init(){
LOGGER.info("class=MetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(BaseMetricEvent event) {
if(event instanceof BrokerMetricEvent) {
BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent)event;
send2es(BROKER_INDEX,
ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class)
);
} else if(event instanceof ClusterMetricEvent) {
ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent)event;
send2es(CLUSTER_INDEX,
ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class)
);
} else if(event instanceof TopicMetricEvent) {
TopicMetricEvent topicMetricEvent = (TopicMetricEvent)event;
send2es(TOPIC_INDEX,
ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class)
);
} else if(event instanceof PartitionMetricEvent) {
PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent)event;
send2es(PARTITION_INDEX,
ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class)
);
} else if(event instanceof GroupMetricEvent) {
GroupMetricEvent groupMetricEvent = (GroupMetricEvent)event;
send2es(GROUP_INDEX,
ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class)
);
} else if(event instanceof ReplicaMetricEvent) {
ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent)event;
send2es(REPLICATION_INDEX,
ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class)
);
}
}
/**
* 根据不同监控维度来发送
*/
private boolean send2es(String index, List<? extends BaseESPO> statsList){
if (CollectionUtils.isEmpty(statsList)) {
return true;
}
if (!EnvUtil.isOnline()) {
LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
index, statsList.size());
}
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
if (Objects.isNull( baseMetricESDao )) {
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index);
return false;
}
int size = statsList.size();
int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1);
if (size < THRESHOLD) {
esExecutor.execute(
() -> baseMetricESDao.batchInsertStats(statsList)
);
return true;
}
for (int i = 1; i < num + 1; i++) {
int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD);
int start = (i - 1) * THRESHOLD;
esExecutor.execute(
() -> baseMetricESDao.batchInsertStats(statsList.subList(start, end))
);
}
return true;
}
}

View File

@@ -0,0 +1,72 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import org.apache.commons.collections.CollectionUtils;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public abstract class AbstractMetricESSender {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
private static final int THRESHOLD = 100;
private static final ThreadPoolExecutor esExecutor = new ThreadPoolExecutor(
10,
20,
6000,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(1000),
new NamedThreadFactory("KM-Collect-MetricESSender-ES"),
(r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}" + e.getTaskCount())
);
/**
* 根据不同监控维度来发送
*/
protected boolean send2es(String index, List<? extends BaseESPO> statsList){
if (CollectionUtils.isEmpty(statsList)) {
return true;
}
if (!EnvUtil.isOnline()) {
LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
index, statsList.size());
}
BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(index);
if (Objects.isNull( baseMetricESDao )) {
LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", index);
return false;
}
int size = statsList.size();
int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1);
if (size < THRESHOLD) {
esExecutor.execute(
() -> baseMetricESDao.batchInsertStats(statsList)
);
return true;
}
for (int i = 1; i < num + 1; i++) {
int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD);
int start = (i - 1) * THRESHOLD;
esExecutor.execute(
() -> baseMetricESDao.batchInsertStats(statsList.subList(start, end))
);
}
return true;
}
}

View File

@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.BROKER_INDEX;
@Component
public class BrokerMetricESSender extends AbstractMetricESSender implements ApplicationListener<BrokerMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@PostConstruct
public void init(){
LOGGER.info("class=BrokerMetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(BrokerMetricEvent event) {
send2es(BROKER_INDEX, ConvertUtil.list2List(event.getBrokerMetrics(), BrokerMetricPO.class));
}
}

View File

@@ -0,0 +1,29 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ClusterMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.CLUSTER_INDEX;
@Component
public class ClusterMetricESSender extends AbstractMetricESSender implements ApplicationListener<ClusterMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@PostConstruct
public void init(){
LOGGER.info("class=ClusterMetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(ClusterMetricEvent event) {
send2es(CLUSTER_INDEX, ConvertUtil.list2List(event.getClusterMetrics(), ClusterMetricPO.class));
}
}

View File

@@ -0,0 +1,29 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.GroupMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.GroupMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.GROUP_INDEX;
@Component
public class GroupMetricESSender extends AbstractMetricESSender implements ApplicationListener<GroupMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@PostConstruct
public void init(){
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(GroupMetricEvent event) {
send2es(GROUP_INDEX, ConvertUtil.list2List(event.getGroupMetrics(), GroupMetricPO.class));
}
}

View File

@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.PartitionMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.PARTITION_INDEX;
@Component
public class PartitionMetricESSender extends AbstractMetricESSender implements ApplicationListener<PartitionMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@PostConstruct
public void init(){
LOGGER.info("class=PartitionMetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(PartitionMetricEvent event) {
send2es(PARTITION_INDEX, ConvertUtil.list2List(event.getPartitionMetrics(), PartitionMetricPO.class));
}
}

View File

@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ReplicationMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.REPLICATION_INDEX;
@Component
public class ReplicaMetricESSender extends AbstractMetricESSender implements ApplicationListener<ReplicaMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@PostConstruct
public void init(){
LOGGER.info("class=GroupMetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(ReplicaMetricEvent event) {
send2es(REPLICATION_INDEX, ConvertUtil.list2List(event.getReplicationMetrics(), ReplicationMetricPO.class));
}
}

View File

@@ -0,0 +1,29 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.TOPIC_INDEX;
@Component
public class TopicMetricESSender extends AbstractMetricESSender implements ApplicationListener<TopicMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@PostConstruct
public void init(){
LOGGER.info("class=TopicMetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(TopicMetricEvent event) {
send2es(TOPIC_INDEX, ConvertUtil.list2List(event.getTopicMetrics(), TopicMetricPO.class));
}
}