diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index a0418bb2..59e91b1c 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -36,6 +36,7 @@ import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; +import org.apache.commons.lang3.ObjectUtils; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -161,7 +162,11 @@ public class TopicStateManagerImpl implements TopicStateManager { maxMessage = Math.min(maxMessage, dto.getMaxRecords()); kafkaConsumer.assign(partitionList); for (TopicPartition partition : partitionList) { - kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); + if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) { + kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); + } else { + kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); + } } // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间 @@ -185,6 +190,15 @@ public class TopicStateManagerImpl implements TopicStateManager { } } + // 排序 + if (ObjectUtils.isNotEmpty(voList)) { + if (Constant.ASC.equals(dto.getSortType())) { + voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs())); + } else if (Constant.DESC.equals(dto.getSortType())) { + voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs())); + } + } + return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); } catch (Exception e) { log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index b16f2e52..972e2492 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -34,4 +34,11 @@ public class TopicRecordDTO extends BaseDTO { @ApiModelProperty(value = "预览超时时间", example = "10000") private Long pullTimeoutUnitMs = 8000L; + + @ApiModelProperty(value = "排序", example = "desc") + private String sortType; + + @ApiModelProperty(value = "offset", example = "latest") + private String filterOffsetReset; + } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index fae5db21..2ef958b0 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -64,4 +64,17 @@ public class Constant { public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F; public static final Integer DEFAULT_RETRY_TIME = 3; + + /** + * 排序 + */ + public static final String ASC = "asc"; + public static final String DESC = "desc"; + + /** + * 消费策略 + */ + public static final String LATEST = "latest"; + public static final String EARLIEST = "earliest"; + } diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx index 4771e0c7..4e7fa8ff 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx @@ -10,6 +10,7 @@ const defaultParams: any = { maxRecords: 100, pullTimeoutUnitMs: 5000, // filterPartitionId: 1, + filterOffsetReset: 'latest' }; const defaultpaPagination = { current: 1, @@ -29,6 +30,12 @@ const TopicMessages = (props: any) => { const [pagination, setPagination] = useState(defaultpaPagination); const [form] = Form.useForm(); + // 获取消息开始位置 + const offsetResetList = [ + { 'label': 'latest', value: 'latest' }, + { 'label': 'earliest', value: 'earliest' } + ]; + // 默认排序 const defaultSorter = { sortField: 'timestampUnitMs', @@ -88,7 +95,10 @@ const TopicMessages = (props: any) => { }; const onTableChange = (pagination: any, filters: any, sorter: any) => { + defaultSorter.sortField = sorter.field || ''; + defaultSorter.sortType = sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''; setPagination(pagination); + genData(); // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false; // const sortColumn = sorter.field && toLine(sorter.field); // genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams }); @@ -119,6 +129,15 @@ const TopicMessages = (props: any) => {
+ + { showQueryForm={false} tableProps={{ showHeader: false, - rowKey: 'path', + rowKey: 'offset', loading: loading, columns: getTopicMessagesColmns(), dataSource: data, @@ -169,6 +188,7 @@ const TopicMessages = (props: any) => { bordered: false, onChange: onTableChange, scroll: { x: 'max-content' }, + sortDirections: ['descend', 'ascend', 'default'] }, }} />