Mirror of https://github.com/didi/KnowStreaming.git (synced 2025-12-24 03:42:07 +08:00)
Initialize version 3.0.0
km-collector/pom.xml (new file, 33 lines added)
@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xiaojukeji.kafka</groupId>
    <artifactId>km-collector</artifactId>
    <version>${km.revision}</version>
    <packaging>jar</packaging>

    <parent>
        <artifactId>km</artifactId>
        <groupId>com.xiaojukeji.kafka</groupId>
        <version>${km.revision}</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>com.xiaojukeji.kafka</groupId>
            <artifactId>km-common</artifactId>
            <version>${project.parent.version}</version>
        </dependency>
        <dependency>
            <groupId>com.xiaojukeji.kafka</groupId>
            <artifactId>km-core</artifactId>
            <version>${project.parent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
        </dependency>
    </dependencies>
</project>
@@ -0,0 +1,58 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.xiaojukeji.know.streaming.km.collector.service.CollectThreadPoolService;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BaseMetricEvent;
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.TimeUnit;

/**
 * @author didi
 */
public abstract class AbstractMetricCollector<T> {
    private static final double SIZE_THRESHOLD = 0.8;

    private final Cache<String, BaseMetrics> latestMetricsMap = Caffeine.newBuilder()
            .expireAfterWrite(3, TimeUnit.MINUTES)
            .build();

    public abstract void collectMetrics(ClusterPhy clusterPhy);

    public abstract VersionItemTypeEnum collectorType();

    @Autowired
    private CollectThreadPoolService collectThreadPoolService;

    /**
     * If the metrics collected within the last 3 minutes look abnormal, fall back to the previous values.
     */
    protected void doOptimizeMetric(BaseMetrics metricPO) {
        BaseMetrics latestMetrics = latestMetricsMap.getIfPresent(metricPO.unique());
        if (latestMetrics == null) {
            latestMetrics = metricPO;
        }

        if (metricPO.getMetrics().size() < latestMetrics.getMetrics().size() * SIZE_THRESHOLD) {
            // Abnormal collection: backfill the metrics from the cached values
            metricPO.putMetric(latestMetrics.getMetrics());
        } else {
            // Normal collection: refresh the cache
            latestMetricsMap.put(metricPO.unique(), metricPO);
        }
    }

    protected FutureWaitUtil<Void> getFutureUtilByClusterPhyId(Long clusterPhyId) {
        return collectThreadPoolService.selectSuitableFutureUtil(clusterPhyId * 1000L + this.collectorType().getCode());
    }

    protected <T extends BaseMetricEvent> void publishMetric(T event) {
        SpringTool.publish(event);
    }
}
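To make the template above concrete, here is a minimal, illustrative subclass sketch (not part of this commit). `FooMetrics`, `FooMetricEvent`, `METRIC_FOO` and `collectFromKafkaSomehow` are hypothetical placeholders and imports are omitted; the concrete collectors added below follow exactly this shape. Note that `doOptimizeMetric` only refreshes the 3-minute cache when the new collection holds at least 80% (`SIZE_THRESHOLD`) as many metric keys as the last good one; otherwise it backfills the missing values from the cache.

```java
// Illustrative sketch only. FooMetrics, FooMetricEvent, METRIC_FOO and
// collectFromKafkaSomehow are hypothetical; the pattern mirrors the real collectors.
@Component
public class FooMetricCollector extends AbstractMetricCollector<FooMetrics> {

    @Override
    public void collectMetrics(ClusterPhy clusterPhy) {
        Long clusterPhyId = clusterPhy.getId();

        // Per-(cluster, collector-type) thread-pool shard, provided by CollectThreadPoolService
        FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);

        FooMetrics metrics = new FooMetrics(clusterPhyId);
        future.runnableTask("method=FooMetricCollector||clusterPhyId=" + clusterPhyId, 30000,
                () -> metrics.putMetric(collectFromKafkaSomehow(clusterPhyId))); // hypothetical helper
        future.waitExecute(30000);

        doOptimizeMetric(metrics);                        // backfill from the 3-minute cache on a bad run
        publishMetric(new FooMetricEvent(this, metrics)); // picked up by MetricESSender via Spring events
    }

    @Override
    public VersionItemTypeEnum collectorType() {
        return VersionItemTypeEnum.METRIC_FOO;            // hypothetical enum constant
    }
}
```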
@@ -0,0 +1,111 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.BrokerMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_BROKER;

/**
 * @author didi
 */
@Component
public class BrokerMetricCollector extends AbstractMetricCollector<BrokerMetrics> {
    protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");

    @Autowired
    private VersionControlService versionControlService;

    @Autowired
    private BrokerMetricService brokerMetricService;

    @Autowired
    private BrokerService brokerService;

    @Override
    public void collectMetrics(ClusterPhy clusterPhy) {
        Long startTime = System.currentTimeMillis();
        Long clusterPhyId = clusterPhy.getId();

        List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterPhy.getId());
        List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());

        FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);

        List<BrokerMetrics> brokerMetrics = new ArrayList<>();
        for (Broker broker : brokers) {
            BrokerMetrics metrics = new BrokerMetrics(clusterPhyId, broker.getBrokerId(), broker.getHost(), broker.getPort());
            brokerMetrics.add(metrics);

            future.runnableTask(
                    String.format("method=BrokerMetricCollector||clusterPhyId=%d||brokerId=%d", clusterPhyId, broker.getBrokerId()),
                    30000,
                    () -> collectMetrics(clusterPhyId, metrics, items)
            );
        }

        future.waitExecute(30000);
        this.publishMetric(new BrokerMetricEvent(this, brokerMetrics));

        LOGGER.info("method=BrokerMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.",
                clusterPhyId, startTime, System.currentTimeMillis() - startTime);
    }

    @Override
    public VersionItemTypeEnum collectorType() {
        return METRIC_BROKER;
    }

    /**************************************************** private method ****************************************************/

    private void collectMetrics(Long clusterPhyId, BrokerMetrics metrics, List<VersionControlItem> items) {
        long startTime = System.currentTimeMillis();
        metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);

        for (VersionControlItem v : items) {
            try {
                if (metrics.getMetrics().containsKey(v.getName())) {
                    continue;
                }

                Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterPhyId, metrics.getBrokerId(), v.getName());
                if (null == ret || ret.failed() || null == ret.getData()) {
                    continue;
                }

                metrics.putMetric(ret.getData().getMetrics());

                if (!EnvUtil.isOnline()) {
                    LOGGER.info("method=BrokerMetricCollector||clusterId={}||brokerId={}||metricName={}||metricValue={}!",
                            clusterPhyId, metrics.getBrokerId(), v.getName(), JSON.toJSONString(ret.getData().getMetrics()));
                }
            } catch (Exception e) {
                LOGGER.error("method=BrokerMetricCollector||clusterId={}||brokerId={}||metric={}||errMsg=exception!",
                        clusterPhyId, metrics.getBrokerId(), v.getName(), e);
            }
        }

        doOptimizeMetric(metrics);

        // Record how long this collection took
        metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
    }
}
@@ -0,0 +1,90 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ClusterMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_CLUSTER;

/**
 * @author didi
 */
@Component
public class ClusterMetricCollector extends AbstractMetricCollector<ClusterMetricPO> {
    protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");

    @Autowired
    private VersionControlService versionControlService;

    @Autowired
    private ClusterMetricService clusterMetricService;

    @Override
    public void collectMetrics(ClusterPhy clusterPhy) {
        Long startTime = System.currentTimeMillis();
        Long clusterPhyId = clusterPhy.getId();
        List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());

        ClusterMetrics metrics = new ClusterMetrics(clusterPhyId, clusterPhy.getKafkaVersion());

        FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);

        for (VersionControlItem v : items) {
            future.runnableTask(
                    String.format("method=ClusterMetricCollector||clusterPhyId=%d||metricName=%s", clusterPhyId, v.getName()),
                    30000,
                    () -> {
                        try {
                            if (null != metrics.getMetrics().get(v.getName())) { return null; }

                            Result<ClusterMetrics> ret = clusterMetricService.collectClusterMetricsFromKafka(clusterPhyId, v.getName());
                            if (null == ret || ret.failed() || null == ret.getData()) { return null; }

                            metrics.putMetric(ret.getData().getMetrics());

                            if (!EnvUtil.isOnline()) {
                                LOGGER.info("method=ClusterMetricCollector||clusterPhyId={}||metricName={}||metricValue={}",
                                        clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics()));
                            }
                        } catch (Exception e) {
                            LOGGER.error("method=ClusterMetricCollector||clusterPhyId={}||metricName={}||errMsg=exception!",
                                    clusterPhyId, v.getName(), e);
                        }

                        return null;
                    });
        }

        future.waitExecute(30000);
        doOptimizeMetric(metrics);

        metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);

        publishMetric(new ClusterMetricEvent(this, Arrays.asList(metrics)));

        LOGGER.info("method=ClusterMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.",
                clusterPhyId, startTime, System.currentTimeMillis() - startTime);
    }

    @Override
    public VersionItemTypeEnum collectorType() {
        return METRIC_CLUSTER;
    }
}
@@ -0,0 +1,145 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.GroupMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_GROUP;

/**
 * @author didi
 */
@Component
public class GroupMetricCollector extends AbstractMetricCollector<List<GroupMetrics>> {
    protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");

    @Autowired
    private VersionControlService versionControlService;

    @Autowired
    private GroupMetricService groupMetricService;

    @Autowired
    private GroupService groupService;

    @Override
    public void collectMetrics(ClusterPhy clusterPhy) {
        Long startTime = System.currentTimeMillis();
        Long clusterPhyId = clusterPhy.getId();

        List<String> groups = new ArrayList<>();
        try {
            groups = groupService.listGroupsFromKafka(clusterPhyId);
        } catch (Exception e) {
            LOGGER.error("method=GroupMetricCollector||clusterPhyId={}||msg=exception!", clusterPhyId, e);
        }

        if (CollectionUtils.isEmpty(groups)) { return; }

        List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());

        FutureWaitUtil<Void> future = getFutureUtilByClusterPhyId(clusterPhyId);

        Map<String, List<GroupMetrics>> metricsMap = new ConcurrentHashMap<>();
        for (String groupName : groups) {
            future.runnableTask(
                    String.format("method=GroupMetricCollector||clusterPhyId=%d||groupName=%s", clusterPhyId, groupName),
                    30000,
                    () -> collectMetrics(clusterPhyId, groupName, metricsMap, items));
        }

        future.waitResult(30000);

        List<GroupMetrics> metricsList = new ArrayList<>();
        metricsMap.values().forEach(elem -> metricsList.addAll(elem));

        publishMetric(new GroupMetricEvent(this, metricsList));

        LOGGER.info("method=GroupMetricCollector||clusterPhyId={}||startTime={}||cost={}||msg=collect finished.",
                clusterPhyId, startTime, System.currentTimeMillis() - startTime);
    }

    @Override
    public VersionItemTypeEnum collectorType() {
        return METRIC_GROUP;
    }

    /**************************************************** private method ****************************************************/

    private void collectMetrics(Long clusterPhyId, String groupName, Map<String, List<GroupMetrics>> metricsMap, List<VersionControlItem> items) {
        long startTime = System.currentTimeMillis();

        List<GroupMetrics> groupMetricsList = new ArrayList<>();

        Map<String, GroupMetrics> tpGroupPOMap = new HashMap<>();

        GroupMetrics groupMetrics = new GroupMetrics(clusterPhyId, groupName, true);
        groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);

        for (VersionControlItem v : items) {
            try {
                String metricName = v.getName();

                Result<List<GroupMetrics>> ret = groupMetricService.collectGroupMetricsFromKafka(clusterPhyId, groupName, metricName);
                if (null == ret || ret.failed() || ValidateUtils.isEmptyList(ret.getData())) {
                    continue;
                }

                ret.getData().stream().forEach(metrics -> {
                    if (metrics.isBGroupMetric()) {
                        groupMetrics.putMetric(metrics.getMetrics());
                    } else {
                        String topicName = metrics.getTopic();
                        Integer partitionId = metrics.getPartitionId();
                        String tpGroupKey = genTopicPartitionGroupKey(topicName, partitionId);

                        tpGroupPOMap.putIfAbsent(tpGroupKey, new GroupMetrics(clusterPhyId, partitionId, topicName, groupName, false));
                        tpGroupPOMap.get(tpGroupKey).putMetric(metrics.getMetrics());
                    }
                });

                if (!EnvUtil.isOnline()) {
                    LOGGER.info("method=GroupMetricCollector||clusterPhyId={}||groupName={}||metricName={}||metricValue={}",
                            clusterPhyId, groupName, metricName, JSON.toJSONString(ret.getData()));
                }
            } catch (Exception e) {
                LOGGER.error("method=GroupMetricCollector||clusterPhyId={}||groupName={}||errMsg=exception!", clusterPhyId, groupName, e);
            }
        }

        doOptimizeMetric(groupMetrics);
        groupMetricsList.add(groupMetrics);
        groupMetricsList.addAll(tpGroupPOMap.values());

        // Record how long this collection took
        groupMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);

        metricsMap.put(groupName, groupMetricsList);
    }

    private String genTopicPartitionGroupKey(String topic, Integer partitionId) {
        return topic + "@" + partitionId;
    }
}
@@ -0,0 +1,116 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.*;
import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.*;
import com.xiaojukeji.know.streaming.km.common.enums.metric.KafkaMetricIndexEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.NamedThreadFactory;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BaseMetricESDAO;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class MetricESSender implements ApplicationListener<BaseMetricEvent> {
    protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");

    private static final int THRESHOLD = 100;

    private ThreadPoolExecutor esExecutor = new ThreadPoolExecutor(10, 20, 6000, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(1000),
            new NamedThreadFactory("KM-Collect-MetricESSender-ES"),
            (r, e) -> LOGGER.warn("class=MetricESSender||msg=KM-Collect-MetricESSender-ES Deque is blocked, taskCount:{}", e.getTaskCount()));

    @PostConstruct
    public void init() {
        LOGGER.info("class=MetricESSender||method=init||msg=init finished");
    }

    @Override
    public void onApplicationEvent(BaseMetricEvent event) {
        if (event instanceof BrokerMetricEvent) {
            BrokerMetricEvent brokerMetricEvent = (BrokerMetricEvent) event;
            send2es(KafkaMetricIndexEnum.BROKER_INFO,
                    ConvertUtil.list2List(brokerMetricEvent.getBrokerMetrics(), BrokerMetricPO.class));

        } else if (event instanceof ClusterMetricEvent) {
            ClusterMetricEvent clusterMetricEvent = (ClusterMetricEvent) event;
            send2es(KafkaMetricIndexEnum.CLUSTER_INFO,
                    ConvertUtil.list2List(clusterMetricEvent.getClusterMetrics(), ClusterMetricPO.class));

        } else if (event instanceof TopicMetricEvent) {
            TopicMetricEvent topicMetricEvent = (TopicMetricEvent) event;
            send2es(KafkaMetricIndexEnum.TOPIC_INFO,
                    ConvertUtil.list2List(topicMetricEvent.getTopicMetrics(), TopicMetricPO.class));

        } else if (event instanceof PartitionMetricEvent) {
            PartitionMetricEvent partitionMetricEvent = (PartitionMetricEvent) event;
            send2es(KafkaMetricIndexEnum.PARTITION_INFO,
                    ConvertUtil.list2List(partitionMetricEvent.getPartitionMetrics(), PartitionMetricPO.class));

        } else if (event instanceof GroupMetricEvent) {
            GroupMetricEvent groupMetricEvent = (GroupMetricEvent) event;
            send2es(KafkaMetricIndexEnum.GROUP_INFO,
                    ConvertUtil.list2List(groupMetricEvent.getGroupMetrics(), GroupMetricPO.class));

        } else if (event instanceof ReplicaMetricEvent) {
            ReplicaMetricEvent replicaMetricEvent = (ReplicaMetricEvent) event;
            send2es(KafkaMetricIndexEnum.REPLICATION_INFO,
                    ConvertUtil.list2List(replicaMetricEvent.getReplicationMetrics(), ReplicationMetricPO.class));
        }
    }

    /**
     * Send metrics to ES, routed by monitoring dimension.
     *
     * @param stats     the metric index to write to
     * @param statsList the metric documents to persist
     * @return false if no DAO is registered for the index, true otherwise
     */
    private boolean send2es(KafkaMetricIndexEnum stats, List<? extends BaseESPO> statsList) {
        if (CollectionUtils.isEmpty(statsList)) {
            return true;
        }

        if (!EnvUtil.isOnline()) {
            LOGGER.info("class=MetricESSender||method=send2es||ariusStats={}||size={}",
                    stats.getIndex(), statsList.size());
        }

        BaseMetricESDAO baseMetricESDao = BaseMetricESDAO.getByStatsType(stats);
        if (Objects.isNull(baseMetricESDao)) {
            LOGGER.error("class=MetricESSender||method=send2es||errMsg=fail to find {}", stats.getIndex());
            return false;
        }

        int size = statsList.size();
        int num = (size) % THRESHOLD == 0 ? (size / THRESHOLD) : (size / THRESHOLD + 1);

        if (size < THRESHOLD) {
            esExecutor.execute(() -> baseMetricESDao.batchInsertStats(statsList));
            return true;
        }

        for (int i = 1; i < num + 1; i++) {
            int end = (i * THRESHOLD) > size ? size : (i * THRESHOLD);
            int start = (i - 1) * THRESHOLD;

            esExecutor.execute(() -> baseMetricESDao.batchInsertStats(statsList.subList(start, end)));
        }

        return true;
    }
}
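To make the batching arithmetic in send2es() easy to verify, here is a small standalone sketch (illustrative only, plain JDK, no project types) that reproduces the same THRESHOLD/num/subList bounds for a 250-element list:

```java
// A quick, self-contained check of the batching arithmetic used in send2es():
// lists of 100 items or more are split into sublists of at most THRESHOLD (100) items.
import java.util.ArrayList;
import java.util.List;

public class BatchSplitDemo {
    private static final int THRESHOLD = 100;

    public static void main(String[] args) {
        int size = 250; // e.g. 250 metric documents
        int num = size % THRESHOLD == 0 ? size / THRESHOLD : size / THRESHOLD + 1; // = 3 batches

        List<int[]> ranges = new ArrayList<>();
        for (int i = 1; i < num + 1; i++) {
            int end = Math.min(i * THRESHOLD, size);
            int start = (i - 1) * THRESHOLD;
            ranges.add(new int[]{start, end});
        }
        // Prints [0,100) [100,200) [200,250) -- the subList bounds handed to batchInsertStats
        ranges.forEach(r -> System.out.printf("[%d,%d)%n", r[0], r[1]));
    }
}
```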
@@ -0,0 +1,130 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.PartitionMetricEvent;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_PARTITION;

/**
 * @author didi
 */
@Component
public class PartitionMetricCollector extends AbstractMetricCollector<PartitionMetrics> {
    protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");

    @Autowired
    private VersionControlService versionControlService;

    @Autowired
    private PartitionMetricService partitionMetricService;

    @Autowired
    private TopicService topicService;

    @Override
    public void collectMetrics(ClusterPhy clusterPhy) {
        Long startTime = System.currentTimeMillis();
        Long clusterPhyId = clusterPhy.getId();
        List<Topic> topicList = topicService.listTopicsFromCacheFirst(clusterPhyId);
        List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());

        // Fetch all partitions of the cluster, topic by topic

        FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);

        Map<String, Map<Integer, PartitionMetrics>> metricsMap = new ConcurrentHashMap<>();
        for (Topic topic : topicList) {
            metricsMap.put(topic.getTopicName(), new ConcurrentHashMap<>());

            future.runnableTask(
                    String.format("method=PartitionMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()),
                    30000,
                    () -> collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap.get(topic.getTopicName()), items)
            );
        }

        future.waitExecute(30000);

        List<PartitionMetrics> metricsList = new ArrayList<>();
        metricsMap.values().forEach(elem -> metricsList.addAll(elem.values()));

        this.publishMetric(new PartitionMetricEvent(this, metricsList));

        LOGGER.info(
                "method=PartitionMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.",
                clusterPhyId, startTime, System.currentTimeMillis() - startTime
        );
    }

    @Override
    public VersionItemTypeEnum collectorType() {
        return METRIC_PARTITION;
    }

    /**************************************************** private method ****************************************************/

    private void collectMetrics(Long clusterPhyId, String topicName, Map<Integer, PartitionMetrics> metricsMap, List<VersionControlItem> items) {
        Set<String> collectedMetricsNameSet = new HashSet<>();
        for (VersionControlItem v : items) {
            try {
                if (collectedMetricsNameSet.contains(v.getName())) {
                    // Metric already collected
                    continue;
                }
                collectedMetricsNameSet.add(v.getName());

                Result<List<PartitionMetrics>> ret = partitionMetricService.collectPartitionsMetricsFromKafkaWithCache(
                        clusterPhyId,
                        topicName,
                        v.getName()
                );
                if (null == ret || ret.failed() || null == ret.getData() || ret.getData().isEmpty()) {
                    continue;
                }

                // Remember which metrics have already been collected
                collectedMetricsNameSet.addAll(ret.getData().get(0).getMetrics().keySet());

                // Merge into the per-partition map
                for (PartitionMetrics subMetrics : ret.getData()) {
                    metricsMap.putIfAbsent(subMetrics.getPartitionId(), subMetrics);
                    PartitionMetrics allMetrics = metricsMap.get(subMetrics.getPartitionId());
                    allMetrics.putMetric(subMetrics.getMetrics());
                }

                if (!EnvUtil.isOnline()) {
                    LOGGER.info(
                            "class=PartitionMetricCollector||method=collectMetrics||clusterPhyId={}||topicName={}||metricName={}||metricValue={}!",
                            clusterPhyId, topicName, v.getName(), ConvertUtil.obj2Json(ret.getData())
                    );
                }

            } catch (Exception e) {
                LOGGER.info(
                        "class=PartitionMetricCollector||method=collectMetrics||clusterPhyId={}||topicName={}||metricName={}||errMsg=exception",
                        clusterPhyId, topicName, v.getName(), e
                );
            }
        }

        metricsMap.entrySet().forEach(elem -> doOptimizeMetric(elem.getValue()));
    }
}
@@ -0,0 +1,126 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ReplicaMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_REPLICATION;

/**
 * @author didi
 */
@Component
public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationMetrics> {
    protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");

    @Autowired
    private VersionControlService versionControlService;

    @Autowired
    private ReplicaMetricService replicaMetricService;

    @Autowired
    private PartitionService partitionService;

    @Override
    public void collectMetrics(ClusterPhy clusterPhy) {
        Long startTime = System.currentTimeMillis();
        Long clusterPhyId = clusterPhy.getId();
        List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());

        List<Partition> partitions = partitionService.listPartitionByCluster(clusterPhyId);

        FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);

        List<ReplicationMetrics> metricsList = new ArrayList<>();
        for (Partition partition : partitions) {
            for (Integer brokerId : partition.getAssignReplicaList()) {
                ReplicationMetrics metrics = new ReplicationMetrics(clusterPhyId, partition.getTopicName(), brokerId, partition.getPartitionId());
                metricsList.add(metrics);

                future.runnableTask(
                        String.format("method=ReplicaMetricCollector||clusterPhyId=%d||brokerId=%d||topicName=%s||partitionId=%d",
                                clusterPhyId, brokerId, partition.getTopicName(), partition.getPartitionId()),
                        30000,
                        () -> collectMetrics(clusterPhyId, metrics, items)
                );
            }
        }

        future.waitExecute(30000);

        publishMetric(new ReplicaMetricEvent(this, metricsList));

        LOGGER.info("method=ReplicaMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.",
                clusterPhyId, startTime, System.currentTimeMillis() - startTime);
    }

    @Override
    public VersionItemTypeEnum collectorType() {
        return METRIC_REPLICATION;
    }

    /**************************************************** private method ****************************************************/

    private ReplicationMetrics collectMetrics(Long clusterPhyId, ReplicationMetrics metrics, List<VersionControlItem> items) {
        long startTime = System.currentTimeMillis();

        metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);

        for (VersionControlItem v : items) {
            try {
                if (metrics.getMetrics().containsKey(v.getName())) {
                    continue;
                }

                Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
                        clusterPhyId,
                        metrics.getTopic(),
                        metrics.getBrokerId(),
                        metrics.getPartitionId(),
                        v.getName()
                );

                if (null == ret || ret.failed() || null == ret.getData()) {
                    continue;
                }

                metrics.putMetric(ret.getData().getMetrics());

                if (!EnvUtil.isOnline()) {
                    LOGGER.info("method=ReplicaMetricCollector||clusterPhyId={}||topicName={}||partitionId={}||metricName={}||metricValue={}",
                            clusterPhyId, metrics.getTopic(), metrics.getPartitionId(), v.getName(), JSON.toJSONString(ret.getData().getMetrics()));
                }

            } catch (Exception e) {
                LOGGER.error("method=ReplicaMetricCollector||clusterPhyId={}||topicName={}||partition={}||metricName={}||errMsg=exception!",
                        clusterPhyId, metrics.getTopic(), metrics.getPartitionId(), v.getName(), e);
            }
        }

        doOptimizeMetric(metrics);

        // Record how long this collection took
        metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);

        return metrics;
    }
}
@@ -0,0 +1,139 @@
package com.xiaojukeji.know.streaming.km.collector.metric;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.TopicMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_TOPIC;

/**
 * @author didi
 */
@Component
public class TopicMetricCollector extends AbstractMetricCollector<List<TopicMetrics>> {
    protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");

    @Autowired
    private VersionControlService versionControlService;

    @Autowired
    private TopicService topicService;

    @Autowired
    private TopicMetricService topicMetricService;

    private static final Integer AGG_METRICS_BROKER_ID = -10000;

    @Override
    public void collectMetrics(ClusterPhy clusterPhy) {
        Long startTime = System.currentTimeMillis();
        Long clusterPhyId = clusterPhy.getId();
        List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterPhyId);
        List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());

        FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);

        Map<String/*topic name*/, Map<Integer/*brokerId*/, TopicMetrics/*metrics*/>> allMetricsMap = new ConcurrentHashMap<>();

        for (Topic topic : topics) {
            Map<Integer, TopicMetrics> metricsMap = new ConcurrentHashMap<>();
            metricsMap.put(AGG_METRICS_BROKER_ID, new TopicMetrics(topic.getTopicName(), clusterPhyId));
            metricsMap.get(AGG_METRICS_BROKER_ID).putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, Constant.COLLECT_METRICS_ERROR_COST_TIME);

            allMetricsMap.put(topic.getTopicName(), metricsMap);

            future.runnableTask(
                    String.format("method=TopicMetricCollector||clusterPhyId=%d||topicName=%s", clusterPhyId, topic.getTopicName()),
                    30000,
                    () -> collectMetrics(clusterPhyId, topic.getTopicName(), metricsMap, items)
            );
        }

        future.waitExecute(30000);

        List<TopicMetrics> metricsList = new ArrayList<>();
        allMetricsMap.values().forEach(elem -> metricsList.addAll(elem.values()));

        this.publishMetric(new TopicMetricEvent(this, metricsList));

        LOGGER.info("method=TopicMetricCollector||clusterPhyId={}||startTime={}||costTime={}||msg=collect finished.",
                clusterPhyId, startTime, System.currentTimeMillis() - startTime);
    }

    @Override
    public VersionItemTypeEnum collectorType() {
        return METRIC_TOPIC;
    }

    /**************************************************** private method ****************************************************/

    private void collectMetrics(Long clusterPhyId, String topicName, Map<Integer, TopicMetrics> metricsMap, List<VersionControlItem> items) {
        long startTime = System.currentTimeMillis();

        TopicMetrics aggMetrics = metricsMap.get(AGG_METRICS_BROKER_ID);
        for (VersionControlItem v : items) {
            try {
                if (aggMetrics.getMetrics().containsKey(v.getName())) {
                    // The metric is already present, skip it
                    continue;
                }

                Result<List<TopicMetrics>> ret = topicMetricService.collectTopicMetricsFromKafkaWithCacheFirst(clusterPhyId, topicName, v.getName());
                if (null == ret || ret.failed() || ValidateUtils.isEmptyList(ret.getData())) {
                    // Skip empty, failed, or data-less results
                    continue;
                }

                // Record the returned data
                ret.getData().stream().forEach(metrics -> {
                    if (metrics.isBBrokerAgg()) {
                        aggMetrics.putMetric(metrics.getMetrics());
                    } else {
                        metricsMap.putIfAbsent(
                                metrics.getBrokerId(),
                                new TopicMetrics(topicName, clusterPhyId, metrics.getBrokerId(), false)
                        );

                        metricsMap.get(metrics.getBrokerId()).putMetric(metrics.getMetrics());
                    }
                });

                if (!EnvUtil.isOnline()) {
                    LOGGER.info("method=TopicMetricCollector||clusterPhyId={}||topicName={}||metricName={}||metricValue={}.",
                            clusterPhyId, topicName, v.getName(), ConvertUtil.obj2Json(ret.getData())
                    );
                }
            } catch (Exception e) {
                LOGGER.error("method=TopicMetricCollector||clusterPhyId={}||topicName={}||metricName={}||errMsg=exception!",
                        clusterPhyId, topicName, v.getName(), e
                );
            }
        }

        doOptimizeMetric(aggMetrics);

        // Record how long this collection took
        aggMetrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
    }
}
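The per-topic map above keeps one synthetic bucket (AGG_METRICS_BROKER_ID = -10000) for broker-aggregated values and one bucket per real brokerId. A minimal standalone sketch of that merge pattern, using plain maps instead of TopicMetrics and a made-up metric name (illustrative only):

```java
// Illustrative merge pattern: a sentinel bucket for broker-aggregated values,
// plus lazily created per-broker buckets merged via putIfAbsent + put.
import java.util.HashMap;
import java.util.Map;

public class TopicMergeDemo {
    private static final Integer AGG_METRICS_BROKER_ID = -10000;

    public static void main(String[] args) {
        Map<Integer, Map<String, Float>> metricsMap = new HashMap<>();
        metricsMap.put(AGG_METRICS_BROKER_ID, new HashMap<>());

        // A value that is already aggregated across brokers goes into the sentinel bucket
        metricsMap.get(AGG_METRICS_BROKER_ID).put("BytesIn", 1024f); // example metric name/value

        // A per-broker value: create the broker bucket lazily, then merge into it
        int brokerId = 1;
        metricsMap.putIfAbsent(brokerId, new HashMap<>());
        metricsMap.get(brokerId).put("BytesIn", 512f);

        // e.g. {-10000={BytesIn=1024.0}, 1={BytesIn=512.0}} (map iteration order may vary)
        System.out.println(metricsMap);
    }
}
```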
@@ -0,0 +1,263 @@
package com.xiaojukeji.know.streaming.km.collector.service;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

@Service
public class CollectThreadPoolService {
    private static final ILog LOGGER = LogFactory.getLog(CollectThreadPoolService.class);

    private final AtomicLong shardIdx = new AtomicLong(0L);

    @Value(value = "${thread-pool.collector.future-util.num:1}")
    private Integer futureUtilNum;

    @Value(value = "${thread-pool.collector.future-util.thread-num:8}")
    private Integer futureUtilThreadNum;

    @Value(value = "${thread-pool.collector.future-util.queue-size:10000}")
    private Integer futureUtilQueueSize;

    @Value(value = "${thread-pool.collector.future-util.select-suitable-enable:true}")
    private Boolean futureUtilSelectSuitableEnable;

    @Value(value = "${thread-pool.collector.future-util.suitable-queue-size:5000}")
    private Integer futureUtilSuitableQueueSize;

    private static final Map<Long, FutureWaitUtil<Void>> SHARD_ID_FUTURE_UTIL_MAP = new ConcurrentHashMap<>();

    private static final Cache<Long, Long> PHYSICAL_CLUSTER_ID_SHARD_ID_CACHE = Caffeine
            .newBuilder()
            .expireAfterWrite(16, TimeUnit.MINUTES)
            .maximumSize(1000)
            .build();

    @PostConstruct
    private void init() {
        if (futureUtilNum <= 0) {
            futureUtilNum = 1;
        }

        // Initialize the job thread pools
        for (int idx = 0; idx < futureUtilNum; ++idx) {
            closeOldAndCreateNew((long) idx);
        }
    }

    public FutureWaitUtil<Void> selectSuitableFutureUtil(Long clusterPhyId) {
        // Get the shardId assigned to this cluster
        Long shardId = this.getShardId(clusterPhyId);

        return SHARD_ID_FUTURE_UTIL_MAP.get(shardId);
    }

    /**************************************************** private method ****************************************************/

    private Long getShardId(Long clusterPhyId) {
        Long shardId = PHYSICAL_CLUSTER_ID_SHARD_ID_CACHE.getIfPresent(clusterPhyId);
        if (shardId == null) {
            shardId = shardIdx.incrementAndGet() % this.futureUtilNum;
        }

        PHYSICAL_CLUSTER_ID_SHARD_ID_CACHE.put(clusterPhyId, shardId);
        return shardId;
    }

    /**************************************************** schedule flush method ****************************************************/

    @Scheduled(cron = "0 0/5 * * * ?")
    public void flush() {
        // Cluster IDs grouped by shard. A cache is used here so that clusters that have not been active for a while drop out.
        Map<Long, List<Long>> shardIdPhysicalClusterIdListMap = new HashMap<>();
        for (Map.Entry<Long, Long> entry : PHYSICAL_CLUSTER_ID_SHARD_ID_CACHE.asMap().entrySet()) {
            shardIdPhysicalClusterIdListMap.putIfAbsent(entry.getValue(), new ArrayList<>());
            shardIdPhysicalClusterIdListMap.get(entry.getValue()).add(entry.getKey());
        }

        // Distribution of clusters across the thread pools
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Long, FutureWaitUtil<Void>> entry : SHARD_ID_FUTURE_UTIL_MAP.entrySet()) {
            // Purge tasks that have been canceled
            entry.getValue().purgeExecutor();

            sb.append("shardId:").append(entry.getKey());
            sb.append(" queueSize:").append(entry.getValue().getExecutorQueueSize());
            sb.append(" physicalClusterIdList:").append(
                    CommonUtils.longList2String(shardIdPhysicalClusterIdListMap.getOrDefault(entry.getKey(), new ArrayList<>()))
            );
            sb.append("\t\t\t");
            if (entry.getValue().getExecutorQueueSize() >= this.futureUtilSuitableQueueSize) {
                LOGGER.info("JobThreadPoolInfo\t\t\t shardId:{} queueSize:{} physicalClusterIdList:{}.",
                        entry.getKey(),
                        entry.getValue().getExecutorQueueSize(),
                        CommonUtils.longList2String(shardIdPhysicalClusterIdListMap.getOrDefault(entry.getKey(), new ArrayList<>()))
                );
            }
        }
        LOGGER.info("JobThreadPoolInfo\t\t\t {}...", sb);

        try {
            if (futureUtilSelectSuitableEnable != null && futureUtilSelectSuitableEnable) {
                reBalancePhysicalClusterShard(shardIdPhysicalClusterIdListMap);
            }
        } catch (Exception e) {
            LOGGER.error("rebalance job-thread-pool failed.", e);
        }
    }

    private void reBalancePhysicalClusterShard(Map<Long, List<Long>> shardIdPhysicalClusterIdListMap) {
        List<Long> withoutClusterShardIdList = new ArrayList<>();  // thread pools with no cluster assigned
        List<Long> idleShardIdList = new ArrayList<>();            // idle thread pools
        List<Long> notBusyShardIdList = new ArrayList<>();         // not-busy thread pools
        List<Long> busyShardIdList = new ArrayList<>();            // busy thread pools
        List<Long> overflowShardIdList = new ArrayList<>();        // thread pools that cannot keep up

        // Classify the thread pools
        for (Map.Entry<Long, List<Long>> entry : shardIdPhysicalClusterIdListMap.entrySet()) {
            Integer queueSize = SHARD_ID_FUTURE_UTIL_MAP.get(entry.getKey()).getExecutorQueueSize();
            if (entry.getValue().isEmpty()) {
                withoutClusterShardIdList.add(entry.getKey());
            }

            if (queueSize == 0) {
                // Queue is empty
                idleShardIdList.add(entry.getKey());
            } else if (queueSize <= futureUtilSuitableQueueSize) {
                // Queue is fairly idle
                notBusyShardIdList.add(entry.getKey());
            } else if (queueSize >= futureUtilSuitableQueueSize - 10) {
                // Queue cannot keep up
                overflowShardIdList.add(entry.getKey());
            } else {
                // Queue is busy
                busyShardIdList.add(entry.getKey());
            }
        }

        // Spread the clusters of the overloaded thread pools across other thread pools
        this.moveShardClusterToSuitableThreadPool(overflowShardIdList, shardIdPhysicalClusterIdListMap, withoutClusterShardIdList, idleShardIdList, notBusyShardIdList, true);

        // Spread the clusters of the busy thread pools across other thread pools
        this.moveShardClusterToSuitableThreadPool(busyShardIdList, shardIdPhysicalClusterIdListMap, withoutClusterShardIdList, idleShardIdList, notBusyShardIdList, false);
    }

    private void moveShardClusterToSuitableThreadPool(List<Long> needMoveShardIdList,
                                                      Map<Long, List<Long>> shardIdPhysicalClusterIdListMap,
                                                      List<Long> withoutClusterShardIdList,
                                                      List<Long> idleShardIdList,
                                                      List<Long> notBusyShardIdList,
                                                      boolean clearTaskIfFullAndOnlyOneCluster) {
        for (Long needMoveShardId : needMoveShardIdList) {
            List<Long> physicalClusterIdList = shardIdPhysicalClusterIdListMap.get(needMoveShardId);
            if ((physicalClusterIdList == null || physicalClusterIdList.isEmpty() || physicalClusterIdList.size() == 1) && clearTaskIfFullAndOnlyOneCluster) {
                // Only one cluster and the pool is full: drop the queued tasks and start over
                closeOldAndCreateNew(needMoveShardId);
                continue;
            }

            if (physicalClusterIdList == null) {
                // No clusters on this shard
                continue;
            }

            for (int idx = 0; idx < physicalClusterIdList.size() - 1; ++idx) {
                Long newSuitableShardId = this.selectAndEmptySuitableThreadPool(shardIdPhysicalClusterIdListMap, withoutClusterShardIdList, idleShardIdList, notBusyShardIdList);
                if (newSuitableShardId == null) {
                    LOGGER.info("without suitable job-thread-pool and return.");
                    return;
                }

                modifyPhysicalClusterIdAndShardIdCache(physicalClusterIdList.get(idx), newSuitableShardId);
            }
        }
    }

    private Long selectAndEmptySuitableThreadPool(Map<Long, List<Long>> shardIdPhysicalClusterIdListMap,
                                                  List<Long> withoutClusterShardIdList,
                                                  List<Long> idleShardIdList,
                                                  List<Long> notBusyShardIdList) {
        if (!withoutClusterShardIdList.isEmpty()) {
            // Prefer a thread pool that has no cluster assigned
            return withoutClusterShardIdList.remove(0);
        }

        // Otherwise, prefer a fairly idle pool
        Long newShardId = this.selectAndEmptySuitableThreadPool(shardIdPhysicalClusterIdListMap, idleShardIdList);

        // Failing that, fall back to a not-busy pool
        return newShardId == null ? this.selectAndEmptySuitableThreadPool(shardIdPhysicalClusterIdListMap, notBusyShardIdList) : newShardId;
    }

    private Long selectAndEmptySuitableThreadPool(Map<Long, List<Long>> shardIdPhysicalClusterIdListMap, List<Long> taskThreadPoolList) {
        if (taskThreadPoolList.size() < 2) {
            // No spare thread pool queue available
            return null;
        }

        // Merge two non-busy pools so that one of them is freed up for the caller
        Long firstNotBusyShardId = taskThreadPoolList.remove(0);
        Long secondNotBusyShardId = taskThreadPoolList.remove(0);

        List<Long> physicalClusterIdList = shardIdPhysicalClusterIdListMap.get(secondNotBusyShardId);
        if (physicalClusterIdList == null || physicalClusterIdList.isEmpty()) {
            return null;
        }

        for (Long physicalClusterId : physicalClusterIdList) {
            modifyPhysicalClusterIdAndShardIdCache(physicalClusterId, firstNotBusyShardId);
        }

        return secondNotBusyShardId;
    }

    private synchronized Long modifyPhysicalClusterIdAndShardIdCache(Long physicalClusterId, Long shardId) {
        if (shardId == null) {
            shardId = shardIdx.incrementAndGet() % futureUtilNum;
        }

        PHYSICAL_CLUSTER_ID_SHARD_ID_CACHE.put(physicalClusterId, shardId);
        return shardId;
    }

    private synchronized FutureWaitUtil<Void> closeOldAndCreateNew(Long shardId) {
        // Create the new pool
        FutureWaitUtil<Void> newFutureUtil = FutureWaitUtil.init(
                "CollectorMetricsFutureUtil-Shard-" + shardId,
                this.futureUtilThreadNum,
                this.futureUtilThreadNum,
                this.futureUtilQueueSize
        );

        // Store the new one; put() returns the old one
        FutureWaitUtil<Void> oldFutureUtil = SHARD_ID_FUTURE_UTIL_MAP.put(shardId, newFutureUtil);

        // Nothing old to close, return directly
        if (oldFutureUtil == null) {
            return newFutureUtil;
        }

        LOGGER.error("close old ThreadPoolExecutor and create new, shardId:{}.", shardId);
        try {
            oldFutureUtil.shutdownNow();
        } catch (Exception e) {
            LOGGER.error("close old ThreadPoolExecutor and create new, shutdownNow failed, shardId:{}.", shardId, e);
        }

        return newFutureUtil;
    }
}
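The shard pinning performed by getShardId() can be illustrated with a small standalone sketch (illustrative only; a plain HashMap stands in for the Caffeine cache, the TTL refresh-on-read of the real cache is omitted, and the values are examples):

```java
// Standalone sketch of the shard-assignment idea in CollectThreadPoolService#getShardId:
// a key derived from the cluster is pinned to a shard; unknown keys get one via round-robin.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ShardAssignDemo {
    private static final int FUTURE_UTIL_NUM = 4;  // e.g. thread-pool.collector.future-util.num=4
    private final AtomicLong shardIdx = new AtomicLong(0L);
    private final Map<Long, Long> keyToShard = new HashMap<>();

    long getShardId(long key) {
        // Round-robin assignment for unseen keys, sticky afterwards
        return keyToShard.computeIfAbsent(key, k -> shardIdx.incrementAndGet() % FUTURE_UTIL_NUM);
    }

    public static void main(String[] args) {
        ShardAssignDemo demo = new ShardAssignDemo();
        // AbstractMetricCollector builds the key as clusterPhyId * 1000L + collectorType().getCode(),
        // so each (cluster, collector type) pair is pinned to its own shard independently.
        long clusterPhyId = 7L; // example cluster id
        for (int typeCode = 0; typeCode < 6; typeCode++) {
            long key = clusterPhyId * 1000L + typeCode;
            System.out.println("key=" + key + " -> shard " + demo.getShardId(key));
        }
    }
}
```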