mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-04 11:52:07 +08:00
@@ -1,5 +1,6 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.topic;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
|
||||
|
||||
@@ -22,25 +22,26 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.admin.OffsetSpec;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.config.TopicConfig;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -160,8 +161,31 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
}
|
||||
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
|
||||
kafkaConsumer.assign(partitionList);
|
||||
|
||||
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
|
||||
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
|
||||
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
||||
partitionList.forEach(topicPartition -> {
|
||||
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
|
||||
});
|
||||
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
|
||||
}
|
||||
|
||||
for (TopicPartition partition : partitionList) {
|
||||
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
|
||||
if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到最旧
|
||||
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
|
||||
} else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到指定时间
|
||||
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
|
||||
} else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
|
||||
// 重置到指定位置
|
||||
|
||||
} else {
|
||||
// 默认,重置到最新
|
||||
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
|
||||
}
|
||||
}
|
||||
|
||||
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
||||
@@ -185,6 +209,15 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
||||
}
|
||||
}
|
||||
|
||||
// 排序
|
||||
if (ObjectUtils.isNotEmpty(voList)) {
|
||||
// 默认按时间倒序排序
|
||||
if (StringUtils.isBlank(dto.getSortType())) {
|
||||
dto.setSortType(SortTypeEnum.DESC.getSortType());
|
||||
}
|
||||
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
|
||||
}
|
||||
|
||||
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
|
||||
} catch (Exception e) {
|
||||
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);
|
||||
|
||||
Reference in New Issue
Block a user