diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/FutureTaskDelayQueueData.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/FutureTaskDelayQueueData.java new file mode 100644 index 00000000..0a96d3fe --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/FutureTaskDelayQueueData.java @@ -0,0 +1,40 @@ +package com.xiaojukeji.kafka.manager.common.entity.ao.common; + +import lombok.Getter; + +import java.util.concurrent.Delayed; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +@Getter +public class FutureTaskDelayQueueData implements Delayed { + private final String taskName; + + private final Future futureTask; + + private final long timeoutTimeUnitMs; + + private final long createTimeUnitMs; + + public FutureTaskDelayQueueData(String taskName, Future futureTask, long timeoutTimeUnitMs) { + this.taskName = taskName; + this.futureTask = futureTask; + this.timeoutTimeUnitMs = timeoutTimeUnitMs; + this.createTimeUnitMs = System.currentTimeMillis(); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(timeoutTimeUnitMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed delayed) { + FutureTaskDelayQueueData other = (FutureTaskDelayQueueData) delayed; + if (this.timeoutTimeUnitMs == other.timeoutTimeUnitMs) { + return (this.timeoutTimeUnitMs + "_" + this.createTimeUnitMs).compareTo((other.timeoutTimeUnitMs + "_" + other.createTimeUnitMs)); + } + + return (this.timeoutTimeUnitMs - other.timeoutTimeUnitMs) <= 0 ? -1: 1; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/FutureUtil.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/FutureUtil.java new file mode 100644 index 00000000..b061ebed --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/FutureUtil.java @@ -0,0 +1,150 @@ +package com.xiaojukeji.kafka.manager.common.utils; + +import com.xiaojukeji.kafka.manager.common.entity.ao.common.FutureTaskDelayQueueData; +import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +/** + * Future工具类 + */ +public class FutureUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(FutureUtil.class); + + private ThreadPoolExecutor executor; + + private Map>> futuresMap; + + private FutureUtil() { + } + + public static FutureUtil init(String name, int corePoolSize, int maxPoolSize, int queueSize) { + FutureUtil futureUtil = new FutureUtil<>(); + + futureUtil.executor = new ThreadPoolExecutor( + corePoolSize, + maxPoolSize, + 3000, + TimeUnit.MILLISECONDS, + new LinkedBlockingDeque<>(queueSize), + new DefaultThreadFactory("KM-FutureUtil-" + name), + new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。 + ); + + futureUtil.futuresMap = new ConcurrentHashMap<>(); + return futureUtil; + } + + /** + * 必须配合 waitExecute使用 否则容易会撑爆内存 + */ + public FutureUtil runnableTask(String taskName, Integer timeoutUnisMs, Callable callable) { + Long currentThreadId = Thread.currentThread().getId(); + + futuresMap.putIfAbsent(currentThreadId, new DelayQueue<>()); + + DelayQueue> delayQueueData = futuresMap.get(currentThreadId); + + delayQueueData.put(new FutureTaskDelayQueueData<>(taskName, executor.submit(callable), timeoutUnisMs + System.currentTimeMillis())); + + return this; + } + + public FutureUtil runnableTask(String taskName, Integer timeoutUnisMs, Runnable runnable) { + Long currentThreadId = Thread.currentThread().getId(); + + futuresMap.putIfAbsent(currentThreadId, new DelayQueue<>()); + + DelayQueue> delayQueueData = futuresMap.get(currentThreadId); + + delayQueueData.put(new FutureTaskDelayQueueData(taskName, (Future) executor.submit(runnable), timeoutUnisMs + System.currentTimeMillis())); + + return this; + } + + public void waitExecute() { + this.waitResult(); + } + + public void waitExecute(Integer stepWaitTimeUnitMs) { + this.waitResult(stepWaitTimeUnitMs); + } + + public List waitResult() { + return waitResult(null); + } + + /** + * 等待结果 + * @param stepWaitTimeUnitMs 超时时间达到后,没有完成时,继续等待的时间 + */ + public List waitResult(Integer stepWaitTimeUnitMs) { + Long currentThreadId = Thread.currentThread().getId(); + + DelayQueue> delayQueueData = futuresMap.remove(currentThreadId); + if(delayQueueData == null || delayQueueData.isEmpty()) { + return new ArrayList<>(); + } + + List resultList = new ArrayList<>(); + while (!delayQueueData.isEmpty()) { + try { + // 不进行阻塞,直接获取第一个任务 + FutureTaskDelayQueueData queueData = delayQueueData.peek(); + if (queueData.getFutureTask().isDone()) { + // 如果第一个已经完成了,则移除掉第一个,然后获取其result + delayQueueData.remove(queueData); + resultList.add(queueData.getFutureTask().get()); + continue; + } + + // 如果第一个未完成,则阻塞10ms,判断是否达到超时时间了。 + // 这里的10ms不建议设置较大,因为任务可能在这段时间内完成了,此时如果设置的较大,会导致迟迟不能返回,从而影响接口调用的性能 + queueData = delayQueueData.poll(10, TimeUnit.MILLISECONDS); + if (queueData == null) { + continue; + } + + // 在到达超时时间后,任务没有完成,但是没有完成的原因可能是因为任务一直处于等待状态导致的。 + // 因此这里再给一段补充时间,看这段时间内是否可以完成任务。 + stepWaitResult(queueData, stepWaitTimeUnitMs); + + // 达到超时时间 + if (queueData.getFutureTask().isDone()) { + // 任务已经完成 + resultList.add(queueData.getFutureTask().get()); + continue; + } + + // 达到超时时间,但是任务未完成,则打印日志并强制取消 + LOGGER.error("class=FutureUtil||method=waitExecute||taskName={}||msg=cancel task", queueData.getTaskName()); + + queueData.getFutureTask().cancel(true); + } catch (Exception e) { + LOGGER.error("class=FutureUtil||method=waitExecute||msg=exception", e); + } + } + + return resultList; + } + + private T stepWaitResult(FutureTaskDelayQueueData queueData, Integer stepWaitTimeUnitMs) { + if (stepWaitTimeUnitMs == null) { + return null; + } + + try { + return queueData.getFutureTask().get(stepWaitTimeUnitMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + // 达到超时时间,但是任务未完成,则打印日志并强制取消 + LOGGER.error("class=FutureUtil||method=stepWaitResult||taskName={}||errMsg=exception", queueData.getTaskName(), e); + } + + return null; + } +}