diff --git a/container/helm/templates/configmap.yaml b/container/helm/templates/configmap.yaml
index b487f2bd..cefa9d0d 100644
--- a/container/helm/templates/configmap.yaml
+++ b/container/helm/templates/configmap.yaml
@@ -55,7 +55,7 @@ data:
didi:
app-topic-metrics-enabled: false
topic-request-time-metrics-enabled: false
- topic-throttled-metrics: false
+ topic-throttled-metrics-enabled: false
save-days: 7
# 任务相关的开关
diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example
index 31c135af..138a44fe 100644
--- a/distribution/conf/application.yml.example
+++ b/distribution/conf/application.yml.example
@@ -46,8 +46,7 @@ custom:
didi:
app-topic-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
topic-request-time-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
- topic-throttled-metrics: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
- save-days: 7 #指标在DB中保持的天数,-1表示永久保存,7表示保存近7天的数据
+ topic-throttled-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
# 任务相关的开关
task:
@@ -56,6 +55,15 @@ task:
order-auto-exec: # 工单自动化审批线程的开关
topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启
app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启
+ metrics:
+ delete-metrics:
+ delete-limit-size: 1000
+ cluster-metrics-save-days: 14 # 集群指标保存天数
+ broker-metrics-save-days: 14 # Broker指标保存天数
+ topic-metrics-save-days: 7 # Topic指标保存天数
+ topic-request-time-metrics-save-days: 7 # Topic请求耗时指标保存天数
+ topic-throttled-metrics-save-days: 7 # Topic限流指标保存天数
+ app-topic-metrics-save-days: 7 # App+Topic指标保存天数
# ldap相关的配置
account:
diff --git a/docs/install_guide/config_description.md b/docs/install_guide/config_description.md
index f615eacb..04335e29 100644
--- a/docs/install_guide/config_description.md
+++ b/docs/install_guide/config_description.md
@@ -51,7 +51,7 @@ custom:
didi:
app-topic-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
topic-request-time-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
- topic-throttled-metrics: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
+ topic-throttled-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
save-days: 7 #指标在DB中保持的天数,-1表示永久保存,7表示保存近7天的数据
# 任务相关的开关
diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml
index 6a8ff0cb..f6c33def 100644
--- a/kafka-manager-common/pom.xml
+++ b/kafka-manager-common/pom.xml
@@ -109,5 +109,11 @@
junit
junit
+
+
+ org.projectlombok
+ lombok
+ compile
+
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java
index 5df85b5e..751e08c2 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java
@@ -1,5 +1,6 @@
package com.xiaojukeji.kafka.manager.service.utils;
+import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -8,38 +9,15 @@ import org.springframework.stereotype.Service;
* @author zengqiao
* @date 20/4/26
*/
+@Data
@Service("configUtils")
public class ConfigUtils {
- @Value(value = "${custom.idc}")
+ private ConfigUtils() {
+ }
+
+ @Value(value = "${custom.idc:cn}")
private String idc;
@Value(value = "${spring.profiles.active}")
private String kafkaManagerEnv;
-
- @Value(value = "${custom.store-metrics-task.save-days}")
- private Long maxMetricsSaveDays;
-
- public String getIdc() {
- return idc;
- }
-
- public void setIdc(String idc) {
- this.idc = idc;
- }
-
- public String getKafkaManagerEnv() {
- return kafkaManagerEnv;
- }
-
- public void setKafkaManagerEnv(String kafkaManagerEnv) {
- this.kafkaManagerEnv = kafkaManagerEnv;
- }
-
- public Long getMaxMetricsSaveDays() {
- return maxMetricsSaveDays;
- }
-
- public void setMaxMetricsSaveDays(Long maxMetricsSaveDays) {
- this.maxMetricsSaveDays = maxMetricsSaveDays;
- }
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java
index 75399538..9f1d36eb 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java
@@ -20,5 +20,5 @@ public interface BrokerMetricsDao {
*/
List getBrokerMetrics(Long clusterId, Integer brokerId, Date startTime, Date endTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java
index d0731508..0e2e68a7 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java
@@ -10,5 +10,5 @@ public interface ClusterMetricsDao {
List getClusterMetrics(long clusterId, Date startTime, Date endTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java
index 9d02c5d5..e0c3f84e 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java
@@ -30,5 +30,5 @@ public interface TopicAppMetricsDao {
* @param endTime
* @return
*/
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java
index 58029f36..5d7af6e0 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java
@@ -22,5 +22,5 @@ public interface TopicMetricsDao {
List getLatestTopicMetrics(Long clusterId, Date afterTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java
index e7fd5169..5e6b237d 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java
@@ -33,9 +33,7 @@ public interface TopicRequestMetricsDao {
* @param endTime
* @return
*/
- int deleteBeforeTime(Date endTime);
-
- int deleteBeforeId(Long id);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
List getById(Long startId, Long endId);
}
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java
index 1010cc17..cc975c52 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java
@@ -32,5 +32,5 @@ public interface TopicThrottledMetricsDao {
List getLatestTopicThrottledMetrics(Long clusterId, Date afterTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java
index 5a06e5ce..bba58185 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java
@@ -37,7 +37,10 @@ public class BrokerMetricsImpl implements BrokerMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("BrokerMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("BrokerMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java
index b05d3c0f..08948871 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java
@@ -27,7 +27,7 @@ public class ClusterMetricsDaoImpl implements ClusterMetricsDao {
@Override
public List getClusterMetrics(long clusterId, Date startTime, Date endTime) {
- Map map = new HashMap(3);
+ Map map = new HashMap<>(3);
map.put("clusterId", clusterId);
map.put("startTime", startTime);
map.put("endTime", endTime);
@@ -35,7 +35,10 @@ public class ClusterMetricsDaoImpl implements ClusterMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("ClusterMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("ClusterMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java
index fe55a1ab..90ce7e3e 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java
@@ -46,7 +46,10 @@ public class TopicAppMetricsDaoImpl implements TopicAppMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicAppMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicAppMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java
index 7397a28c..a7eae32c 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java
@@ -60,7 +60,10 @@ public class TopicMetricsDaoImpl implements TopicMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java
index bfaa552c..e59324f5 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java
@@ -45,13 +45,11 @@ public class TopicRequestMetricsDaoImpl implements TopicRequestMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeTime", endTime);
- }
-
- @Override
- public int deleteBeforeId(Long id) {
- return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeId", id);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>();
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeTime", params);
}
@Override
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java
index 784bc242..b1f64d43 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java
@@ -75,7 +75,10 @@ public class TopicThrottledMetricsDaoImpl implements TopicThrottledMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicThrottledMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicThrottledMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml
index 49746df7..b5115e10 100644
--- a/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml
@@ -29,9 +29,9 @@
]]>
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml
index 11614d2d..8aca62ee 100644
--- a/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml
@@ -27,9 +27,9 @@
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml
index 1c64c0ce..fff5037a 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml
@@ -30,9 +30,9 @@
]]>
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
index 53e13b2d..249863f4 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
@@ -37,9 +37,9 @@
]]>
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml
index b9aaa35b..7ad5e679 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml
@@ -34,15 +34,9 @@
ORDER BY gmt_create ASC
-
+
-
-
-
-
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml
index c5b6474d..e163d30f 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml
@@ -54,9 +54,9 @@
AND gmt_create > #{afterTime}
-
+
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
index b8632971..16c2a012 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
@@ -1,15 +1,15 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete;
-import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
+import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
import com.xiaojukeji.kafka.manager.dao.*;
-import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import java.util.Arrays;
import java.util.Date;
@@ -22,10 +22,7 @@ import java.util.List;
*/
@CustomScheduled(name = "deleteMetrics", cron = "0 0/2 * * * ?", threadNum = 1)
public class DeleteMetrics extends AbstractScheduledTask {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
-
- @Autowired
- private ConfigUtils configUtils;
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicMetricsDao topicMetricsDao;
@@ -45,6 +42,27 @@ public class DeleteMetrics extends AbstractScheduledTask {
@Autowired
private TopicThrottledMetricsDao topicThrottledMetricsDao;
+ @Value(value = "${task.metrics.delete-metrics.delete-limit-size:1000}")
+ private Integer deleteLimitSize;
+
+ @Value(value = "${task.metrics.delete-metrics.cluster-metrics-save-days:14}")
+ private Integer clusterMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete-metrics.broker-metrics-save-days:14}")
+ private Integer brokerMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete-metrics.topic-metrics-save-days:7}")
+ private Integer topicMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete-metrics.topic-request-time-metrics-save-days:7}")
+ private Integer topicRequestTimeMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete-metrics.topic-throttled-metrics-save-days:7}")
+ private Integer topicThrottledMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete-metrics.app-topic-metrics-save-days:7}")
+ private Integer appTopicMetricsSaveDays;
+
@Override
public List listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
@@ -54,78 +72,73 @@ public class DeleteMetrics extends AbstractScheduledTask {
@Override
public void processTask(EmptyEntry entryEntry) {
- if (Constant.INVALID_CODE.equals(configUtils.getMaxMetricsSaveDays())) {
- // 无需数据删除
- return;
- }
-
long startTime = System.currentTimeMillis();
LOGGER.info("start delete metrics");
- try {
- deleteTopicMetrics();
- } catch (Exception e) {
- LOGGER.error("delete topic metrics failed.", e);
+
+ // 数据量可能比较大,一次触发多删除几次
+ for (int i = 0; i < 10; ++i) {
+ try {
+ boolean needReDelete = this.deleteCommunityTopicMetrics();
+ if (!needReDelete) {
+ break;
+ }
+
+ // 暂停1000毫秒,避免删除太快导致DB出现问题
+ BackoffUtils.backoff(1000);
+ } catch (Exception e) {
+ LOGGER.error("delete community topic metrics failed.", e);
+ }
+ }
+
+ // 数据量可能比较大,一次触发多删除几次
+ for (int i = 0; i < 10; ++i) {
+ try {
+ boolean needReDelete = this.deleteDiDiTopicMetrics();
+ if (!needReDelete) {
+ break;
+ }
+
+ // 暂停1000毫秒,避免删除太快导致DB出现问题
+ BackoffUtils.backoff(1000);
+ } catch (Exception e) {
+ LOGGER.error("delete didi topic metrics failed.", e);
+ }
}
try {
- deleteTopicAppMetrics();
+ this.deleteClusterBrokerMetrics();
} catch (Exception e) {
- LOGGER.error("delete topic app metrics failed.", e);
+ LOGGER.error("delete cluster and broker metrics failed.", e);
}
- try {
- deleteTopicRequestMetrics();
- } catch (Exception e) {
- LOGGER.error("delete topic request metrics failed.", e);
- }
-
- try {
- deleteThrottledMetrics();
- } catch (Exception e) {
- LOGGER.error("delete topic throttled metrics failed.", e);
- }
-
- try {
- deleteBrokerMetrics();
- } catch (Exception e) {
- LOGGER.error("delete broker metrics failed.", e);
- }
-
- try {
- deleteClusterMetrics();
- } catch (Exception e) {
- LOGGER.error("delete cluster metrics failed.", e);
- }
LOGGER.info("finish delete metrics, costTime:{}ms.", System.currentTimeMillis() - startTime);
}
- private void deleteTopicMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicMetricsDao.deleteBeforeTime(endTime);
+ private boolean deleteCommunityTopicMetrics() {
+ return topicMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize;
}
- private void deleteTopicAppMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicAppMetricsDao.deleteBeforeTime(endTime);
+ private boolean deleteDiDiTopicMetrics() {
+ boolean needReDelete = false;
+
+ if (topicAppMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.appTopicMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
+ needReDelete = true;
+ }
+
+ if (topicRequestMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicRequestTimeMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
+ needReDelete = true;
+ }
+
+ if (topicThrottledMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicThrottledMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
+ needReDelete = true;
+ }
+
+ return needReDelete;
}
- private void deleteTopicRequestMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicRequestMetricsDao.deleteBeforeTime(endTime);
- }
+ private void deleteClusterBrokerMetrics() {
+ brokerMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.brokerMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize);
- private void deleteThrottledMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicThrottledMetricsDao.deleteBeforeTime(endTime);
- }
-
- private void deleteBrokerMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- brokerMetricsDao.deleteBeforeTime(endTime);
- }
-
- private void deleteClusterMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- clusterMetricsDao.deleteBeforeTime(endTime);
+ clusterMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.clusterMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize);
}
}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java
index 4e34e732..e94a2793 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java
@@ -22,7 +22,7 @@ import java.util.*;
* @date 20/9/24
*/
@Component("storeTopicThrottledMetrics2DB")
-@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics", havingValue = "true", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreTopicThrottledMetrics2DB implements ApplicationListener {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml
index a4648a46..1d816604 100644
--- a/kafka-manager-web/src/main/resources/application.yml
+++ b/kafka-manager-web/src/main/resources/application.yml
@@ -30,25 +30,31 @@ logging:
custom:
idc: cn
- jmx:
- max-conn: 10 # 2.3版本配置不在这个地方生效
store-metrics-task:
community:
broker-metrics-enabled: true
topic-metrics-enabled: true
- didi:
+ didi: # 滴滴Kafka特有的指标
app-topic-metrics-enabled: false
topic-request-time-metrics-enabled: false
- topic-throttled-metrics: false
- save-days: 7
+ topic-throttled-metrics-enabled: false
-# 任务相关的开关
+# 任务相关的配置
task:
op:
sync-topic-enabled: false # 未落盘的Topic定期同步到DB中
order-auto-exec: # 工单自动化审批线程的开关
topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启
app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启
+ metrics:
+ delete-metrics:
+ delete-limit-size: 1000
+ cluster-metrics-save-days: 14 # 集群指标保存天数
+ broker-metrics-save-days: 14 # Broker指标保存天数
+ topic-metrics-save-days: 7 # Topic指标保存天数
+ topic-request-time-metrics-save-days: 7 # Topic请求耗时指标保存天数
+ topic-throttled-metrics-save-days: 7 # Topic限流指标保存天数
+ app-topic-metrics-save-days: 7 # App+Topic指标保存天数
account:
ldap:
diff --git a/pom.xml b/pom.xml
index 55d4ab46..9d226f87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -232,6 +232,13 @@
minio
7.1.0
+
+
+ org.projectlombok
+ lombok
+ 1.18.2
+ provided
+