Topic消息查询支持Timestamp排序,接口支持按指定日期查询

This commit is contained in:
superspeedone
2022-09-09 15:45:31 +08:00
parent 8e50d145d5
commit 0e49002f42
6 changed files with 50 additions and 21 deletions

View File

@@ -16,7 +16,7 @@ import java.util.List;
public interface TopicStateManager {
TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException;
Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException;
Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException;
Result<TopicStateVO> getTopicState(Long clusterPhyId, String topicName);

View File

@@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
@@ -40,10 +39,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired;
@@ -123,7 +119,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
}
@Override
public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException {
public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException {
long startTime = System.currentTimeMillis();
// 获取集群
@@ -163,10 +159,29 @@ public class TopicStateManagerImpl implements TopicStateManager {
}
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
kafkaConsumer.assign(partitionList);
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
// 获取指定时间每个分区的offset按指定开始时间查询消息时
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
partitionList.forEach(topicPartition -> {
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
});
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
for (TopicPartition partition : partitionList) {
if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
// 重置到最旧
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
} else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定时间
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
} else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定位置
} else {
// 默认,重置到最新
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
}
}
@@ -194,7 +209,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
// 排序
if (ObjectUtils.isNotEmpty(voList)) {
PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField());
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
}
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));