Merge pull request #332 from didi/dev

修复poll异常时, 超时时间不生效问题
This commit is contained in:
EricZeng
2021-06-23 20:24:50 +08:00
committed by GitHub

View File

@@ -648,10 +648,11 @@ public class TopicServiceImpl implements TopicService {
List<String> dataList = new ArrayList<>();
int currentSize = dataList.size();
while (dataList.size() < maxMsgNum) {
if (remainingWaitMs <= 0) {
break;
}
try {
if (remainingWaitMs <= 0) {
break;
}
ConsumerRecords<String, String> records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS);
for (ConsumerRecord record : records) {
String value = (String) record.value();
@@ -661,20 +662,22 @@ public class TopicServiceImpl implements TopicService {
: value
);
}
// 当前批次一条数据都没拉取到,则结束拉取
if (dataList.size() - currentSize == 0) {
break;
}
currentSize = dataList.size();
// 检查是否超时
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) {
break;
}
remainingWaitMs = maxWaitMs - elapsed;
} catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}
// 当前批次一条数据都没拉取到,则结束拉取
if (dataList.size() - currentSize == 0) {
break;
}
currentSize = dataList.size();
// 检查是否超时
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) {
break;
}
remainingWaitMs = maxWaitMs - elapsed;
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}
@@ -698,14 +701,15 @@ public class TopicServiceImpl implements TopicService {
: value
);
}
if (System.currentTimeMillis() - timestamp > timeout
|| dataList.size() >= maxMsgNum) {
break;
}
Thread.sleep(10);
} catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}
if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) {
// 超时或者是数据已采集足够时, 直接返回
break;
}
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}