mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Optimize]Topic-Partitions增加主动超时功能 (#1076)
问题: leader=-1的分区获取offset信息时,耗时时间过久会导致前端超时,进而整个页面的数据都获取不到; 解决: 后端主动在前端超时前,对一些请求进行超时,避免导致所有的信息都没有返回给前端;
This commit is contained in:
@@ -48,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -597,7 +598,7 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
|
||||
private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
|
||||
|
||||
Map<String, KSConnectorInfo> connectorInfoMap = new HashMap<>();
|
||||
Map<String, KSConnectorInfo> connectorInfoMap = new ConcurrentHashMap<>();
|
||||
|
||||
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
|
||||
ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()),
|
||||
@@ -607,12 +608,10 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
|
||||
if (connectorInfoRet.hasData()) {
|
||||
connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData());
|
||||
}
|
||||
|
||||
return connectorInfoRet.getData();
|
||||
});
|
||||
}
|
||||
|
||||
ApiCallThreadPoolService.waitResult(1000);
|
||||
ApiCallThreadPoolService.waitResult();
|
||||
|
||||
List<ClusterMirrorMakerOverviewVO> newMirrorMakerVOList = new ArrayList<>();
|
||||
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
|
||||
|
||||
@@ -47,6 +47,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
|
||||
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.config.TopicConfig;
|
||||
@@ -60,7 +61,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
public class TopicStateManagerImpl implements TopicStateManager {
|
||||
private static final ILog log = LogFactory.getLog(TopicStateManagerImpl.class);
|
||||
private static final ILog LOGGER = LogFactory.getLog(TopicStateManagerImpl.class);
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
@@ -232,26 +233,37 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
|
||||
@Override
|
||||
public Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, String topicName, List<String> metricsNames) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
List<Partition> partitionList = partitionService.listPartitionByTopic(clusterPhyId, topicName);
|
||||
if (ValidateUtils.isEmptyList(partitionList)) {
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
|
||||
if (metricsResult.failed()) {
|
||||
// 仅打印错误日志,但是不直接返回错误
|
||||
log.error(
|
||||
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed",
|
||||
clusterPhyId, topicName, metricsResult
|
||||
);
|
||||
}
|
||||
|
||||
// 转map
|
||||
Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();
|
||||
if (metricsResult.hasData()) {
|
||||
for (PartitionMetrics metrics: metricsResult.getData()) {
|
||||
metricsMap.put(metrics.getPartitionId(), metrics);
|
||||
}
|
||||
ApiCallThreadPoolService.runnableTask(
|
||||
String.format("clusterPhyId=%d||topicName=%s||method=getTopicPartitions", clusterPhyId, topicName),
|
||||
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTime),
|
||||
() -> {
|
||||
Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
|
||||
if (metricsResult.failed()) {
|
||||
// 仅打印错误日志,但是不直接返回错误
|
||||
LOGGER.error(
|
||||
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from kafka failed",
|
||||
clusterPhyId, topicName, metricsResult
|
||||
);
|
||||
}
|
||||
|
||||
for (PartitionMetrics metrics: metricsResult.getData()) {
|
||||
metricsMap.put(metrics.getPartitionId(), metrics);
|
||||
}
|
||||
}
|
||||
);
|
||||
boolean finished = ApiCallThreadPoolService.waitResultAndReturnFinished(1);
|
||||
|
||||
if (!finished && metricsMap.isEmpty()) {
|
||||
// 未完成 -> 打印日志
|
||||
LOGGER.error("method=getTopicPartitions||clusterPhyId={}||topicName={}||msg=get metrics from kafka failed", clusterPhyId, topicName);
|
||||
}
|
||||
|
||||
List<TopicPartitionVO> voList = new ArrayList<>();
|
||||
@@ -423,7 +435,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
|
||||
return voList;
|
||||
} catch (Exception e) {
|
||||
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e);
|
||||
LOGGER.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e);
|
||||
|
||||
throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
@@ -21,7 +22,7 @@ public class ApiCallThreadPoolService {
|
||||
@Value(value = "${thread-pool.api.queue-size:500}")
|
||||
private Integer queueSize;
|
||||
|
||||
private static FutureWaitUtil<Object> apiFutureUtil;
|
||||
private static FutureWaitUtil<Boolean> apiFutureUtil;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
@@ -33,7 +34,7 @@ public class ApiCallThreadPoolService {
|
||||
);
|
||||
}
|
||||
|
||||
public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable<Object> callable) {
|
||||
public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable<Boolean> callable) {
|
||||
apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable);
|
||||
}
|
||||
|
||||
@@ -41,12 +42,13 @@ public class ApiCallThreadPoolService {
|
||||
apiFutureUtil.runnableTask(taskName, timeoutUnisMs, runnable);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static void waitResult(Integer stepWaitTimeUnitMs) {
|
||||
apiFutureUtil.waitResult(stepWaitTimeUnitMs);
|
||||
}
|
||||
|
||||
public static void waitResult() {
|
||||
apiFutureUtil.waitResult(0);
|
||||
}
|
||||
|
||||
public static boolean waitResultAndReturnFinished(int taskNum) {
|
||||
List<Boolean> resultList = apiFutureUtil.waitResult(0);
|
||||
|
||||
return resultList != null && resultList.size() == taskNum;
|
||||
}
|
||||
}
|
||||
@@ -74,7 +74,7 @@ public class TopicStateController {
|
||||
@GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/brokers-partitions-summary")
|
||||
@ResponseBody
|
||||
public Result<TopicBrokersPartitionsSummaryVO> getTopicBrokersPartitionsSummary(@PathVariable Long clusterPhyId,
|
||||
@PathVariable String topicName) throws Exception {
|
||||
@PathVariable String topicName) {
|
||||
return topicStateManager.getTopicBrokersPartitionsSummary(clusterPhyId, topicName);
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ public class TopicStateController {
|
||||
@ResponseBody
|
||||
public Result<List<TopicPartitionVO>> getTopicPartitions(@PathVariable Long clusterPhyId,
|
||||
@PathVariable String topicName,
|
||||
@RequestBody List<String> metricsNames) throws Exception {
|
||||
@RequestBody List<String> metricsNames) {
|
||||
return topicStateManager.getTopicPartitions(clusterPhyId, topicName, metricsNames);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user