diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java index 7ecc295b..02331255 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java @@ -9,7 +9,7 @@ public class Constant { public static final Integer MAX_AVG_BYTES_DURATION = 10; - public static final Integer BATCH_INSERT_SIZE = 50; + public static final Integer BATCH_INSERT_SIZE = 30; public static final Integer DEFAULT_SESSION_TIMEOUT_UNIT_MS = 30000; diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java index 50f5f633..22aeaf2a 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store; +import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; @@ -54,8 +55,6 @@ public class StoreBrokerMetrics extends AbstractScheduledTask { @Autowired private AbstractHealthScoreStrategy healthScoreStrategy; - private static final Integer INSERT_BATCH_SIZE = 100; - @Override protected List listAllTasks() { return clusterService.list(); @@ -72,7 +71,7 @@ public class StoreBrokerMetrics extends AbstractScheduledTask { clusterDO.getId(), MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList)) ); - } catch (Throwable t) { + } catch (Exception t) { LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t); } long endTime = System.currentTimeMillis(); @@ -82,6 +81,11 @@ public class StoreBrokerMetrics extends AbstractScheduledTask { startTime, clusterMetricsList ); + + if (ValidateUtils.isEmptyList(doList)) { + return; + } + clusterMetricsDao.batchAdd(doList); } @@ -110,9 +114,15 @@ public class StoreBrokerMetrics extends AbstractScheduledTask { MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList); int i = 0; do { - brokerMetricsDao.batchAdd(doList.subList(i, Math.min(i + INSERT_BATCH_SIZE, doList.size()))); - i += INSERT_BATCH_SIZE; + List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())); + if (ValidateUtils.isEmptyList(subDOList)) { + break; + } + + brokerMetricsDao.batchAdd(subDOList); + i += Constant.BATCH_INSERT_SIZE; } while (i < doList.size()); + return metricsList; } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java index ede6525d..6543f6fa 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java @@ -17,7 +17,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; import java.util.*; @@ -28,7 +27,7 @@ import java.util.*; @CustomScheduled(name = "storeDiDiAppTopicMetrics", cron = "41 0/1 * * * ?", threadNum = 5) @ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "app-topic-metrics-enabled", havingValue = "true", matchIfMissing = true) public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask { - private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); + private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); @Autowired private JmxService jmxService; @@ -50,7 +49,7 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask { try { getAndBatchAddTopicAppMetrics(startTime, clusterDO.getId()); - } catch (Throwable t) { + } catch (Exception t) { LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t); } } @@ -65,7 +64,12 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask { MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList); int i = 0; do { - topicAppMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()))); + List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())); + if (ValidateUtils.isEmptyList(subDOList)) { + return; + } + + topicAppMetricsDao.batchAdd(subDOList); i += Constant.BATCH_INSERT_SIZE; } while (i < doList.size()); } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java index c4caa229..040612f2 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java @@ -27,7 +27,7 @@ import java.util.*; @CustomScheduled(name = "storeDiDiTopicRequestTimeMetrics", cron = "51 0/1 * * * ?", threadNum = 5) @ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-request-time-metrics-enabled", havingValue = "true", matchIfMissing = true) public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask { - private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); + private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); @Autowired private JmxService jmxService; @@ -51,7 +51,7 @@ public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())); + if (ValidateUtils.isEmptyList(subDOList)) { + return; + } + + topicRequestMetricsDao.batchAdd(subDOList); i += Constant.BATCH_INSERT_SIZE; } while (i < doList.size()); } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java index 0c0714f7..f75368d2 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java @@ -25,7 +25,7 @@ import java.util.List; @Component("storeCommunityTopicMetrics2DB") @ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "topic-metrics-enabled", havingValue = "true", matchIfMissing = true) public class StoreCommunityTopicMetrics2DB implements ApplicationListener { - private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); + private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); @Autowired private TopicMetricsDao topicMetricsDao; @@ -40,17 +40,21 @@ public class StoreCommunityTopicMetrics2DB implements ApplicationListener metricsList) throws Exception { - List doList = - MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList); + private void store2DB(Long startTime, List metricsList) { + List doList = MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList); int i = 0; do { - topicMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()))); + List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())); + if (ValidateUtils.isEmptyList(subDOList)) { + return; + } + + topicMetricsDao.batchAdd(subDOList); i += Constant.BATCH_INSERT_SIZE; } while (i < doList.size()); }