mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
采样优化
This commit is contained in:
@@ -129,7 +129,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||
}
|
||||
|
||||
// 获取分区offset
|
||||
// 获取分区beginOffset
|
||||
Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null);
|
||||
if (beginOffsetsMapResult.failed()) {
|
||||
return Result.buildFromIgnoreData(beginOffsetsMapResult);
|
||||
}
|
||||
// 获取分区endOffset
|
||||
Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
|
||||
if (endOffsetsMapResult.failed()) {
|
||||
return Result.buildFromIgnoreData(endOffsetsMapResult);
|
||||
@@ -142,13 +147,25 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
// 创建kafka-consumer
|
||||
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
|
||||
|
||||
kafkaConsumer.assign(endOffsetsMapResult.getData().keySet());
|
||||
for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
|
||||
kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
|
||||
List<TopicPartition> partitionList = new ArrayList<>();
|
||||
long maxMessage = 0;
|
||||
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
|
||||
long begin = beginOffsetsMapResult.getData().get(entry.getKey());
|
||||
long end = entry.getValue();
|
||||
if (begin == end){
|
||||
continue;
|
||||
}
|
||||
maxMessage += end - begin;
|
||||
partitionList.add(entry.getKey());
|
||||
}
|
||||
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
|
||||
kafkaConsumer.assign(partitionList);
|
||||
for (TopicPartition partition : partitionList) {
|
||||
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
|
||||
}
|
||||
|
||||
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
||||
while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) {
|
||||
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
|
||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
||||
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
|
||||
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
|
||||
|
||||
Reference in New Issue
Block a user