mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Optimize]优化线程池的名称(#789)
This commit is contained in:
@@ -237,7 +237,7 @@ public class CollectThreadPoolService {
|
|||||||
private synchronized FutureWaitUtil<Void> closeOldAndCreateNew(Long shardId) {
|
private synchronized FutureWaitUtil<Void> closeOldAndCreateNew(Long shardId) {
|
||||||
// 新的
|
// 新的
|
||||||
FutureWaitUtil<Void> newFutureUtil = FutureWaitUtil.init(
|
FutureWaitUtil<Void> newFutureUtil = FutureWaitUtil.init(
|
||||||
"CollectorMetricsFutureUtil-Shard-" + shardId,
|
"MetricCollect-Shard-" + shardId,
|
||||||
this.futureUtilThreadNum,
|
this.futureUtilThreadNum,
|
||||||
this.futureUtilThreadNum,
|
this.futureUtilThreadNum,
|
||||||
this.futureUtilQueueSize
|
this.futureUtilQueueSize
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ public class FutureNoWaitUtil<T> {
|
|||||||
private FutureNoWaitUtil() {
|
private FutureNoWaitUtil() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> FutureNoWaitUtil<T> init(String name, int corePoolSize, int maxPoolSize, int queueSize) {
|
public static <T> FutureNoWaitUtil<T> init(String threadPoolName, int corePoolSize, int maxPoolSize, int queueSize) {
|
||||||
FutureNoWaitUtil<T> futureUtil = new FutureNoWaitUtil<>();
|
FutureNoWaitUtil<T> futureUtil = new FutureNoWaitUtil<>();
|
||||||
|
|
||||||
// 创建任务线程池
|
// 创建任务线程池
|
||||||
@@ -33,7 +33,7 @@ public class FutureNoWaitUtil<T> {
|
|||||||
300,
|
300,
|
||||||
TimeUnit.SECONDS,
|
TimeUnit.SECONDS,
|
||||||
new LinkedBlockingDeque<>(queueSize),
|
new LinkedBlockingDeque<>(queueSize),
|
||||||
new NamedThreadFactory("KS-KM-FutureNoWaitUtil-" + name),
|
new NamedThreadFactory(threadPoolName),
|
||||||
new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
|
new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
|
||||||
);
|
);
|
||||||
futureUtil.executor.allowCoreThreadTimeOut(true);
|
futureUtil.executor.allowCoreThreadTimeOut(true);
|
||||||
@@ -41,7 +41,7 @@ public class FutureNoWaitUtil<T> {
|
|||||||
futureUtil.delayQueueData = new DelayQueue<>();
|
futureUtil.delayQueueData = new DelayQueue<>();
|
||||||
|
|
||||||
// 创建检查延迟队列的线程并启动
|
// 创建检查延迟队列的线程并启动
|
||||||
futureUtil.checkDelayQueueThread = new Thread(() -> futureUtil.runCheck(), "KS-KM-FutureNoWaitUtil-CheckDelayQueueData-" + name);
|
futureUtil.checkDelayQueueThread = new Thread(() -> futureUtil.runCheck(), threadPoolName + "-CheckTaskTimeout");
|
||||||
futureUtil.checkDelayQueueThread.setDaemon(true);
|
futureUtil.checkDelayQueueThread.setDaemon(true);
|
||||||
futureUtil.checkDelayQueueThread.start();
|
futureUtil.checkDelayQueueThread.start();
|
||||||
|
|
||||||
|
|||||||
@@ -9,12 +9,12 @@ import java.util.concurrent.*;
|
|||||||
public class FutureUtil<T> {
|
public class FutureUtil<T> {
|
||||||
private ThreadPoolExecutor executor;
|
private ThreadPoolExecutor executor;
|
||||||
|
|
||||||
public static final FutureUtil<Void> quickStartupFutureUtil = FutureUtil.init("QuickStartupFutureUtil", 8, 8, 10240);
|
public static final FutureUtil<Void> quickStartupFutureUtil = FutureUtil.init("QuickStartupTP", 8, 8, 10240);
|
||||||
|
|
||||||
private FutureUtil() {
|
private FutureUtil() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> FutureUtil<T> init(String name, int corePoolSize, int maxPoolSize, int queueSize) {
|
public static <T> FutureUtil<T> init(String threadPoolName, int corePoolSize, int maxPoolSize, int queueSize) {
|
||||||
FutureUtil<T> futureUtil = new FutureUtil<>();
|
FutureUtil<T> futureUtil = new FutureUtil<>();
|
||||||
|
|
||||||
futureUtil.executor = new ThreadPoolExecutor(
|
futureUtil.executor = new ThreadPoolExecutor(
|
||||||
@@ -23,7 +23,7 @@ public class FutureUtil<T> {
|
|||||||
300,
|
300,
|
||||||
TimeUnit.SECONDS,
|
TimeUnit.SECONDS,
|
||||||
new LinkedBlockingDeque<>(queueSize),
|
new LinkedBlockingDeque<>(queueSize),
|
||||||
new NamedThreadFactory("FutureUtil-" + name),
|
new NamedThreadFactory(threadPoolName),
|
||||||
new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
|
new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
|
||||||
);
|
);
|
||||||
futureUtil.executor.allowCoreThreadTimeOut(true);
|
futureUtil.executor.allowCoreThreadTimeOut(true);
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ public class FutureWaitUtil<T> {
|
|||||||
private FutureWaitUtil() {
|
private FutureWaitUtil() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> FutureWaitUtil<T> init(String name, int corePoolSize, int maxPoolSize, int queueSize) {
|
public static <T> FutureWaitUtil<T> init(String threadPoolName, int corePoolSize, int maxPoolSize, int queueSize) {
|
||||||
FutureWaitUtil<T> futureUtil = new FutureWaitUtil<>();
|
FutureWaitUtil<T> futureUtil = new FutureWaitUtil<>();
|
||||||
|
|
||||||
futureUtil.executor = new ThreadPoolExecutor(
|
futureUtil.executor = new ThreadPoolExecutor(
|
||||||
@@ -32,7 +32,7 @@ public class FutureWaitUtil<T> {
|
|||||||
300,
|
300,
|
||||||
TimeUnit.SECONDS,
|
TimeUnit.SECONDS,
|
||||||
new LinkedBlockingDeque<>(queueSize),
|
new LinkedBlockingDeque<>(queueSize),
|
||||||
new NamedThreadFactory("FutureWaitUtil-" + name),
|
new NamedThreadFactory(threadPoolName),
|
||||||
new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
|
new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
|
||||||
);
|
);
|
||||||
futureUtil.executor.allowCoreThreadTimeOut(true);
|
futureUtil.executor.allowCoreThreadTimeOut(true);
|
||||||
|
|||||||
@@ -52,21 +52,21 @@ public class TaskThreadPoolService {
|
|||||||
@PostConstruct
|
@PostConstruct
|
||||||
private void init() {
|
private void init() {
|
||||||
metricsTaskThreadPool = FutureNoWaitUtil.init(
|
metricsTaskThreadPool = FutureNoWaitUtil.init(
|
||||||
"metricsTaskThreadPool",
|
"MetricsTaskTP",
|
||||||
metricsTaskThreadNum,
|
metricsTaskThreadNum,
|
||||||
metricsTaskThreadNum,
|
metricsTaskThreadNum,
|
||||||
metricsTaskQueueSize
|
metricsTaskQueueSize
|
||||||
);
|
);
|
||||||
|
|
||||||
metadataTaskThreadPool = FutureNoWaitUtil.init(
|
metadataTaskThreadPool = FutureNoWaitUtil.init(
|
||||||
"metadataTaskThreadPool",
|
"MetadataTaskTP",
|
||||||
metadataTaskThreadNum,
|
metadataTaskThreadNum,
|
||||||
metadataTaskThreadNum,
|
metadataTaskThreadNum,
|
||||||
metadataTaskQueueSize
|
metadataTaskQueueSize
|
||||||
);
|
);
|
||||||
|
|
||||||
commonTaskThreadPool = FutureNoWaitUtil.init(
|
commonTaskThreadPool = FutureNoWaitUtil.init(
|
||||||
"commonTaskThreadPool",
|
"CommonTaskTP",
|
||||||
commonTaskThreadNum,
|
commonTaskThreadNum,
|
||||||
commonTaskThreadNum,
|
commonTaskThreadNum,
|
||||||
commonTaskQueueSize
|
commonTaskQueueSize
|
||||||
|
|||||||
Reference in New Issue
Block a user