mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-05 13:08:48 +08:00
@@ -41,6 +41,7 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetr
|
|||||||
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
|
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
|
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
|
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
|
||||||
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -296,7 +297,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
|||||||
|
|
||||||
List<ClusterMirrorMakerOverviewVO> mirrorMakerOverviewVOList = this.convert2ClusterMirrorMakerOverviewVO(mirrorMakerList, connectClusterList, latestMetricsResult.getData());
|
List<ClusterMirrorMakerOverviewVO> mirrorMakerOverviewVOList = this.convert2ClusterMirrorMakerOverviewVO(mirrorMakerList, connectClusterList, latestMetricsResult.getData());
|
||||||
|
|
||||||
PaginationResult<ClusterMirrorMakerOverviewVO> voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerOverviewVOList, dto);
|
List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList = this.completeClusterInfo(mirrorMakerOverviewVOList);
|
||||||
|
|
||||||
|
PaginationResult<ClusterMirrorMakerOverviewVO> voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerVOList, dto);
|
||||||
|
|
||||||
if (voPaginationResult.failed()) {
|
if (voPaginationResult.failed()) {
|
||||||
LOGGER.error("method=ClusterMirrorMakerOverviewVO||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult);
|
LOGGER.error("method=ClusterMirrorMakerOverviewVO||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult);
|
||||||
@@ -304,10 +307,6 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
|||||||
return PaginationResult.buildFailure(voPaginationResult, dto);
|
return PaginationResult.buildFailure(voPaginationResult, dto);
|
||||||
}
|
}
|
||||||
|
|
||||||
//这里再补充源集群和目的集群信息,减少网络请求。
|
|
||||||
this.completeClusterInfo(voPaginationResult.getData().getBizData());
|
|
||||||
|
|
||||||
|
|
||||||
// 查询历史指标
|
// 查询历史指标
|
||||||
Result<List<MetricMultiLinesVO>> lineMetricsResult = mirrorMakerMetricService.listMirrorMakerClusterMetricsFromES(
|
Result<List<MetricMultiLinesVO>> lineMetricsResult = mirrorMakerMetricService.listMirrorMakerClusterMetricsFromES(
|
||||||
clusterPhyId,
|
clusterPhyId,
|
||||||
@@ -596,14 +595,31 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
|||||||
return voList;
|
return voList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
|
private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
|
||||||
|
|
||||||
|
Map<String, KSConnectorInfo> connectorInfoMap = new HashMap<>();
|
||||||
|
|
||||||
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
|
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
|
||||||
Result<KSConnectorInfo> connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName());
|
ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()),
|
||||||
if (!connectorInfoRet.hasData()) {
|
3000
|
||||||
|
, () -> {
|
||||||
|
Result<KSConnectorInfo> connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName());
|
||||||
|
if (connectorInfoRet.hasData()) {
|
||||||
|
connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData());
|
||||||
|
}
|
||||||
|
|
||||||
|
return connectorInfoRet.getData();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
ApiCallThreadPoolService.waitResult(1000);
|
||||||
|
|
||||||
|
List<ClusterMirrorMakerOverviewVO> newMirrorMakerVOList = new ArrayList<>();
|
||||||
|
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
|
||||||
|
KSConnectorInfo connectorInfo = connectorInfoMap.get(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName());
|
||||||
|
if (connectorInfo == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
KSConnectorInfo connectorInfo = connectorInfoRet.getData();
|
|
||||||
|
|
||||||
String sourceClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME);
|
String sourceClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME);
|
||||||
String targetClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME);
|
String targetClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME);
|
||||||
@@ -627,6 +643,10 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
newMirrorMakerVOList.add(mirrorMakerVO);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return newMirrorMakerVOList;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,43 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.core.utils;
|
||||||
|
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author wyb
|
||||||
|
* @date 2023/2/22
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class ApiCallThreadPoolService {
|
||||||
|
@Value(value = "${thread-pool.api.thread-num:2}")
|
||||||
|
private Integer threadNum;
|
||||||
|
|
||||||
|
@Value(value = "${thread-pool.api.queue-size:500}")
|
||||||
|
private Integer queueSize;
|
||||||
|
|
||||||
|
private static FutureWaitUtil<Object> apiFutureUtil;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
private void init() {
|
||||||
|
apiFutureUtil = FutureWaitUtil.init(
|
||||||
|
"ApiCallTP",
|
||||||
|
threadNum,
|
||||||
|
threadNum,
|
||||||
|
queueSize
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable<Object> callable) {
|
||||||
|
apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void waitResult(Integer stepWaitTimeUnitMs) {
|
||||||
|
apiFutureUtil.waitResult(stepWaitTimeUnitMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user