加快添加集群后的信息获取的速度

This commit is contained in:
zengqiao
2022-09-08 14:21:26 +08:00
parent 703d685d59
commit de977a5b32
3 changed files with 95 additions and 0 deletions

View File

@@ -0,0 +1,15 @@
package com.xiaojukeji.know.streaming.km.common.bean.event.cluster;
import lombok.Getter;
/**
* 集群新增事件
* @author zengqiao
* @date 22/02/25
*/
@Getter
public class ClusterPhyAddedEvent extends ClusterPhyBaseEvent {
public ClusterPhyAddedEvent(Object source, Long clusterPhyId) {
super(source, clusterPhyId);
}
}

View File

@@ -7,7 +7,9 @@ import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO; import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
@@ -106,6 +108,8 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator); log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator);
// 发布添加集群事件
SpringTool.publish(new ClusterPhyAddedEvent(this, clusterPhyPO.getId()));
return clusterPhyPO.getId(); return clusterPhyPO.getId();
} catch (DuplicateKeyException dke) { } catch (DuplicateKeyException dke) {
log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator); log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator);

View File

@@ -0,0 +1,76 @@
package com.xiaojukeji.know.streaming.km.task.service.listener;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import com.xiaojukeji.know.streaming.km.task.metadata.AbstractAsyncMetadataDispatchTask;
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);
@Override
public void onApplicationEvent(ClusterPhyAddedEvent event) {
LOGGER.info("class=TaskClusterAddedListener||method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
Long now = System.currentTimeMillis();
// 交由KS自定义的线程池异步执行任务
FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
}
private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
ClusterPhy tempClusterPhy = null;
// 120秒内无加载进来则直接返回退出
while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
if (tempClusterPhy != null) {
break;
}
BackoffUtils.backoff(1000);
}
if (tempClusterPhy == null) {
return;
}
// 获取到之后再延迟5秒保证相关的集群都被正常加载进来这里的5秒不固定
BackoffUtils.backoff(5000);
final ClusterPhy clusterPhy = tempClusterPhy;
// 集群执行集群元信息同步
List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
try {
dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
} catch (Exception e) {
// ignore
}
}
// 再延迟5秒保证集群元信息都已被正常同步至DB这里的5秒不固定
BackoffUtils.backoff(5000);
// 集群集群指标采集
List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
try {
dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
} catch (Exception e) {
// ignore
}
}
}
}