采样调整

This commit is contained in:
zengqiao
2022-08-25 20:30:15 +08:00
parent 7f251679fa
commit 5262ae8907
3 changed files with 8 additions and 7 deletions

View File

@@ -142,12 +142,13 @@ public class TopicStateManagerImpl implements TopicStateManager {
// 创建kafka-consumer
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
kafkaConsumer.assign(endOffsetsMapResult.getData().keySet());
for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
}
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时如果这里不减去则可能会导致poll之后超过要求的时间
while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) {
for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
kafkaConsumer.assign(Arrays.asList(entry.getKey()));
kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
@@ -165,7 +166,6 @@ public class TopicStateManagerImpl implements TopicStateManager {
|| voList.size() > dto.getMaxRecords()) {
break;
}
}
}
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));