mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-02 18:32:08 +08:00
support change delete metrics rate
This commit is contained in:
@@ -1,15 +1,15 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.*;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
|
||||
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
|
||||
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
|
||||
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
@@ -22,10 +22,7 @@ import java.util.List;
|
||||
*/
|
||||
@CustomScheduled(name = "deleteMetrics", cron = "0 0/2 * * * ?", threadNum = 1)
|
||||
public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private ConfigUtils configUtils;
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private TopicMetricsDao topicMetricsDao;
|
||||
@@ -45,6 +42,27 @@ public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
@Autowired
|
||||
private TopicThrottledMetricsDao topicThrottledMetricsDao;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.delete-limit-size:1000}")
|
||||
private Integer deleteLimitSize;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.cluster-metrics-save-days:14}")
|
||||
private Integer clusterMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.broker-metrics-save-days:14}")
|
||||
private Integer brokerMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-metrics-save-days:7}")
|
||||
private Integer topicMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-request-time-metrics-save-days:7}")
|
||||
private Integer topicRequestTimeMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-throttled-metrics-save-days:7}")
|
||||
private Integer topicThrottledMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.app-topic-metrics-save-days:7}")
|
||||
private Integer appTopicMetricsSaveDays;
|
||||
|
||||
@Override
|
||||
public List<EmptyEntry> listAllTasks() {
|
||||
EmptyEntry emptyEntry = new EmptyEntry();
|
||||
@@ -54,78 +72,73 @@ public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
|
||||
@Override
|
||||
public void processTask(EmptyEntry entryEntry) {
|
||||
if (Constant.INVALID_CODE.equals(configUtils.getMaxMetricsSaveDays())) {
|
||||
// 无需数据删除
|
||||
return;
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
LOGGER.info("start delete metrics");
|
||||
try {
|
||||
deleteTopicMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic metrics failed.", e);
|
||||
|
||||
// 数据量可能比较大,一次触发多删除几次
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
try {
|
||||
boolean needReDelete = this.deleteCommunityTopicMetrics();
|
||||
if (!needReDelete) {
|
||||
break;
|
||||
}
|
||||
|
||||
// 暂停1000毫秒,避免删除太快导致DB出现问题
|
||||
BackoffUtils.backoff(1000);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete community topic metrics failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
// 数据量可能比较大,一次触发多删除几次
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
try {
|
||||
boolean needReDelete = this.deleteDiDiTopicMetrics();
|
||||
if (!needReDelete) {
|
||||
break;
|
||||
}
|
||||
|
||||
// 暂停1000毫秒,避免删除太快导致DB出现问题
|
||||
BackoffUtils.backoff(1000);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete didi topic metrics failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
deleteTopicAppMetrics();
|
||||
this.deleteClusterBrokerMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic app metrics failed.", e);
|
||||
LOGGER.error("delete cluster and broker metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteTopicRequestMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic request metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteThrottledMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic throttled metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteBrokerMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete broker metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteClusterMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete cluster metrics failed.", e);
|
||||
}
|
||||
LOGGER.info("finish delete metrics, costTime:{}ms.", System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
private void deleteTopicMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicMetricsDao.deleteBeforeTime(endTime);
|
||||
private boolean deleteCommunityTopicMetrics() {
|
||||
return topicMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize;
|
||||
}
|
||||
|
||||
private void deleteTopicAppMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicAppMetricsDao.deleteBeforeTime(endTime);
|
||||
private boolean deleteDiDiTopicMetrics() {
|
||||
boolean needReDelete = false;
|
||||
|
||||
if (topicAppMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.appTopicMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
|
||||
needReDelete = true;
|
||||
}
|
||||
|
||||
if (topicRequestMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicRequestTimeMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
|
||||
needReDelete = true;
|
||||
}
|
||||
|
||||
if (topicThrottledMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicThrottledMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
|
||||
needReDelete = true;
|
||||
}
|
||||
|
||||
return needReDelete;
|
||||
}
|
||||
|
||||
private void deleteTopicRequestMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicRequestMetricsDao.deleteBeforeTime(endTime);
|
||||
}
|
||||
private void deleteClusterBrokerMetrics() {
|
||||
brokerMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.brokerMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize);
|
||||
|
||||
private void deleteThrottledMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicThrottledMetricsDao.deleteBeforeTime(endTime);
|
||||
}
|
||||
|
||||
private void deleteBrokerMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
brokerMetricsDao.deleteBeforeTime(endTime);
|
||||
}
|
||||
|
||||
private void deleteClusterMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
clusterMetricsDao.deleteBeforeTime(endTime);
|
||||
clusterMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.clusterMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize);
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,7 @@ import java.util.*;
|
||||
* @date 20/9/24
|
||||
*/
|
||||
@Component("storeTopicThrottledMetrics2DB")
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics", havingValue = "true", matchIfMissing = true)
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreTopicThrottledMetrics2DB implements ApplicationListener<TopicThrottledMetricsCollectedEvent> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user