mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
修复批量往DB写入空指标数组时报SQL语法异常的问题
This commit is contained in:
@@ -9,7 +9,7 @@ public class Constant {
|
||||
|
||||
public static final Integer MAX_AVG_BYTES_DURATION = 10;
|
||||
|
||||
public static final Integer BATCH_INSERT_SIZE = 50;
|
||||
public static final Integer BATCH_INSERT_SIZE = 30;
|
||||
|
||||
public static final Integer DEFAULT_SESSION_TIMEOUT_UNIT_MS = 30000;
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
@@ -54,8 +55,6 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
@Autowired
|
||||
private AbstractHealthScoreStrategy healthScoreStrategy;
|
||||
|
||||
private static final Integer INSERT_BATCH_SIZE = 100;
|
||||
|
||||
@Override
|
||||
protected List<ClusterDO> listAllTasks() {
|
||||
return clusterService.list();
|
||||
@@ -72,7 +71,7 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
clusterDO.getId(),
|
||||
MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList))
|
||||
);
|
||||
} catch (Throwable t) {
|
||||
} catch (Exception t) {
|
||||
LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t);
|
||||
}
|
||||
long endTime = System.currentTimeMillis();
|
||||
@@ -82,6 +81,11 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
startTime,
|
||||
clusterMetricsList
|
||||
);
|
||||
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
clusterMetricsDao.batchAdd(doList);
|
||||
}
|
||||
|
||||
@@ -110,9 +114,15 @@ public class StoreBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList);
|
||||
int i = 0;
|
||||
do {
|
||||
brokerMetricsDao.batchAdd(doList.subList(i, Math.min(i + INSERT_BATCH_SIZE, doList.size())));
|
||||
i += INSERT_BATCH_SIZE;
|
||||
List<BrokerMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
|
||||
if (ValidateUtils.isEmptyList(subDOList)) {
|
||||
break;
|
||||
}
|
||||
|
||||
brokerMetricsDao.batchAdd(subDOList);
|
||||
i += Constant.BATCH_INSERT_SIZE;
|
||||
} while (i < doList.size());
|
||||
|
||||
return metricsList;
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@@ -28,7 +27,7 @@ import java.util.*;
|
||||
@CustomScheduled(name = "storeDiDiAppTopicMetrics", cron = "41 0/1 * * * ?", threadNum = 5)
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "app-topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private JmxService jmxService;
|
||||
@@ -50,7 +49,7 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
|
||||
try {
|
||||
getAndBatchAddTopicAppMetrics(startTime, clusterDO.getId());
|
||||
} catch (Throwable t) {
|
||||
} catch (Exception t) {
|
||||
LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
|
||||
}
|
||||
}
|
||||
@@ -65,7 +64,12 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
|
||||
int i = 0;
|
||||
do {
|
||||
topicAppMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
|
||||
List<TopicMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
|
||||
if (ValidateUtils.isEmptyList(subDOList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
topicAppMetricsDao.batchAdd(subDOList);
|
||||
i += Constant.BATCH_INSERT_SIZE;
|
||||
} while (i < doList.size());
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ import java.util.*;
|
||||
@CustomScheduled(name = "storeDiDiTopicRequestTimeMetrics", cron = "51 0/1 * * * ?", threadNum = 5)
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-request-time-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<ClusterDO> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private JmxService jmxService;
|
||||
@@ -51,7 +51,7 @@ public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<Clus
|
||||
LOGGER.info("save topic metrics, clusterId:{}, start.", clusterDO.getId());
|
||||
getAndBatchAddTopicRequestTimeMetrics(startTime, clusterDO.getId());
|
||||
LOGGER.info("save topic metrics, clusterId:{}, end costTime:{}.", clusterDO.getId(), System.currentTimeMillis() - startTime);
|
||||
} catch (Throwable t) {
|
||||
} catch (Exception t) {
|
||||
LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
|
||||
}
|
||||
}
|
||||
@@ -69,7 +69,12 @@ public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask<Clus
|
||||
|
||||
int i = 0;
|
||||
do {
|
||||
topicRequestMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
|
||||
List<TopicMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
|
||||
if (ValidateUtils.isEmptyList(subDOList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
topicRequestMetricsDao.batchAdd(subDOList);
|
||||
i += Constant.BATCH_INSERT_SIZE;
|
||||
} while (i < doList.size());
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ import java.util.List;
|
||||
@Component("storeCommunityTopicMetrics2DB")
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreCommunityTopicMetrics2DB implements ApplicationListener<TopicMetricsCollectedEvent> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private TopicMetricsDao topicMetricsDao;
|
||||
@@ -40,17 +40,21 @@ public class StoreCommunityTopicMetrics2DB implements ApplicationListener<TopicM
|
||||
|
||||
try {
|
||||
store2DB(System.currentTimeMillis(), metricsList);
|
||||
} catch (Throwable t) {
|
||||
} catch (Exception t) {
|
||||
LOGGER.error("save topic metrics failed, clusterId:{}.", event.getClusterId(), t);
|
||||
}
|
||||
}
|
||||
|
||||
private void store2DB(Long startTime, List<TopicMetrics> metricsList) throws Exception {
|
||||
List<TopicMetricsDO> doList =
|
||||
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
|
||||
private void store2DB(Long startTime, List<TopicMetrics> metricsList) {
|
||||
List<TopicMetricsDO> doList = MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
|
||||
int i = 0;
|
||||
do {
|
||||
topicMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
|
||||
List<TopicMetricsDO> subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
|
||||
if (ValidateUtils.isEmptyList(subDOList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
topicMetricsDao.batchAdd(subDOList);
|
||||
i += Constant.BATCH_INSERT_SIZE;
|
||||
} while (i < doList.size());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user