From de977a5b3296509402928244b8b13eb01986cecd Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 8 Sep 2022 14:21:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=BF=AB=E6=B7=BB=E5=8A=A0=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E5=90=8E=E7=9A=84=E4=BF=A1=E6=81=AF=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E7=9A=84=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../event/cluster/ClusterPhyAddedEvent.java | 15 ++++ .../cluster/impl/ClusterPhyServiceImpl.java | 4 + .../listener/TaskClusterAddedListener.java | 76 +++++++++++++++++++ 3 files changed, 95 insertions(+) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java create mode 100644 km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java new file mode 100644 index 00000000..67aa707e --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.cluster; + +import lombok.Getter; + +/** + * 集群新增事件 + * @author zengqiao + * @date 22/02/25 + */ +@Getter +public class ClusterPhyAddedEvent extends ClusterPhyBaseEvent { + public ClusterPhyAddedEvent(Object source, Long clusterPhyId) { + super(source, clusterPhyId); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java index 2ba13738..43a0557c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java @@ -7,7 +7,9 @@ import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; @@ -106,6 +108,8 @@ public class ClusterPhyServiceImpl implements ClusterPhyService { log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator); + // 发布添加集群事件 + SpringTool.publish(new ClusterPhyAddedEvent(this, clusterPhyPO.getId())); return clusterPhyPO.getId(); } catch (DuplicateKeyException dke) { log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java new file mode 100644 index 00000000..b0886754 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java @@ -0,0 +1,76 @@ +package com.xiaojukeji.know.streaming.km.task.service.listener; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; +import com.xiaojukeji.know.streaming.km.task.metadata.AbstractAsyncMetadataDispatchTask; +import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; + +@Service +public class TaskClusterAddedListener implements ApplicationListener { + private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class); + + @Override + public void onApplicationEvent(ClusterPhyAddedEvent event) { + LOGGER.info("class=TaskClusterAddedListener||method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId()); + Long now = System.currentTimeMillis(); + + // 交由KS自定义的线程池,异步执行任务 + FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now)); + } + + private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) { + ClusterPhy tempClusterPhy = null; + + // 120秒内无加载进来,则直接返回退出 + while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) { + tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); + if (tempClusterPhy != null) { + break; + } + + BackoffUtils.backoff(1000); + } + + if (tempClusterPhy == null) { + return; + } + + // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定 + BackoffUtils.backoff(5000); + final ClusterPhy clusterPhy = tempClusterPhy; + + // 集群执行集群元信息同步 + List metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values()); + for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) { + try { + dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); + } catch (Exception e) { + // ignore + } + } + + // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定 + BackoffUtils.backoff(5000); + + // 集群集群指标采集 + List metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values()); + for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) { + try { + dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); + } catch (Exception e) { + // ignore + } + } + } +}