Topic消息查询支持Timestamp排序,支持查询最新消息或最早消息 #534

This commit is contained in:
yanweiwen
2022-09-05 14:46:40 +08:00
parent 47065c8042
commit fa7ad64140
4 changed files with 56 additions and 2 deletions

View File

@@ -36,6 +36,7 @@ import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -161,7 +162,11 @@ public class TopicStateManagerImpl implements TopicStateManager {
maxMessage = Math.min(maxMessage, dto.getMaxRecords()); maxMessage = Math.min(maxMessage, dto.getMaxRecords());
kafkaConsumer.assign(partitionList); kafkaConsumer.assign(partitionList);
for (TopicPartition partition : partitionList) { for (TopicPartition partition : partitionList) {
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) {
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
} else {
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
}
} }
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时如果这里不减去则可能会导致poll之后超过要求的时间 // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时如果这里不减去则可能会导致poll之后超过要求的时间
@@ -185,6 +190,15 @@ public class TopicStateManagerImpl implements TopicStateManager {
} }
} }
// 排序
if (ObjectUtils.isNotEmpty(voList)) {
if (Constant.ASC.equals(dto.getSortType())) {
voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs()));
} else if (Constant.DESC.equals(dto.getSortType())) {
voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs()));
}
}
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
} catch (Exception e) { } catch (Exception e) {
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e); log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);

View File

@@ -34,4 +34,11 @@ public class TopicRecordDTO extends BaseDTO {
@ApiModelProperty(value = "预览超时时间", example = "10000") @ApiModelProperty(value = "预览超时时间", example = "10000")
private Long pullTimeoutUnitMs = 8000L; private Long pullTimeoutUnitMs = 8000L;
@ApiModelProperty(value = "排序", example = "desc")
private String sortType;
@ApiModelProperty(value = "offset", example = "latest")
private String filterOffsetReset;
} }

View File

@@ -64,4 +64,17 @@ public class Constant {
public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F; public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;
public static final Integer DEFAULT_RETRY_TIME = 3; public static final Integer DEFAULT_RETRY_TIME = 3;
/**
* 排序
*/
public static final String ASC = "asc";
public static final String DESC = "desc";
/**
* 消费策略
*/
public static final String LATEST = "latest";
public static final String EARLIEST = "earliest";
} }

View File

@@ -10,6 +10,7 @@ const defaultParams: any = {
maxRecords: 100, maxRecords: 100,
pullTimeoutUnitMs: 5000, pullTimeoutUnitMs: 5000,
// filterPartitionId: 1, // filterPartitionId: 1,
filterOffsetReset: 'latest'
}; };
const defaultpaPagination = { const defaultpaPagination = {
current: 1, current: 1,
@@ -29,6 +30,12 @@ const TopicMessages = (props: any) => {
const [pagination, setPagination] = useState<any>(defaultpaPagination); const [pagination, setPagination] = useState<any>(defaultpaPagination);
const [form] = Form.useForm(); const [form] = Form.useForm();
// 获取消息开始位置
const offsetResetList = [
{ 'label': 'latest', value: 'latest' },
{ 'label': 'earliest', value: 'earliest' }
];
// 默认排序 // 默认排序
const defaultSorter = { const defaultSorter = {
sortField: 'timestampUnitMs', sortField: 'timestampUnitMs',
@@ -88,7 +95,10 @@ const TopicMessages = (props: any) => {
}; };
const onTableChange = (pagination: any, filters: any, sorter: any) => { const onTableChange = (pagination: any, filters: any, sorter: any) => {
defaultSorter.sortField = sorter.field || '';
defaultSorter.sortType = sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : '';
setPagination(pagination); setPagination(pagination);
genData();
// const asc = sorter?.order && sorter?.order === 'ascend' ? true : false; // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false;
// const sortColumn = sorter.field && toLine(sorter.field); // const sortColumn = sorter.field && toLine(sorter.field);
// genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams }); // genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams });
@@ -119,6 +129,15 @@ const TopicMessages = (props: any) => {
</div> </div>
<div className="messages-query"> <div className="messages-query">
<Form form={form} layout="inline" onFinish={onFinish}> <Form form={form} layout="inline" onFinish={onFinish}>
<Form.Item name="filterOffsetReset">
<Select
options={offsetResetList}
size="small"
style={{ width: '120px' }}
className={'detail-table-select'}
placeholder="请选择offset"
/>
</Form.Item>
<Form.Item name="filterPartitionId"> <Form.Item name="filterPartitionId">
<Select <Select
options={partitionIdList} options={partitionIdList}
@@ -158,7 +177,7 @@ const TopicMessages = (props: any) => {
showQueryForm={false} showQueryForm={false}
tableProps={{ tableProps={{
showHeader: false, showHeader: false,
rowKey: 'path', rowKey: 'offset',
loading: loading, loading: loading,
columns: getTopicMessagesColmns(), columns: getTopicMessagesColmns(),
dataSource: data, dataSource: data,
@@ -169,6 +188,7 @@ const TopicMessages = (props: any) => {
bordered: false, bordered: false,
onChange: onTableChange, onChange: onTableChange,
scroll: { x: 'max-content' }, scroll: { x: 'max-content' },
sortDirections: ['descend', 'ascend', 'default']
}, },
}} }}
/> />