修复poll异常时, 超时时间不生效问题

This commit is contained in:
zengqiao
2021-06-23 10:11:38 +08:00
parent 7256db8c4e
commit c46c35b248

View File

@@ -648,10 +648,11 @@ public class TopicServiceImpl implements TopicService {
List<String> dataList = new ArrayList<>(); List<String> dataList = new ArrayList<>();
int currentSize = dataList.size(); int currentSize = dataList.size();
while (dataList.size() < maxMsgNum) { while (dataList.size() < maxMsgNum) {
if (remainingWaitMs <= 0) {
break;
}
try { try {
if (remainingWaitMs <= 0) {
break;
}
ConsumerRecords<String, String> records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS); ConsumerRecords<String, String> records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS);
for (ConsumerRecord record : records) { for (ConsumerRecord record : records) {
String value = (String) record.value(); String value = (String) record.value();
@@ -661,20 +662,22 @@ public class TopicServiceImpl implements TopicService {
: value : value
); );
} }
// 当前批次一条数据都没拉取到,则结束拉取
if (dataList.size() - currentSize == 0) {
break;
}
currentSize = dataList.size();
// 检查是否超时
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) {
break;
}
remainingWaitMs = maxWaitMs - elapsed;
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
} }
// 当前批次一条数据都没拉取到,则结束拉取
if (dataList.size() - currentSize == 0) {
break;
}
currentSize = dataList.size();
// 检查是否超时
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) {
break;
}
remainingWaitMs = maxWaitMs - elapsed;
} }
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
} }
@@ -698,14 +701,15 @@ public class TopicServiceImpl implements TopicService {
: value : value
); );
} }
if (System.currentTimeMillis() - timestamp > timeout
|| dataList.size() >= maxMsgNum) {
break;
}
Thread.sleep(10); Thread.sleep(10);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
} }
if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) {
// 超时或者是数据已采集足够时, 直接返回
break;
}
} }
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
} }