mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Optimize]错开采集任务触发时间,降低Offset信息获取时超时情况的发生(#726)
当前指标采集任务都是整分钟触发执行的,导致会同时向Kafka请求分区Offset信息,会导致: 1、请求过多,从而出现超时; 2、同时进行,可能会导致分区重复获取Offset信息; 因此将其错开。
This commit is contained in:
@@ -3,8 +3,6 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metrics;
|
|||||||
import com.didiglobal.logi.job.annotation.Task;
|
import com.didiglobal.logi.job.annotation.Task;
|
||||||
import com.didiglobal.logi.job.common.TaskResult;
|
import com.didiglobal.logi.job.common.TaskResult;
|
||||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||||
import com.didiglobal.logi.log.ILog;
|
|
||||||
import com.didiglobal.logi.log.LogFactory;
|
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.kafka.BrokerMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.kafka.BrokerMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -14,13 +12,11 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
*/
|
*/
|
||||||
@Task(name = "BrokerMetricCollectorTask",
|
@Task(name = "BrokerMetricCollectorTask",
|
||||||
description = "Broker指标采集任务",
|
description = "Broker指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "20 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class BrokerMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
public class BrokerMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(BrokerMetricCollectorTask.class);
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private BrokerMetricCollector brokerMetricCollector;
|
private BrokerMetricCollector brokerMetricCollector;
|
||||||
|
|
||||||
|
|||||||
@@ -3,8 +3,6 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metrics;
|
|||||||
import com.didiglobal.logi.job.annotation.Task;
|
import com.didiglobal.logi.job.annotation.Task;
|
||||||
import com.didiglobal.logi.job.common.TaskResult;
|
import com.didiglobal.logi.job.common.TaskResult;
|
||||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||||
import com.didiglobal.logi.log.ILog;
|
|
||||||
import com.didiglobal.logi.log.LogFactory;
|
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.kafka.ClusterMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.kafka.ClusterMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -14,13 +12,11 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
*/
|
*/
|
||||||
@Task(name = "ClusterMetricCollectorTask",
|
@Task(name = "ClusterMetricCollectorTask",
|
||||||
description = "Cluster指标采集任务",
|
description = "Cluster指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "30 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class ClusterMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
public class ClusterMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(ClusterMetricCollectorTask.class);
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ClusterMetricCollector clusterMetricCollector;
|
private ClusterMetricCollector clusterMetricCollector;
|
||||||
|
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
*/
|
*/
|
||||||
@Task(name = "GroupMetricCollectorTask",
|
@Task(name = "GroupMetricCollectorTask",
|
||||||
description = "Group指标采集任务",
|
description = "Group指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "40 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
|
|||||||
@@ -3,8 +3,6 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metrics;
|
|||||||
import com.didiglobal.logi.job.annotation.Task;
|
import com.didiglobal.logi.job.annotation.Task;
|
||||||
import com.didiglobal.logi.job.common.TaskResult;
|
import com.didiglobal.logi.job.common.TaskResult;
|
||||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||||
import com.didiglobal.logi.log.ILog;
|
|
||||||
import com.didiglobal.logi.log.LogFactory;
|
|
||||||
import com.xiaojukeji.know.streaming.km.collector.metric.kafka.TopicMetricCollector;
|
import com.xiaojukeji.know.streaming.km.collector.metric.kafka.TopicMetricCollector;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -14,13 +12,11 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
*/
|
*/
|
||||||
@Task(name = "TopicMetricCollectorTask",
|
@Task(name = "TopicMetricCollectorTask",
|
||||||
description = "Topic指标采集任务",
|
description = "Topic指标采集任务",
|
||||||
cron = "0 0/1 * * * ? *",
|
cron = "10 0/1 * * * ? *",
|
||||||
autoRegister = true,
|
autoRegister = true,
|
||||||
consensual = ConsensualEnum.BROADCAST,
|
consensual = ConsensualEnum.BROADCAST,
|
||||||
timeout = 2 * 60)
|
timeout = 2 * 60)
|
||||||
public class TopicMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
public class TopicMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||||
private static final ILog log = LogFactory.getLog(TopicMetricCollectorTask.class);
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private TopicMetricCollector topicMetricCollector;
|
private TopicMetricCollector topicMetricCollector;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user