diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java index 803daa26..99f2747f 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java @@ -48,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -597,7 +598,7 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { private List completeClusterInfo(List mirrorMakerVOList) { - Map connectorInfoMap = new HashMap<>(); + Map connectorInfoMap = new ConcurrentHashMap<>(); for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) { ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()), @@ -607,12 +608,10 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager { if (connectorInfoRet.hasData()) { connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData()); } - - return connectorInfoRet.getData(); }); } - ApiCallThreadPoolService.waitResult(1000); + ApiCallThreadPoolService.waitResult(); List newMirrorMakerVOList = new ArrayList<>(); for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) { diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index c2a4f244..d973ece9 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -47,6 +47,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems; +import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; @@ -60,7 +61,7 @@ import java.util.stream.Collectors; @Component public class TopicStateManagerImpl implements TopicStateManager { - private static final ILog log = LogFactory.getLog(TopicStateManagerImpl.class); + private static final ILog LOGGER = LogFactory.getLog(TopicStateManagerImpl.class); @Autowired private TopicService topicService; @@ -232,26 +233,37 @@ public class TopicStateManagerImpl implements TopicStateManager { @Override public Result> getTopicPartitions(Long clusterPhyId, String topicName, List metricsNames) { + long startTime = System.currentTimeMillis(); + List partitionList = partitionService.listPartitionByTopic(clusterPhyId, topicName); if (ValidateUtils.isEmptyList(partitionList)) { return Result.buildSuc(); } - Result> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames); - if (metricsResult.failed()) { - // 仅打印错误日志,但是不直接返回错误 - log.error( - "method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed", - clusterPhyId, topicName, metricsResult - ); - } - - // 转map Map metricsMap = new HashMap<>(); - if (metricsResult.hasData()) { - for (PartitionMetrics metrics: metricsResult.getData()) { - metricsMap.put(metrics.getPartitionId(), metrics); - } + ApiCallThreadPoolService.runnableTask( + String.format("clusterPhyId=%d||topicName=%s||method=getTopicPartitions", clusterPhyId, topicName), + ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTime), + () -> { + Result> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames); + if (metricsResult.failed()) { + // 仅打印错误日志,但是不直接返回错误 + LOGGER.error( + "method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from kafka failed", + clusterPhyId, topicName, metricsResult + ); + } + + for (PartitionMetrics metrics: metricsResult.getData()) { + metricsMap.put(metrics.getPartitionId(), metrics); + } + } + ); + boolean finished = ApiCallThreadPoolService.waitResultAndReturnFinished(1); + + if (!finished && metricsMap.isEmpty()) { + // 未完成 -> 打印日志 + LOGGER.error("method=getTopicPartitions||clusterPhyId={}||topicName={}||msg=get metrics from kafka failed", clusterPhyId, topicName); } List voList = new ArrayList<>(); @@ -423,7 +435,7 @@ public class TopicStateManagerImpl implements TopicStateManager { return voList; } catch (Exception e) { - log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e); + LOGGER.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e); throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java index cce34c6f..67161657 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/utils/ApiCallThreadPoolService.java @@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import java.util.List; import java.util.concurrent.Callable; /** @@ -21,7 +22,7 @@ public class ApiCallThreadPoolService { @Value(value = "${thread-pool.api.queue-size:500}") private Integer queueSize; - private static FutureWaitUtil apiFutureUtil; + private static FutureWaitUtil apiFutureUtil; @PostConstruct private void init() { @@ -33,7 +34,7 @@ public class ApiCallThreadPoolService { ); } - public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable callable) { + public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable callable) { apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable); } @@ -41,12 +42,13 @@ public class ApiCallThreadPoolService { apiFutureUtil.runnableTask(taskName, timeoutUnisMs, runnable); } - @Deprecated - public static void waitResult(Integer stepWaitTimeUnitMs) { - apiFutureUtil.waitResult(stepWaitTimeUnitMs); - } - public static void waitResult() { apiFutureUtil.waitResult(0); } + + public static boolean waitResultAndReturnFinished(int taskNum) { + List resultList = apiFutureUtil.waitResult(0); + + return resultList != null && resultList.size() == taskNum; + } } \ No newline at end of file diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java index b0371537..c1b79c90 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java @@ -74,7 +74,7 @@ public class TopicStateController { @GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/brokers-partitions-summary") @ResponseBody public Result getTopicBrokersPartitionsSummary(@PathVariable Long clusterPhyId, - @PathVariable String topicName) throws Exception { + @PathVariable String topicName) { return topicStateManager.getTopicBrokersPartitionsSummary(clusterPhyId, topicName); } @@ -83,7 +83,7 @@ public class TopicStateController { @ResponseBody public Result> getTopicPartitions(@PathVariable Long clusterPhyId, @PathVariable String topicName, - @RequestBody List metricsNames) throws Exception { + @RequestBody List metricsNames) { return topicStateManager.getTopicPartitions(clusterPhyId, topicName, metricsNames); }