mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
Task任务分为metrics,common,metaddata三类,每一类任务的执行对应一个线程池,减少对Job模块线程池的依赖
This commit is contained in:
@@ -60,9 +60,15 @@ thread-pool:
|
||||
suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改
|
||||
|
||||
task: # 任务模块的配置
|
||||
heaven: # 采集任务配置
|
||||
thread-num: 20 # 采集任务线程池核心线程数
|
||||
queue-size: 1000 # 采集任务线程池队列大小
|
||||
metrics: # metrics采集任务配置
|
||||
thread-num: 18 # metrics采集任务线程池核心线程数
|
||||
queue-size: 180 # metrics采集任务线程池队列大小
|
||||
metadata: # metadata同步任务配置
|
||||
thread-num: 27 # metadata同步任务线程池核心线程数
|
||||
queue-size: 270 # metadata同步任务线程池队列大小
|
||||
common: # 剩余其他任务配置
|
||||
thread-num: 15 # 剩余其他任务线程池核心线程数
|
||||
queue-size: 150 # 剩余其他任务线程池队列大小
|
||||
|
||||
|
||||
# 客户端池大小相关配置
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.xiaojukeji.know.streaming.km.task;
|
||||
|
||||
import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* other相关任务
|
||||
*/
|
||||
public abstract class AbstractAsyncCommonDispatchTask extends AbstractClusterPhyDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(AbstractAsyncCommonDispatchTask.class);
|
||||
|
||||
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
|
||||
}
|
||||
|
||||
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
taskThreadPoolService.submitCommonTask(
|
||||
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
this.timeoutUnitSec.intValue() * 1000,
|
||||
() -> {
|
||||
try {
|
||||
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
|
||||
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
|
||||
log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
|
||||
} else {
|
||||
log.debug("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("class=AbstractAsyncCommonDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
}
|
||||
@@ -130,4 +130,12 @@ public abstract class AbstractDispatchTask<E extends Comparable & EntifyIdInterf
|
||||
}
|
||||
return allTaskList.subList(idx * count, Math.min(idx * count + count, allTaskList.size()));
|
||||
}
|
||||
|
||||
public String getTaskName() {
|
||||
return taskName;
|
||||
}
|
||||
|
||||
public Long getTimeoutUnitSec() {
|
||||
return timeoutUnitSec;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@Task(name = "CheckJmxClientTask",
|
||||
description = "检查Jmx客户端,",
|
||||
description = "检查Jmx客户端",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
timeout = 2 * 60,
|
||||
|
||||
@@ -15,8 +15,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimension
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -31,12 +30,9 @@ import java.util.*;
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
|
||||
public class HealthCheckTask extends AbstractAsyncMetricsDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(HealthCheckTask.class);
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Autowired
|
||||
private HealthCheckResultService healthCheckResultService;
|
||||
|
||||
@@ -45,17 +41,11 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
|
||||
);
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
taskThreadPoolService.submitHeavenTask(
|
||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
100000,
|
||||
() -> this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs)
|
||||
);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
return this.calAndUpdateHealthCheckResult(clusterPhy, triggerTimeUnitMs);
|
||||
}
|
||||
|
||||
private void calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
private TaskResult calAndUpdateHealthCheckResult(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
// 获取配置,<配置名,配置信息>
|
||||
Map<String, BaseClusterHealthConfig> healthConfigMap = healthCheckResultService.getClusterHealthConfig(clusterPhy.getId());
|
||||
|
||||
@@ -91,6 +81,8 @@ public class HealthCheckTask extends AbstractClusterPhyDispatchTask {
|
||||
} catch (Exception e) {
|
||||
log.error("method=processSubTask||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
|
||||
}
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
private List<HealthCheckResult> getNoResResult(Long clusterPhyId, AbstractHealthCheckService healthCheckService, Map<String, BaseClusterHealthConfig> healthConfigMap) {
|
||||
|
||||
@@ -8,7 +8,7 @@ import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@Task(name = "CommunityReassignJobTask",
|
||||
@@ -17,14 +17,14 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 6 * 60)
|
||||
public class CommunityReassignJobTask extends AbstractClusterPhyDispatchTask {
|
||||
public class CommunityReassignJobTask extends AbstractAsyncCommonDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(CommunityReassignJobTask.class);
|
||||
|
||||
@Autowired
|
||||
private ReassignJobService reassignJobService;
|
||||
|
||||
@Override
|
||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
// 获取迁移中的任务
|
||||
Long jobId = reassignJobService.getOneRunningJobId(clusterPhy.getId());
|
||||
if (jobId == null) {
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractAsyncCommonDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@Task(name = "kmJobTask",
|
||||
@@ -14,13 +14,13 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 6 * 60)
|
||||
public class KMJobTask extends AbstractClusterPhyDispatchTask {
|
||||
public class KMJobTask extends AbstractAsyncCommonDispatchTask {
|
||||
|
||||
@Autowired
|
||||
private JobService jobService;
|
||||
|
||||
@Override
|
||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
jobService.scheduleJobByClusterId(clusterPhy.getId());
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.xiaojukeji.know.streaming.km.task.metadata;
|
||||
|
||||
import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* 元数据同步相关任务
|
||||
*/
|
||||
public abstract class AbstractAsyncMetadataDispatchTask extends AbstractClusterPhyDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(AbstractAsyncMetadataDispatchTask.class);
|
||||
|
||||
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
|
||||
}
|
||||
|
||||
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
taskThreadPoolService.submitMetadataTask(
|
||||
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
this.timeoutUnitSec.intValue() * 1000,
|
||||
() -> {
|
||||
try {
|
||||
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
|
||||
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
|
||||
log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
|
||||
} else {
|
||||
log.debug("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("class=AbstractAsyncMetadataDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerConfigPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.config.ConfigDiffTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.*;
|
||||
@@ -25,12 +24,12 @@ import java.util.stream.Collectors;
|
||||
* @date 22/02/25
|
||||
*/
|
||||
@Task(name = "SyncBrokerConfigDiffTask",
|
||||
description = "Broker配置的Diff信息同步到DB,",
|
||||
description = "Broker配置的Diff信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 6 * 60)
|
||||
public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncBrokerConfigDiffTask extends AbstractAsyncMetadataDispatchTask {
|
||||
protected static final ILog log = LogFactory.getLog(SyncBrokerConfigDiffTask.class);
|
||||
|
||||
@Autowired
|
||||
@@ -40,7 +39,7 @@ public class SyncBrokerConfigDiffTask extends AbstractClusterPhyDispatchTask {
|
||||
private BrokerConfigService brokerConfigService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
// <configName, <BrokerId, ConfigValue>>
|
||||
Map<String, Map<Integer, String>> allConfigMap = new HashMap<>();
|
||||
|
||||
|
||||
@@ -9,25 +9,24 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Task(name = "SyncBrokerTask",
|
||||
description = "Broker信息同步到DB,",
|
||||
description = "Broker信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncBrokerTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncBrokerTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncBrokerTask.class);
|
||||
|
||||
@Autowired
|
||||
private BrokerService brokerService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
Result<List<Broker>> brokersResult = brokerService.listBrokersFromKafka(clusterPhy);
|
||||
if (brokersResult.failed()) {
|
||||
return new TaskResult(TaskResult.FAIL_CODE, brokersResult.getMessage());
|
||||
|
||||
@@ -9,24 +9,23 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
|
||||
@Task(name = "SyncControllerTask",
|
||||
description = "Controller信息同步到DB,",
|
||||
description = "Controller信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncControllerTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncControllerTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncControllerTask.class);
|
||||
|
||||
@Autowired
|
||||
private KafkaControllerService kafkaControllerService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
Result<KafkaController> controllerResult = kafkaControllerService.getControllerFromKafka(clusterPhy);
|
||||
if (controllerResult.failed()) {
|
||||
return new TaskResult(TaskResult.FAIL_CODE, controllerResult.getMessage());
|
||||
|
||||
@@ -11,7 +11,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@@ -19,12 +18,12 @@ import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Task(name = "SyncKafkaAclTask",
|
||||
description = "KafkaAcl信息同步到DB,",
|
||||
description = "KafkaAcl信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncKafkaAclTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncKafkaAclTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncKafkaAclTask.class);
|
||||
|
||||
@Autowired
|
||||
@@ -34,7 +33,7 @@ public class SyncKafkaAclTask extends AbstractClusterPhyDispatchTask {
|
||||
private OpKafkaAclService opKafkaAclService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
Result<List<AclBinding>> aclBindingListResult = kafkaAclService.getAclFromKafka(clusterPhy.getId());
|
||||
if (aclBindingListResult.failed()) {
|
||||
return TaskResult.FAIL;
|
||||
|
||||
@@ -12,7 +12,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.apache.kafka.clients.admin.*;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -22,19 +21,19 @@ import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Task(name = "SyncKafkaGroupTask",
|
||||
description = "KafkaGroup信息同步到DB,",
|
||||
description = "KafkaGroup信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncKafkaGroupTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncKafkaGroupTask.class);
|
||||
|
||||
@Autowired
|
||||
private GroupService groupService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
TaskResult tr = TaskResult.SUCCESS;
|
||||
|
||||
List<String> groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId());
|
||||
|
||||
@@ -9,26 +9,25 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Task(name = "SyncKafkaUserTask",
|
||||
description = "KafkaUser信息同步到DB,",
|
||||
description = "KafkaUser信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncKafkaUserTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncKafkaUserTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncKafkaUserTask.class);
|
||||
|
||||
@Autowired
|
||||
private KafkaUserService kafkaUserService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
// 查询KafkaUser数据
|
||||
Result<List<KafkaUser>> kafkaUserResult = kafkaUserService.getKafkaUserFromKafka(clusterPhy.getId());
|
||||
if (kafkaUserResult.failed()) {
|
||||
|
||||
@@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.partition.PartitionPO;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -19,19 +18,19 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Task(name = "SyncPartitionTask",
|
||||
description = "Partition信息同步到DB,",
|
||||
description = "Partition信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncPartitionTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncPartitionTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncPartitionTask.class);
|
||||
|
||||
@Autowired
|
||||
private PartitionService partitionService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
Result<Map<String, List<Partition>>> partitionsResult = partitionService.listPartitionsFromKafka(clusterPhy);
|
||||
if (partitionsResult.failed()) {
|
||||
return new TaskResult(TaskResult.FAIL_CODE, partitionsResult.getMessage());
|
||||
|
||||
@@ -12,7 +12,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -27,12 +26,12 @@ import java.util.Map;
|
||||
* @date 22/02/25
|
||||
*/
|
||||
@Task(name = "SyncTopicConfigTask",
|
||||
description = "Topic保存时间配置同步DB,",
|
||||
description = "Topic保存时间配置同步DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncTopicConfigTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncTopicConfigTask extends AbstractAsyncMetadataDispatchTask {
|
||||
protected static final ILog log = LogFactory.getLog(SyncTopicConfigTask.class);
|
||||
|
||||
@Autowired
|
||||
@@ -42,7 +41,7 @@ public class SyncTopicConfigTask extends AbstractClusterPhyDispatchTask {
|
||||
private TopicConfigService kafkaConfigService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
boolean success = true;
|
||||
|
||||
List<TopicConfig> topicConfigList = new ArrayList<>();
|
||||
|
||||
@@ -9,25 +9,24 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Task(name = "SyncTopicTask",
|
||||
description = "Topic信息同步到DB,",
|
||||
description = "Topic信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncTopicTask extends AbstractClusterPhyDispatchTask {
|
||||
public class SyncTopicTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncTopicTask.class);
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
Result<List<Topic>> topicsResult = topicService.listTopicsFromKafka(clusterPhy);
|
||||
if (topicsResult.failed()) {
|
||||
return new TaskResult(TaskResult.FAIL_CODE, topicsResult.getMessage());
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.xiaojukeji.know.streaming.km.task.metrics;
|
||||
|
||||
import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* 指标采集相关任务
|
||||
*/
|
||||
public abstract class AbstractAsyncMetricsDispatchTask extends AbstractClusterPhyDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(AbstractAsyncMetricsDispatchTask.class);
|
||||
|
||||
public abstract TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
return this.asyncProcessSubTask(clusterPhy, triggerTimeUnitMs);
|
||||
}
|
||||
|
||||
public TaskResult asyncProcessSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
taskThreadPoolService.submitMetricsTask(
|
||||
String.format("taskName=%s||clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
this.timeoutUnitSec.intValue() * 1000,
|
||||
() -> {
|
||||
try {
|
||||
TaskResult tr = this.processClusterTask(clusterPhy, triggerTimeUnitMs);
|
||||
if (TaskResult.SUCCESS_CODE != tr.getCode()) {
|
||||
log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||taskResult={}||msg=failed", this.taskName, clusterPhy.getId(), tr);
|
||||
} else {
|
||||
log.debug("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||msg=success", this.taskName, clusterPhy.getId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("class=AbstractAsyncMetricsDispatchTask||taskName={}||clusterPhyId={}||errMsg=exception", this.taskName, clusterPhy.getId(), e);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
}
|
||||
@@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.collector.metric.BrokerMetricCollector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Task(name = "BrokerMetricCollectorTask",
|
||||
description = "Broker指标采集任务,",
|
||||
description = "Broker指标采集任务",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class BrokerMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
||||
public class BrokerMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(BrokerMetricCollectorTask.class);
|
||||
|
||||
@Autowired
|
||||
private BrokerMetricCollector brokerMetricCollector;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
taskThreadPoolService.submitHeavenTask(
|
||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
100000,
|
||||
() -> brokerMetricCollector.collectMetrics(clusterPhy)
|
||||
);
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
brokerMetricCollector.collectMetrics(clusterPhy);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.collector.metric.ClusterMetricCollector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Task(name = "ClusterMetricCollectorTask",
|
||||
description = "Cluster指标采集任务,",
|
||||
description = "Cluster指标采集任务",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class ClusterMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
||||
public class ClusterMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(ClusterMetricCollectorTask.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterMetricCollector clusterMetricCollector;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
taskThreadPoolService.submitHeavenTask(
|
||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
100000,
|
||||
() -> clusterMetricCollector.collectMetrics(clusterPhy)
|
||||
);
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
clusterMetricCollector.collectMetrics(clusterPhy);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -7,35 +7,26 @@ import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.collector.metric.GroupMetricCollector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Task(name = "GroupMetricCollectorTask",
|
||||
description = "Group指标采集任务,",
|
||||
description = "Group指标采集任务",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class GroupMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
||||
public class GroupMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(GroupMetricCollectorTask.class);
|
||||
|
||||
@Autowired
|
||||
private GroupMetricCollector groupMetricCollector;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
taskThreadPoolService.submitHeavenTask(
|
||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
100000,
|
||||
() -> groupMetricCollector.collectMetrics(clusterPhy)
|
||||
);
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
groupMetricCollector.collectMetrics(clusterPhy);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -5,34 +5,25 @@ import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||
import com.xiaojukeji.know.streaming.km.collector.metric.PartitionMetricCollector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Task(name = "PartitionMetricCollectorTask",
|
||||
description = "Partition指标采集任务,",
|
||||
description = "Partition指标采集任务",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class PartitionMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
||||
public class PartitionMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||
|
||||
@Autowired
|
||||
private PartitionMetricCollector partitionMetricCollector;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
taskThreadPoolService.submitHeavenTask(
|
||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
100000,
|
||||
() -> partitionMetricCollector.collectMetrics(clusterPhy)
|
||||
);
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
partitionMetricCollector.collectMetrics(clusterPhy);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -5,8 +5,6 @@ import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||
import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@@ -15,26 +13,19 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
*/
|
||||
@Slf4j
|
||||
@Task(name = "ReplicaMetricCollectorTask",
|
||||
description = "Replica指标采集任务,",
|
||||
description = "Replica指标采集任务",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class ReplicaMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
||||
public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||
|
||||
@Autowired
|
||||
private ReplicaMetricCollector replicaMetricCollector;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
protected TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
taskThreadPoolService.submitHeavenTask(
|
||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
100000,
|
||||
() -> replicaMetricCollector.collectMetrics(clusterPhy)
|
||||
);
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
replicaMetricCollector.collectMetrics(clusterPhy);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -7,8 +7,6 @@ import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.collector.metric.TopicMetricCollector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.task.AbstractClusterPhyDispatchTask;
|
||||
import com.xiaojukeji.know.streaming.km.task.service.TaskThreadPoolService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
@@ -20,22 +18,15 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class TopicMetricCollectorTask extends AbstractClusterPhyDispatchTask {
|
||||
public class TopicMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(TopicMetricCollectorTask.class);
|
||||
|
||||
@Autowired
|
||||
private TopicMetricCollector topicMetricCollector;
|
||||
|
||||
@Autowired
|
||||
private TaskThreadPoolService taskThreadPoolService;
|
||||
|
||||
@Override
|
||||
public TaskResult processSubTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
taskThreadPoolService.submitHeavenTask(
|
||||
String.format("TaskName=%s clusterPhyId=%d", this.taskName, clusterPhy.getId()),
|
||||
100000,
|
||||
() -> topicMetricCollector.collectMetrics(clusterPhy)
|
||||
);
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
topicMetricCollector.collectMetrics(clusterPhy);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
|
||||
@@ -16,23 +16,72 @@ import javax.annotation.PostConstruct;
|
||||
@NoArgsConstructor
|
||||
public class TaskThreadPoolService {
|
||||
/**
|
||||
* 较重任务,比如指标采集
|
||||
* metrics任务,比如指标采集
|
||||
*/
|
||||
private FutureNoWaitUtil<Object> heavenTaskThreadPool;
|
||||
private FutureNoWaitUtil<Object> metricsTaskThreadPool;
|
||||
|
||||
@Value(value = "${thread-pool.task.metrics.thread-num:18}")
|
||||
private Integer metricsTaskThreadNum;
|
||||
|
||||
@Value(value = "${thread-pool.task.metrics.queue-size:180}")
|
||||
private Integer metricsTaskQueueSize;
|
||||
|
||||
|
||||
@Value(value = "${thread-pool.task.heaven.thread-num:12}")
|
||||
private Integer heavenTaskThreadNum;
|
||||
/**
|
||||
* metadata任务
|
||||
*/
|
||||
private FutureNoWaitUtil<Object> metadataTaskThreadPool;
|
||||
|
||||
@Value(value = "${thread-pool.task.heaven.queue-size:1000}")
|
||||
private Integer heavenTaskQueueSize;
|
||||
@Value(value = "${thread-pool.task.metadata.thread-num:27}")
|
||||
private Integer metadataTaskThreadNum;
|
||||
|
||||
@Value(value = "${thread-pool.task.metadata.queue-size:270}")
|
||||
private Integer metadataTaskQueueSize;
|
||||
|
||||
/**
|
||||
* common任务
|
||||
*/
|
||||
private FutureNoWaitUtil<Object> commonTaskThreadPool;
|
||||
|
||||
@Value(value = "${thread-pool.task.common.thread-num:15}")
|
||||
private Integer commonTaskThreadNum;
|
||||
|
||||
@Value(value = "${thread-pool.task.common.queue-size:150}")
|
||||
private Integer commonTaskQueueSize;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
heavenTaskThreadPool = FutureNoWaitUtil.init("heavenTaskThreadPool", heavenTaskThreadNum, heavenTaskThreadNum, heavenTaskQueueSize);
|
||||
metricsTaskThreadPool = FutureNoWaitUtil.init(
|
||||
"metricsTaskThreadPool",
|
||||
metricsTaskThreadNum,
|
||||
metricsTaskThreadNum,
|
||||
metricsTaskQueueSize
|
||||
);
|
||||
|
||||
metadataTaskThreadPool = FutureNoWaitUtil.init(
|
||||
"metadataTaskThreadPool",
|
||||
metadataTaskThreadNum,
|
||||
metadataTaskThreadNum,
|
||||
metadataTaskQueueSize
|
||||
);
|
||||
|
||||
commonTaskThreadPool = FutureNoWaitUtil.init(
|
||||
"commonTaskThreadPool",
|
||||
commonTaskThreadNum,
|
||||
commonTaskThreadNum,
|
||||
commonTaskQueueSize
|
||||
);
|
||||
}
|
||||
|
||||
public void submitHeavenTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
||||
heavenTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||
public void submitMetricsTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
||||
metricsTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||
}
|
||||
|
||||
public void submitMetadataTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
||||
metadataTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||
}
|
||||
|
||||
public void submitCommonTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
|
||||
commonTaskThreadPool.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user