Sort out Task module tasks - BrokerMetrics task cleanup

zengqiao
2022-01-17 15:28:36 +08:00
parent f6ba8bc95e
commit 2790099efa
18 changed files with 398 additions and 264 deletions

View File

@@ -1,7 +1,6 @@
package com.xiaojukeji.kafka.manager.task.component;
import com.google.common.collect.Lists;
-import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.NetUtils;
@@ -29,7 +28,7 @@ import java.util.concurrent.*;
* @date 20/8/10
*/
public abstract class AbstractScheduledTask<E extends Comparable> implements SchedulingConfigurer {
-private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
+private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledTask.class);
@Autowired
private HeartbeatDao heartbeatDao;

View File

@@ -1,6 +1,5 @@
package com.xiaojukeji.kafka.manager.task.component;
-import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -9,11 +8,11 @@ import org.slf4j.LoggerFactory;
* @date 20/8/10
*/
public class BaseBizTask<E extends Comparable> implements Runnable {
-private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
+private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledTask.class);
-private E task;
+private final E task;
-private AbstractScheduledTask scheduledTask;
+private final AbstractScheduledTask scheduledTask;
public BaseBizTask(E task, AbstractScheduledTask scheduledTask) {
this.task = task;
@@ -30,6 +29,7 @@ public class BaseBizTask<E extends Comparable> implements Runnable {
} catch (Throwable t) {
LOGGER.error("scheduled task scheduleName:{} execute failed, task:{}", scheduledTask.getScheduledName(), task, t);
}
LOGGER.info("scheduled task scheduleName:{} finished, cost-time:{}ms.", scheduledTask.getScheduledName(), System.currentTimeMillis() - startTime);
}
}

View File

@@ -0,0 +1,93 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import java.util.ArrayList;
import java.util.List;
/**
* Collect broker metrics and publish them as an event
* @author zengqiao
* @date 20/5/7
*/
@CustomScheduled(name = "collectAndPublishBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2)
@ConditionalOnProperty(prefix = "task.metrics.collect", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class CollectAndPublishBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
private static final Logger LOGGER = LoggerFactory.getLogger(CollectAndPublishBrokerMetrics.class);
@Autowired
private JmxService jmxService;
@Autowired
private ClusterService clusterService;
@Autowired
private AbstractHealthScoreStrategy healthScoreStrategy;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
long startTime = System.currentTimeMillis();
try {
SpringTool.publish(new BatchBrokerMetricsCollectedEvent(
this,
clusterDO.getId(),
startTime,
this.getBrokerMetrics(clusterDO.getId()))
);
} catch (Exception e) {
LOGGER.error("collect broker-metrics failed, physicalClusterId:{}.", clusterDO.getId(), e);
}
LOGGER.info("collect broker-metrics finished, physicalClusterId:{} costTime:{}", clusterDO.getId(), System.currentTimeMillis() - startTime);
}
private List<BrokerMetrics> getBrokerMetrics(Long clusterId) {
List<BrokerMetrics> metricsList = new ArrayList<>();
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
BrokerMetrics metrics = jmxService.getBrokerMetrics(
clusterId,
brokerId,
KafkaMetricsCollections.BROKER_TO_DB_METRICS
);
if (ValidateUtils.isNull(metrics)) {
continue;
}
metrics.getMetricsMap().put(
JmxConstant.HEALTH_SCORE,
healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics)
);
metricsList.add(metrics);
}
if (ValidateUtils.isEmptyList(metricsList)) {
return new ArrayList<>();
}
return metricsList;
}
}
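
The BatchBrokerMetricsCollectedEvent published above is not part of this diff. A minimal sketch of its likely shape, inferred from the constructor call here and the getters used by the sink listeners below (the real class lives in kafka-manager-common and may carry more fields):

package com.xiaojukeji.kafka.manager.common.events.metrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import org.springframework.context.ApplicationEvent;
import java.util.List;
/**
 * Sketch only: ApplicationListener<E> requires E to extend ApplicationEvent,
 * and the listeners call getPhysicalClusterId(), getCollectTime() and getMetricsList().
 */
public class BatchBrokerMetricsCollectedEvent extends ApplicationEvent {
    private final Long physicalClusterId;
    private final Long collectTime;
    private final List<BrokerMetrics> metricsList;
    public BatchBrokerMetricsCollectedEvent(Object source, Long physicalClusterId, Long collectTime, List<BrokerMetrics> metricsList) {
        super(source);
        this.physicalClusterId = physicalClusterId;
        this.collectTime = collectTime;
        this.metricsList = metricsList;
    }
    public Long getPhysicalClusterId() { return physicalClusterId; }
    public Long getCollectTime() { return collectTime; }
    public List<BrokerMetrics> getMetricsList() { return metricsList; }
}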

View File

@@ -42,25 +42,25 @@ public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
@Autowired
private TopicThrottledMetricsDao topicThrottledMetricsDao;
@Value(value = "${task.metrics.delete-metrics.delete-limit-size:1000}")
@Value(value = "${task.metrics.delete.delete-limit-size:1000}")
private Integer deleteLimitSize;
@Value(value = "${task.metrics.delete-metrics.cluster-metrics-save-days:14}")
@Value(value = "${task.metrics.delete.cluster-metrics-save-days:14}")
private Integer clusterMetricsSaveDays;
@Value(value = "${task.metrics.delete-metrics.broker-metrics-save-days:14}")
@Value(value = "${task.metrics.delete.broker-metrics-save-days:14}")
private Integer brokerMetricsSaveDays;
@Value(value = "${task.metrics.delete-metrics.topic-metrics-save-days:7}")
@Value(value = "${task.metrics.delete.topic-metrics-save-days:7}")
private Integer topicMetricsSaveDays;
@Value(value = "${task.metrics.delete-metrics.topic-request-time-metrics-save-days:7}")
@Value(value = "${task.metrics.delete.topic-request-time-metrics-save-days:7}")
private Integer topicRequestTimeMetricsSaveDays;
@Value(value = "${task.metrics.delete-metrics.topic-throttled-metrics-save-days:7}")
@Value(value = "${task.metrics.delete.topic-throttled-metrics-save-days:7}")
private Integer topicThrottledMetricsSaveDays;
@Value(value = "${task.metrics.delete-metrics.app-topic-metrics-save-days:7}")
@Value(value = "${task.metrics.delete.app-topic-metrics-save-days:7}")
private Integer appTopicMetricsSaveDays;
@Override

View File

@@ -1,146 +0,0 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao;
import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JmxService;
import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Store broker metrics (broker traffic, cluster traffic) to DB
* @author zengqiao
* @date 20/5/7
*/
@CustomScheduled(name = "storeBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2)
@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@Autowired
private ClusterService clusterService;
@Autowired
private BrokerMetricsDao brokerMetricsDao;
@Autowired
private ClusterMetricsDao clusterMetricsDao;
@Autowired
private AbstractHealthScoreStrategy healthScoreStrategy;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
long startTime = System.currentTimeMillis();
List<ClusterMetrics> clusterMetricsList = new ArrayList<>();
try {
List<BrokerMetrics> brokerMetricsList = getAndBatchAddMetrics(startTime, clusterDO.getId());
clusterMetricsList.add(supplyAndConvert2ClusterMetrics(
clusterDO.getId(),
MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList))
);
} catch (Exception t) {
LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t);
}
long endTime = System.currentTimeMillis();
LOGGER.info("collect finish, clusterId:{} costTime:{}", clusterDO.getId(), endTime - startTime);
List<ClusterMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList(
startTime,
clusterMetricsList
);
if (ValidateUtils.isEmptyList(doList)) {
return;
}
clusterMetricsDao.batchAdd(doList);
}
private List<BrokerMetrics> getAndBatchAddMetrics(Long startTime, Long clusterId) {
List<BrokerMetrics> metricsList = new ArrayList<>();
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
BrokerMetrics metrics = jmxService.getBrokerMetrics(
clusterId,
brokerId,
KafkaMetricsCollections.BROKER_TO_DB_METRICS
);
if (ValidateUtils.isNull(metrics)) {
continue;
}
metrics.getMetricsMap().put(
JmxConstant.HEALTH_SCORE,
healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics)
);
metricsList.add(metrics);
}
if (ValidateUtils.isEmptyList(metricsList)) {
return new ArrayList<>();
}
List<BrokerMetricsDO> doList =
MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList);
int i = 0;
do {
List<BrokerMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
if (ValidateUtils.isEmptyList(subDOList)) {
break;
}
brokerMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
return metricsList;
}
private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) {
ClusterMetrics metrics = new ClusterMetrics(clusterId);
Map<String, Object> metricsMap = metrics.getMetricsMap();
metricsMap.putAll(baseMetrics.getMetricsMap());
metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size());
metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size());
Integer partitionNum = 0;
for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (ValidateUtils.isNull(topicMetaData)) {
continue;
}
partitionNum += topicMetaData.getPartitionNum();
}
metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum);
return metrics;
}
}

View File

@@ -0,0 +1,55 @@
package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author zengqiao
* @date 22/01/17
*/
@Component
@ConditionalOnProperty(prefix = "task.metrics.sink.broker-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true)
public class SinkBrokerMetrics2DB implements ApplicationListener<BatchBrokerMetricsCollectedEvent> {
private static final Logger logger = LoggerFactory.getLogger(SinkBrokerMetrics2DB.class);
@Autowired
private BrokerMetricsDao metricsDao;
@Override
public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) {
logger.debug("sink broker-metrics to db start, event:{}.", event);
List<BrokerMetrics> metricsList = event.getMetricsList();
if (ValidateUtils.isEmptyList(metricsList)) {
logger.warn("sink broker-metrics to db finished, without need sink, event:{}.", event);
return;
}
List<BrokerMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(event.getCollectTime(), metricsList);
int i = 0;
while (i < doList.size()) {
List<BrokerMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
if (ValidateUtils.isEmptyList(subDOList)) {
break;
}
metricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
}
logger.debug("sink broker-metrics to db finished, event:{}.", event);
}
}
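
Note: because matchIfMissing = true on the @ConditionalOnProperty above, this sink is registered by default; setting task.metrics.sink.broker-metrics.sink-db-enabled=false in the application configuration turns it off without code changes. The cluster-metrics sink below follows the same pattern under the task.metrics.sink.cluster-metrics prefix.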

View File

@@ -0,0 +1,80 @@
package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* @author zengqiao
* @date 22/01/17
*/
@Component
@ConditionalOnProperty(prefix = "task.metrics.sink.cluster-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true)
public class SinkClusterMetrics2DB implements ApplicationListener<BatchBrokerMetricsCollectedEvent> {
private static final Logger logger = LoggerFactory.getLogger(SinkClusterMetrics2DB.class);
@Autowired
private ClusterMetricsDao clusterMetricsDao;
@Override
public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) {
logger.debug("sink cluster-metrics to db start, event:{}.", event);
List<BrokerMetrics> metricsList = event.getMetricsList();
if (ValidateUtils.isEmptyList(metricsList)) {
logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event);
return;
}
List<ClusterMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList(
event.getCollectTime(),
// merge broker-metrics into cluster-metrics
Arrays.asList(supplyAndConvert2ClusterMetrics(event.getPhysicalClusterId(), MetricsConvertUtils.merge2BaseMetricsByAdd(event.getMetricsList())))
);
if (ValidateUtils.isEmptyList(doList)) {
logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event);
return;
}
clusterMetricsDao.batchAdd(doList);
logger.debug("sink cluster-metrics to db finished, event:{}.", event);
}
private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) {
ClusterMetrics metrics = new ClusterMetrics(clusterId);
Map<String, Object> metricsMap = metrics.getMetricsMap();
metricsMap.putAll(baseMetrics.getMetricsMap());
metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size());
metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size());
Integer partitionNum = 0;
for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (ValidateUtils.isNull(topicMetaData)) {
continue;
}
partitionNum += topicMetaData.getPartitionNum();
}
metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum);
return metrics;
}
}
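
A side effect of splitting collection (CollectAndPublishBrokerMetrics) from storage is that new consumers can subscribe to the same event without touching the collector. A hypothetical sketch, not part of this commit, using only the event getters shown above:

package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
 * Hypothetical example (not in this commit): a third listener that only logs
 * batch sizes, illustrating how sinks plug in without changing the collector.
 */
@Component
public class LogBrokerMetricsBatchSize implements ApplicationListener<BatchBrokerMetricsCollectedEvent> {
    private static final Logger logger = LoggerFactory.getLogger(LogBrokerMetricsBatchSize.class);
    @Override
    public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) {
        logger.info("broker-metrics batch collected, physicalClusterId:{} batchSize:{}",
                event.getPhysicalClusterId(), event.getMetricsList().size());
    }
}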

View File

@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;

View File

@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;

View File

@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.kafka;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;

View File

@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.kafka;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;

View File

@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;

View File

@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;

View File

@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;