Merge pull request #421 from didi/dev

修复批量往DB写入空指标数组时报SQL语法异常的问题
This commit is contained in:
EricZeng
2022-01-07 14:16:07 +08:00
committed by GitHub
5 changed files with 42 additions and 19 deletions

View File

@@ -9,7 +9,7 @@ public class Constant {
public static final Integer MAX_AVG_BYTES_DURATION = 10;
public static final Integer BATCH_INSERT_SIZE = 50;
public static final Integer BATCH_INSERT_SIZE = 30;
public static final Integer DEFAULT_SESSION_TIMEOUT_UNIT_MS = 30000;

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
@@ -54,8 +55,6 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
@Autowired
private AbstractHealthScoreStrategy healthScoreStrategy;
private static final Integer INSERT_BATCH_SIZE = 100;
@Override
protected List<ClusterDO> listAllTasks() {
return clusterService.list();
@@ -72,7 +71,7 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
clusterDO.getId(),
MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList))
);
} catch (Throwable t) {
} catch (Exception t) {
LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t);
}
long endTime = System.currentTimeMillis();
@@ -82,6 +81,11 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
startTime,
clusterMetricsList
);
if (ValidateUtils.isEmptyList(doList)) {
return;
}
clusterMetricsDao.batchAdd(doList);
}
@@ -110,9 +114,15 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList);
int i = 0;
do {
brokerMetricsDao.batchAdd(doList.subList(i, Math.min(i + INSERT_BATCH_SIZE, doList.size())));
i += INSERT_BATCH_SIZE;
List<BrokerMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
if (ValidateUtils.isEmptyList(subDOList)) {
break;
}
brokerMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
return metricsList;
}

View File

@@ -17,7 +17,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.*;
@@ -28,7 +27,7 @@ import java.util.*;
@CustomScheduled(name = "storeDiDiAppTopicMetrics", cron = "41 0/1 * * * ?", threadNum = 5)
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "app-topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@@ -50,7 +49,7 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
try {
getAndBatchAddTopicAppMetrics(startTime, clusterDO.getId());
} catch (Throwable t) {
} catch (Exception t) {
LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
}
}
@@ -65,7 +64,12 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
int i = 0;
do {
topicAppMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
List<TopicMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
if (ValidateUtils.isEmptyList(subDOList)) {
return;
}
topicAppMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}

View File

@@ -27,7 +27,7 @@ import java.util.*;
@CustomScheduled(name = "storeDiDiTopicRequestTimeMetrics", cron = "51 0/1 * * * ?", threadNum = 5)
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-request-time-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<ClusterDO> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@@ -51,7 +51,7 @@ public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<Clus
LOGGER.info("save topic metrics, clusterId:{}, start.", clusterDO.getId());
getAndBatchAddTopicRequestTimeMetrics(startTime, clusterDO.getId());
LOGGER.info("save topic metrics, clusterId:{}, end costTime:{}.", clusterDO.getId(), System.currentTimeMillis() - startTime);
} catch (Throwable t) {
} catch (Exception t) {
LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
}
}
@@ -69,7 +69,12 @@ public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<Clus
int i = 0;
do {
topicRequestMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
List<TopicMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
if (ValidateUtils.isEmptyList(subDOList)) {
return;
}
topicRequestMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}

View File

@@ -25,7 +25,7 @@ import java.util.List;
@Component("storeCommunityTopicMetrics2DB")
@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreCommunityTopicMetrics2DB implements ApplicationListener<TopicMetricsCollectedEvent> {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicMetricsDao topicMetricsDao;
@@ -40,17 +40,21 @@ public class StoreCommunityTopicMetrics2DB implements ApplicationListener<TopicM
try {
store2DB(System.currentTimeMillis(), metricsList);
} catch (Throwable t) {
} catch (Exception t) {
LOGGER.error("save topic metrics failed, clusterId:{}.", event.getClusterId(), t);
}
}
private void store2DB(Long startTime, List<TopicMetrics> metricsList) throws Exception {
List<TopicMetricsDO> doList =
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
private void store2DB(Long startTime, List<TopicMetrics> metricsList) {
List<TopicMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
int i = 0;
do {
topicMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
List<TopicMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
if (ValidateUtils.isEmptyList(subDOList)) {
return;
}
topicMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}