mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
Merge pull request #424 from didi/dev
1.add lombok; 2.support change delete metrics rate;
This commit is contained in:
@@ -55,7 +55,7 @@ data:
|
||||
didi:
|
||||
app-topic-metrics-enabled: false
|
||||
topic-request-time-metrics-enabled: false
|
||||
topic-throttled-metrics: false
|
||||
topic-throttled-metrics-enabled: false
|
||||
save-days: 7
|
||||
|
||||
# 任务相关的开关
|
||||
|
||||
@@ -46,8 +46,7 @@ custom:
|
||||
didi:
|
||||
app-topic-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
topic-request-time-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
topic-throttled-metrics: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
save-days: 7 #指标在DB中保持的天数,-1表示永久保存,7表示保存近7天的数据
|
||||
topic-throttled-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
|
||||
# 任务相关的开关
|
||||
task:
|
||||
@@ -56,6 +55,15 @@ task:
|
||||
order-auto-exec: # 工单自动化审批线程的开关
|
||||
topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启
|
||||
app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启
|
||||
metrics:
|
||||
delete-metrics:
|
||||
delete-limit-size: 1000
|
||||
cluster-metrics-save-days: 14 # 集群指标保存天数
|
||||
broker-metrics-save-days: 14 # Broker指标保存天数
|
||||
topic-metrics-save-days: 7 # Topic指标保存天数
|
||||
topic-request-time-metrics-save-days: 7 # Topic请求耗时指标保存天数
|
||||
topic-throttled-metrics-save-days: 7 # Topic限流指标保存天数
|
||||
app-topic-metrics-save-days: 7 # App+Topic指标保存天数
|
||||
|
||||
# ldap相关的配置
|
||||
account:
|
||||
|
||||
@@ -51,7 +51,7 @@ custom:
|
||||
didi:
|
||||
app-topic-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
topic-request-time-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
topic-throttled-metrics: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
topic-throttled-metrics-enabled: false # 滴滴埋入的指标, 社区AK不存在该指标,因此默认关闭
|
||||
save-days: 7 #指标在DB中保持的天数,-1表示永久保存,7表示保存近7天的数据
|
||||
|
||||
# 任务相关的开关
|
||||
|
||||
@@ -109,5 +109,11 @@
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.xiaojukeji.kafka.manager.service.utils;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -8,38 +9,15 @@ import org.springframework.stereotype.Service;
|
||||
* @author zengqiao
|
||||
* @date 20/4/26
|
||||
*/
|
||||
@Data
|
||||
@Service("configUtils")
|
||||
public class ConfigUtils {
|
||||
@Value(value = "${custom.idc}")
|
||||
private ConfigUtils() {
|
||||
}
|
||||
|
||||
@Value(value = "${custom.idc:cn}")
|
||||
private String idc;
|
||||
|
||||
@Value(value = "${spring.profiles.active}")
|
||||
private String kafkaManagerEnv;
|
||||
|
||||
@Value(value = "${custom.store-metrics-task.save-days}")
|
||||
private Long maxMetricsSaveDays;
|
||||
|
||||
public String getIdc() {
|
||||
return idc;
|
||||
}
|
||||
|
||||
public void setIdc(String idc) {
|
||||
this.idc = idc;
|
||||
}
|
||||
|
||||
public String getKafkaManagerEnv() {
|
||||
return kafkaManagerEnv;
|
||||
}
|
||||
|
||||
public void setKafkaManagerEnv(String kafkaManagerEnv) {
|
||||
this.kafkaManagerEnv = kafkaManagerEnv;
|
||||
}
|
||||
|
||||
public Long getMaxMetricsSaveDays() {
|
||||
return maxMetricsSaveDays;
|
||||
}
|
||||
|
||||
public void setMaxMetricsSaveDays(Long maxMetricsSaveDays) {
|
||||
this.maxMetricsSaveDays = maxMetricsSaveDays;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,5 +20,5 @@ public interface BrokerMetricsDao {
|
||||
*/
|
||||
List<BrokerMetricsDO> getBrokerMetrics(Long clusterId, Integer brokerId, Date startTime, Date endTime);
|
||||
|
||||
int deleteBeforeTime(Date endTime);
|
||||
int deleteBeforeTime(Date endTime, Integer limitSize);
|
||||
}
|
||||
|
||||
@@ -10,5 +10,5 @@ public interface ClusterMetricsDao {
|
||||
|
||||
List<ClusterMetricsDO> getClusterMetrics(long clusterId, Date startTime, Date endTime);
|
||||
|
||||
int deleteBeforeTime(Date endTime);
|
||||
int deleteBeforeTime(Date endTime, Integer limitSize);
|
||||
}
|
||||
|
||||
@@ -30,5 +30,5 @@ public interface TopicAppMetricsDao {
|
||||
* @param endTime
|
||||
* @return
|
||||
*/
|
||||
int deleteBeforeTime(Date endTime);
|
||||
int deleteBeforeTime(Date endTime, Integer limitSize);
|
||||
}
|
||||
|
||||
@@ -22,5 +22,5 @@ public interface TopicMetricsDao {
|
||||
|
||||
List<TopicMetricsDO> getLatestTopicMetrics(Long clusterId, Date afterTime);
|
||||
|
||||
int deleteBeforeTime(Date endTime);
|
||||
int deleteBeforeTime(Date endTime, Integer limitSize);
|
||||
}
|
||||
|
||||
@@ -33,9 +33,7 @@ public interface TopicRequestMetricsDao {
|
||||
* @param endTime
|
||||
* @return
|
||||
*/
|
||||
int deleteBeforeTime(Date endTime);
|
||||
|
||||
int deleteBeforeId(Long id);
|
||||
int deleteBeforeTime(Date endTime, Integer limitSize);
|
||||
|
||||
List<TopicMetricsDO> getById(Long startId, Long endId);
|
||||
}
|
||||
@@ -32,5 +32,5 @@ public interface TopicThrottledMetricsDao {
|
||||
|
||||
List<TopicThrottledMetricsDO> getLatestTopicThrottledMetrics(Long clusterId, Date afterTime);
|
||||
|
||||
int deleteBeforeTime(Date endTime);
|
||||
int deleteBeforeTime(Date endTime, Integer limitSize);
|
||||
}
|
||||
|
||||
@@ -37,7 +37,10 @@ public class BrokerMetricsImpl implements BrokerMetricsDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteBeforeTime(Date endTime) {
|
||||
return sqlSession.delete("BrokerMetricsDao.deleteBeforeTime", endTime);
|
||||
public int deleteBeforeTime(Date endTime, Integer limitSize) {
|
||||
Map<String, Object> params = new HashMap<>(2);
|
||||
params.put("endTime", endTime);
|
||||
params.put("limitSize", limitSize);
|
||||
return sqlSession.delete("BrokerMetricsDao.deleteBeforeTime", params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ public class ClusterMetricsDaoImpl implements ClusterMetricsDao {
|
||||
|
||||
@Override
|
||||
public List<ClusterMetricsDO> getClusterMetrics(long clusterId, Date startTime, Date endTime) {
|
||||
Map<String, Object> map = new HashMap<String, Object>(3);
|
||||
Map<String, Object> map = new HashMap<>(3);
|
||||
map.put("clusterId", clusterId);
|
||||
map.put("startTime", startTime);
|
||||
map.put("endTime", endTime);
|
||||
@@ -35,7 +35,10 @@ public class ClusterMetricsDaoImpl implements ClusterMetricsDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteBeforeTime(Date endTime) {
|
||||
return sqlSession.delete("ClusterMetricsDao.deleteBeforeTime", endTime);
|
||||
public int deleteBeforeTime(Date endTime, Integer limitSize) {
|
||||
Map<String, Object> params = new HashMap<>(2);
|
||||
params.put("endTime", endTime);
|
||||
params.put("limitSize", limitSize);
|
||||
return sqlSession.delete("ClusterMetricsDao.deleteBeforeTime", params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,10 @@ public class TopicAppMetricsDaoImpl implements TopicAppMetricsDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteBeforeTime(Date endTime) {
|
||||
return sqlSession.delete("TopicAppMetricsDao.deleteBeforeTime", endTime);
|
||||
public int deleteBeforeTime(Date endTime, Integer limitSize) {
|
||||
Map<String, Object> params = new HashMap<>(2);
|
||||
params.put("endTime", endTime);
|
||||
params.put("limitSize", limitSize);
|
||||
return sqlSession.delete("TopicAppMetricsDao.deleteBeforeTime", params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,7 +60,10 @@ public class TopicMetricsDaoImpl implements TopicMetricsDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteBeforeTime(Date endTime) {
|
||||
return sqlSession.delete("TopicMetricsDao.deleteBeforeTime", endTime);
|
||||
public int deleteBeforeTime(Date endTime, Integer limitSize) {
|
||||
Map<String, Object> params = new HashMap<>(2);
|
||||
params.put("endTime", endTime);
|
||||
params.put("limitSize", limitSize);
|
||||
return sqlSession.delete("TopicMetricsDao.deleteBeforeTime", params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,13 +45,11 @@ public class TopicRequestMetricsDaoImpl implements TopicRequestMetricsDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteBeforeTime(Date endTime) {
|
||||
return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeTime", endTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteBeforeId(Long id) {
|
||||
return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeId", id);
|
||||
public int deleteBeforeTime(Date endTime, Integer limitSize) {
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put("endTime", endTime);
|
||||
params.put("limitSize", limitSize);
|
||||
return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeTime", params);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -75,7 +75,10 @@ public class TopicThrottledMetricsDaoImpl implements TopicThrottledMetricsDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int deleteBeforeTime(Date endTime) {
|
||||
return sqlSession.delete("TopicThrottledMetricsDao.deleteBeforeTime", endTime);
|
||||
public int deleteBeforeTime(Date endTime, Integer limitSize) {
|
||||
Map<String, Object> params = new HashMap<>(2);
|
||||
params.put("endTime", endTime);
|
||||
params.put("limitSize", limitSize);
|
||||
return sqlSession.delete("TopicThrottledMetricsDao.deleteBeforeTime", params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,9 +29,9 @@
|
||||
]]>
|
||||
</select>
|
||||
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Date">
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Map">
|
||||
<![CDATA[
|
||||
DELETE FROM broker_metrics WHERE gmt_create < #{endTime} LIMIT 1000
|
||||
DELETE FROM broker_metrics WHERE gmt_create < #{endTime} LIMIT #{limitSize}
|
||||
]]>
|
||||
</delete>
|
||||
</mapper>
|
||||
@@ -27,9 +27,9 @@
|
||||
</foreach>
|
||||
</insert>
|
||||
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Date">
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Map">
|
||||
<![CDATA[
|
||||
DELETE FROM cluster_metrics WHERE gmt_create < #{endTime} LIMIT 200
|
||||
DELETE FROM cluster_metrics WHERE gmt_create < #{endTime} LIMIT #{limitSize}
|
||||
]]>
|
||||
</delete>
|
||||
</mapper>
|
||||
@@ -30,9 +30,9 @@
|
||||
]]>
|
||||
</select>
|
||||
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Date">
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Map">
|
||||
<![CDATA[
|
||||
DELETE FROM topic_app_metrics WHERE gmt_create < #{endTime} LIMIT 3000
|
||||
DELETE FROM topic_app_metrics WHERE gmt_create < #{endTime} LIMIT #{limitSize}
|
||||
]]>
|
||||
</delete>
|
||||
</mapper>
|
||||
@@ -37,9 +37,9 @@
|
||||
]]>
|
||||
</select>
|
||||
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Date">
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Map">
|
||||
<![CDATA[
|
||||
DELETE FROM topic_metrics WHERE gmt_create < #{endTime} LIMIT 3000
|
||||
DELETE FROM topic_metrics WHERE gmt_create < #{endTime} LIMIT #{limitSize}
|
||||
]]>
|
||||
</delete>
|
||||
</mapper>
|
||||
@@ -34,15 +34,9 @@
|
||||
ORDER BY gmt_create ASC
|
||||
</select>
|
||||
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Date">
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Map">
|
||||
<![CDATA[
|
||||
DELETE FROM topic_request_time_metrics WHERE gmt_create < #{endTime} LIMIT 2000
|
||||
]]>
|
||||
</delete>
|
||||
|
||||
<delete id="deleteBeforeId" parameterType="java.lang.Long">
|
||||
<![CDATA[
|
||||
DELETE FROM topic_request_time_metrics WHERE id < #{id} LIMIT 20000
|
||||
DELETE FROM topic_request_time_metrics WHERE gmt_create < #{endTime} LIMIT #{limitSize}
|
||||
]]>
|
||||
</delete>
|
||||
|
||||
|
||||
@@ -54,9 +54,9 @@
|
||||
AND gmt_create > #{afterTime}
|
||||
</select>
|
||||
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Date">
|
||||
<delete id="deleteBeforeTime" parameterType="java.util.Map">
|
||||
<![CDATA[
|
||||
DELETE FROM topic_throttled_metrics WHERE gmt_create < #{endTime} LIMIT 3000
|
||||
DELETE FROM topic_throttled_metrics WHERE gmt_create < #{endTime} LIMIT #{limitSize}
|
||||
]]>
|
||||
</delete>
|
||||
</mapper>
|
||||
@@ -1,15 +1,15 @@
|
||||
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.*;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
|
||||
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
|
||||
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
|
||||
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
@@ -22,10 +22,7 @@ import java.util.List;
|
||||
*/
|
||||
@CustomScheduled(name = "deleteMetrics", cron = "0 0/2 * * * ?", threadNum = 1)
|
||||
public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private ConfigUtils configUtils;
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
@Autowired
|
||||
private TopicMetricsDao topicMetricsDao;
|
||||
@@ -45,6 +42,27 @@ public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
@Autowired
|
||||
private TopicThrottledMetricsDao topicThrottledMetricsDao;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.delete-limit-size:1000}")
|
||||
private Integer deleteLimitSize;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.cluster-metrics-save-days:14}")
|
||||
private Integer clusterMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.broker-metrics-save-days:14}")
|
||||
private Integer brokerMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-metrics-save-days:7}")
|
||||
private Integer topicMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-request-time-metrics-save-days:7}")
|
||||
private Integer topicRequestTimeMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.topic-throttled-metrics-save-days:7}")
|
||||
private Integer topicThrottledMetricsSaveDays;
|
||||
|
||||
@Value(value = "${task.metrics.delete-metrics.app-topic-metrics-save-days:7}")
|
||||
private Integer appTopicMetricsSaveDays;
|
||||
|
||||
@Override
|
||||
public List<EmptyEntry> listAllTasks() {
|
||||
EmptyEntry emptyEntry = new EmptyEntry();
|
||||
@@ -54,78 +72,73 @@ public class DeleteMetrics extends AbstractScheduledTask<EmptyEntry> {
|
||||
|
||||
@Override
|
||||
public void processTask(EmptyEntry entryEntry) {
|
||||
if (Constant.INVALID_CODE.equals(configUtils.getMaxMetricsSaveDays())) {
|
||||
// 无需数据删除
|
||||
return;
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
LOGGER.info("start delete metrics");
|
||||
try {
|
||||
deleteTopicMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic metrics failed.", e);
|
||||
|
||||
// 数据量可能比较大,一次触发多删除几次
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
try {
|
||||
boolean needReDelete = this.deleteCommunityTopicMetrics();
|
||||
if (!needReDelete) {
|
||||
break;
|
||||
}
|
||||
|
||||
// 暂停1000毫秒,避免删除太快导致DB出现问题
|
||||
BackoffUtils.backoff(1000);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete community topic metrics failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
// 数据量可能比较大,一次触发多删除几次
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
try {
|
||||
boolean needReDelete = this.deleteDiDiTopicMetrics();
|
||||
if (!needReDelete) {
|
||||
break;
|
||||
}
|
||||
|
||||
// 暂停1000毫秒,避免删除太快导致DB出现问题
|
||||
BackoffUtils.backoff(1000);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete didi topic metrics failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
deleteTopicAppMetrics();
|
||||
this.deleteClusterBrokerMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic app metrics failed.", e);
|
||||
LOGGER.error("delete cluster and broker metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteTopicRequestMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic request metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteThrottledMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete topic throttled metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteBrokerMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete broker metrics failed.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
deleteClusterMetrics();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("delete cluster metrics failed.", e);
|
||||
}
|
||||
LOGGER.info("finish delete metrics, costTime:{}ms.", System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
private void deleteTopicMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicMetricsDao.deleteBeforeTime(endTime);
|
||||
private boolean deleteCommunityTopicMetrics() {
|
||||
return topicMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize;
|
||||
}
|
||||
|
||||
private void deleteTopicAppMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicAppMetricsDao.deleteBeforeTime(endTime);
|
||||
private boolean deleteDiDiTopicMetrics() {
|
||||
boolean needReDelete = false;
|
||||
|
||||
if (topicAppMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.appTopicMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
|
||||
needReDelete = true;
|
||||
}
|
||||
|
||||
if (topicRequestMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicRequestTimeMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
|
||||
needReDelete = true;
|
||||
}
|
||||
|
||||
if (topicThrottledMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicThrottledMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
|
||||
needReDelete = true;
|
||||
}
|
||||
|
||||
return needReDelete;
|
||||
}
|
||||
|
||||
private void deleteTopicRequestMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicRequestMetricsDao.deleteBeforeTime(endTime);
|
||||
}
|
||||
private void deleteClusterBrokerMetrics() {
|
||||
brokerMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.brokerMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize);
|
||||
|
||||
private void deleteThrottledMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
topicThrottledMetricsDao.deleteBeforeTime(endTime);
|
||||
}
|
||||
|
||||
private void deleteBrokerMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
brokerMetricsDao.deleteBeforeTime(endTime);
|
||||
}
|
||||
|
||||
private void deleteClusterMetrics() {
|
||||
Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
|
||||
clusterMetricsDao.deleteBeforeTime(endTime);
|
||||
clusterMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.clusterMetricsSaveDays * 24 * 60 * 60 * 1000), this.deleteLimitSize);
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,7 @@ import java.util.*;
|
||||
* @date 20/9/24
|
||||
*/
|
||||
@Component("storeTopicThrottledMetrics2DB")
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics", havingValue = "true", matchIfMissing = true)
|
||||
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics-enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class StoreTopicThrottledMetrics2DB implements ApplicationListener<TopicThrottledMetricsCollectedEvent> {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
|
||||
|
||||
|
||||
@@ -30,25 +30,31 @@ logging:
|
||||
|
||||
custom:
|
||||
idc: cn
|
||||
jmx:
|
||||
max-conn: 10 # 2.3版本配置不在这个地方生效
|
||||
store-metrics-task:
|
||||
community:
|
||||
broker-metrics-enabled: true
|
||||
topic-metrics-enabled: true
|
||||
didi:
|
||||
didi: # 滴滴Kafka特有的指标
|
||||
app-topic-metrics-enabled: false
|
||||
topic-request-time-metrics-enabled: false
|
||||
topic-throttled-metrics: false
|
||||
save-days: 7
|
||||
topic-throttled-metrics-enabled: false
|
||||
|
||||
# 任务相关的开关
|
||||
# 任务相关的配置
|
||||
task:
|
||||
op:
|
||||
sync-topic-enabled: false # 未落盘的Topic定期同步到DB中
|
||||
order-auto-exec: # 工单自动化审批线程的开关
|
||||
topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启
|
||||
app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启
|
||||
metrics:
|
||||
delete-metrics:
|
||||
delete-limit-size: 1000
|
||||
cluster-metrics-save-days: 14 # 集群指标保存天数
|
||||
broker-metrics-save-days: 14 # Broker指标保存天数
|
||||
topic-metrics-save-days: 7 # Topic指标保存天数
|
||||
topic-request-time-metrics-save-days: 7 # Topic请求耗时指标保存天数
|
||||
topic-throttled-metrics-save-days: 7 # Topic限流指标保存天数
|
||||
app-topic-metrics-save-days: 7 # App+Topic指标保存天数
|
||||
|
||||
account:
|
||||
ldap:
|
||||
|
||||
Reference in New Issue
Block a user