diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java new file mode 100644 index 00000000..67aa707e --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/ClusterPhyAddedEvent.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.cluster; + +import lombok.Getter; + +/** + * 集群新增事件 + * @author zengqiao + * @date 22/02/25 + */ +@Getter +public class ClusterPhyAddedEvent extends ClusterPhyBaseEvent { + public ClusterPhyAddedEvent(Object source, Long clusterPhyId) { + super(source, clusterPhyId); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java index 2ba13738..43a0557c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java @@ -7,7 +7,9 @@ import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; @@ -106,6 +108,8 @@ public class ClusterPhyServiceImpl implements ClusterPhyService { log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator); + // 发布添加集群事件 + SpringTool.publish(new ClusterPhyAddedEvent(this, clusterPhyPO.getId())); return clusterPhyPO.getId(); } catch (DuplicateKeyException dke) { log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java new file mode 100644 index 00000000..b0886754 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java @@ -0,0 +1,76 @@ +package com.xiaojukeji.know.streaming.km.task.service.listener; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; +import com.xiaojukeji.know.streaming.km.task.metadata.AbstractAsyncMetadataDispatchTask; +import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; + +@Service +public class TaskClusterAddedListener implements ApplicationListener { + private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class); + + @Override + public void onApplicationEvent(ClusterPhyAddedEvent event) { + LOGGER.info("class=TaskClusterAddedListener||method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId()); + Long now = System.currentTimeMillis(); + + // 交由KS自定义的线程池,异步执行任务 + FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now)); + } + + private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) { + ClusterPhy tempClusterPhy = null; + + // 120秒内无加载进来,则直接返回退出 + while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) { + tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); + if (tempClusterPhy != null) { + break; + } + + BackoffUtils.backoff(1000); + } + + if (tempClusterPhy == null) { + return; + } + + // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定 + BackoffUtils.backoff(5000); + final ClusterPhy clusterPhy = tempClusterPhy; + + // 集群执行集群元信息同步 + List metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values()); + for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) { + try { + dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); + } catch (Exception e) { + // ignore + } + } + + // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定 + BackoffUtils.backoff(5000); + + // 集群集群指标采集 + List metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values()); + for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) { + try { + dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); + } catch (Exception e) { + // ignore + } + } + } +}