diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java index 62354118..803daa26 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java @@ -41,6 +41,7 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetr import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; +import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService; import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -296,7 +297,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { List mirrorMakerOverviewVOList = this.convert2ClusterMirrorMakerOverviewVO(mirrorMakerList, connectClusterList, latestMetricsResult.getData()); - PaginationResult voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerOverviewVOList, dto); + List mirrorMakerVOList = this.completeClusterInfo(mirrorMakerOverviewVOList); + + PaginationResult voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerVOList, dto); if (voPaginationResult.failed()) { LOGGER.error("method=ClusterMirrorMakerOverviewVO||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult); @@ -304,10 +307,6 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { return PaginationResult.buildFailure(voPaginationResult, dto); } - //这里再补充源集群和目的集群信息,减少网络请求。 - this.completeClusterInfo(voPaginationResult.getData().getBizData()); - - // 查询历史指标 Result> lineMetricsResult = mirrorMakerMetricService.listMirrorMakerClusterMetricsFromES( clusterPhyId, @@ -596,14 +595,31 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { return voList; } - private void completeClusterInfo(List mirrorMakerVOList) { + private List completeClusterInfo(List mirrorMakerVOList) { + + Map connectorInfoMap = new HashMap<>(); for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) { - Result connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()); - if (!connectorInfoRet.hasData()) { + ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()), + 3000 + , () -> { + Result connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()); + if (connectorInfoRet.hasData()) { + connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData()); + } + + return connectorInfoRet.getData(); + }); + } + + ApiCallThreadPoolService.waitResult(1000); + + List newMirrorMakerVOList = new ArrayList<>(); + for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) { + KSConnectorInfo connectorInfo = connectorInfoMap.get(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName()); + if (connectorInfo == null) { continue; } - KSConnectorInfo connectorInfo = connectorInfoRet.getData(); String sourceClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME); String targetClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME); @@ -627,6 +643,10 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { } } + newMirrorMakerVOList.add(mirrorMakerVO); + } + + return newMirrorMakerVOList; } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java new file mode 100644 index 00000000..e66b4aa5 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java @@ -0,0 +1,43 @@ +package com.xiaojukeji.know.streaming.km.core.utils; + +import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil; +import lombok.NoArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.concurrent.Callable; + +/** + * @author wyb + * @date 2023/2/22 + */ +@Service +@NoArgsConstructor +public class ApiCallThreadPoolService { + @Value(value = "${thread-pool.api.thread-num:2}") + private Integer threadNum; + + @Value(value = "${thread-pool.api.queue-size:500}") + private Integer queueSize; + + private static FutureWaitUtil apiFutureUtil; + + @PostConstruct + private void init() { + apiFutureUtil = FutureWaitUtil.init( + "ApiCallTP", + threadNum, + threadNum, + queueSize + ); + } + + public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable callable) { + apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable); + } + + public static void waitResult(Integer stepWaitTimeUnitMs) { + apiFutureUtil.waitResult(stepWaitTimeUnitMs); + } +} \ No newline at end of file