From c46c35b2486f1acfb28164400a4918cda1a44fe2 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 23 Jun 2021 10:11:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dpoll=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E6=97=B6,=20=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E4=B8=8D?= =?UTF-8?q?=E7=94=9F=E6=95=88=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/TopicServiceImpl.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 63191888..5c0176b1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -648,10 +648,11 @@ public class TopicServiceImpl implements TopicService { List dataList = new ArrayList<>(); int currentSize = dataList.size(); while (dataList.size() < maxMsgNum) { + if (remainingWaitMs <= 0) { + break; + } + try { - if (remainingWaitMs <= 0) { - break; - } ConsumerRecords records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS); for (ConsumerRecord record : records) { String value = (String) record.value(); @@ -661,20 +662,22 @@ public class TopicServiceImpl implements TopicService { : value ); } - // 当前批次一条数据都没拉取到,则结束拉取 - if (dataList.size() - currentSize == 0) { - break; - } - currentSize = dataList.size(); - // 检查是否超时 - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) { - break; - } - remainingWaitMs = maxWaitMs - elapsed; } catch (Exception e) { LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); } + + // 当前批次一条数据都没拉取到,则结束拉取 + if (dataList.size() - currentSize == 0) { + break; + } + currentSize = dataList.size(); + + // 检查是否超时 + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) { + break; + } + remainingWaitMs = maxWaitMs - elapsed; } return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); } @@ -698,14 +701,15 @@ public class TopicServiceImpl implements TopicService { : value ); } - if (System.currentTimeMillis() - timestamp > timeout - || dataList.size() >= maxMsgNum) { - break; - } Thread.sleep(10); } catch (Exception e) { LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); } + + if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) { + // 超时或者是数据已采集足够时, 直接返回 + break; + } } return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); }