diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 63191888..5c0176b1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -648,10 +648,11 @@ public class TopicServiceImpl implements TopicService { List dataList = new ArrayList<>(); int currentSize = dataList.size(); while (dataList.size() < maxMsgNum) { + if (remainingWaitMs <= 0) { + break; + } + try { - if (remainingWaitMs <= 0) { - break; - } ConsumerRecords records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS); for (ConsumerRecord record : records) { String value = (String) record.value(); @@ -661,20 +662,22 @@ public class TopicServiceImpl implements TopicService { : value ); } - // 当前批次一条数据都没拉取到,则结束拉取 - if (dataList.size() - currentSize == 0) { - break; - } - currentSize = dataList.size(); - // 检查是否超时 - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) { - break; - } - remainingWaitMs = maxWaitMs - elapsed; } catch (Exception e) { LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); } + + // 当前批次一条数据都没拉取到,则结束拉取 + if (dataList.size() - currentSize == 0) { + break; + } + currentSize = dataList.size(); + + // 检查是否超时 + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) { + break; + } + remainingWaitMs = maxWaitMs - elapsed; } return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); } @@ -698,14 +701,15 @@ public class TopicServiceImpl implements TopicService { : value ); } - if (System.currentTimeMillis() - timestamp > timeout - || dataList.size() >= maxMsgNum) { - break; - } Thread.sleep(10); } catch (Exception e) { LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); } + + if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) { + // 超时或者是数据已采集足够时, 直接返回 + break; + } } return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); }