Topic消息查询支持Timestamp排序,支持查询最新消息或最早消息 #534

This commit is contained in:
superspeedone
2022-09-07 11:17:59 +08:00
parent 0f35427645
commit 8e50d145d5
6 changed files with 18 additions and 32 deletions

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.know.streaming.km.biz.topic; package com.xiaojukeji.know.streaming.km.biz.topic;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
@@ -15,7 +16,7 @@ import java.util.List;
public interface TopicStateManager { public interface TopicStateManager {
TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException; TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException;
Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException; Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException;
Result<TopicStateVO> getTopicState(Long clusterPhyId, String topicName); Result<TopicStateVO> getTopicState(Long clusterPhyId, String topicName);

View File

@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
@@ -22,17 +23,18 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter;
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
@@ -121,7 +123,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
} }
@Override @Override
public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException { public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
// 获取集群 // 获取集群
@@ -162,7 +164,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
maxMessage = Math.min(maxMessage, dto.getMaxRecords()); maxMessage = Math.min(maxMessage, dto.getMaxRecords());
kafkaConsumer.assign(partitionList); kafkaConsumer.assign(partitionList);
for (TopicPartition partition : partitionList) { for (TopicPartition partition : partitionList) {
if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) { if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
} else { } else {
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
@@ -192,11 +194,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
// 排序 // 排序
if (ObjectUtils.isNotEmpty(voList)) { if (ObjectUtils.isNotEmpty(voList)) {
if (Constant.ASC.equals(dto.getSortType())) { PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField());
voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs()));
} else if (Constant.DESC.equals(dto.getSortType())) {
voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs()));
}
} }
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));

View File

@@ -35,10 +35,7 @@ public class TopicRecordDTO extends BaseDTO {
@ApiModelProperty(value = "预览超时时间", example = "10000") @ApiModelProperty(value = "预览超时时间", example = "10000")
private Long pullTimeoutUnitMs = 8000L; private Long pullTimeoutUnitMs = 8000L;
@ApiModelProperty(value = "排序", example = "desc") @ApiModelProperty(value = "offset", example = "")
private String sortType; private Integer filterOffsetReset = 0;
@ApiModelProperty(value = "offset", example = "latest")
private String filterOffsetReset;
} }

View File

@@ -65,16 +65,4 @@ public class Constant {
public static final Integer DEFAULT_RETRY_TIME = 3; public static final Integer DEFAULT_RETRY_TIME = 3;
/**
* 排序
*/
public static final String ASC = "asc";
public static final String DESC = "desc";
/**
* 消费策略
*/
public static final String LATEST = "latest";
public static final String EARLIEST = "earliest";
} }

View File

@@ -10,7 +10,7 @@ const defaultParams: any = {
maxRecords: 100, maxRecords: 100,
pullTimeoutUnitMs: 5000, pullTimeoutUnitMs: 5000,
// filterPartitionId: 1, // filterPartitionId: 1,
filterOffsetReset: 'latest' filterOffsetReset: 0
}; };
const defaultpaPagination = { const defaultpaPagination = {
current: 1, current: 1,
@@ -32,8 +32,8 @@ const TopicMessages = (props: any) => {
// 获取消息开始位置 // 获取消息开始位置
const offsetResetList = [ const offsetResetList = [
{ 'label': 'latest', value: 'latest' }, { 'label': 'latest', value: '0' },
{ 'label': 'earliest', value: 'earliest' } { 'label': 'earliest', value: '1' }
]; ];
// 默认排序 // 默认排序

View File

@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic;
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
@@ -91,8 +92,9 @@ public class TopicStateController {
@ResponseBody @ResponseBody
public Result<List<TopicRecordVO>> getTopicMessages(@PathVariable Long clusterPhyId, public Result<List<TopicRecordVO>> getTopicMessages(@PathVariable Long clusterPhyId,
@PathVariable String topicName, @PathVariable String topicName,
@Validated @RequestBody TopicRecordDTO dto) throws Exception { @Validated @RequestBody TopicRecordDTO dto,
return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto); @Validated PaginationSortDTO sortDto) throws Exception {
return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto, sortDto);
} }
@ApiOperation(value = "Topic-ACL信息", notes = "") @ApiOperation(value = "Topic-ACL信息", notes = "")