From 0e49002f426d102eda69965c9676b1cf7f99b043 Mon Sep 17 00:00:00 2001 From: superspeedone <525390802@qq.com> Date: Fri, 9 Sep 2022 15:45:31 +0800 Subject: [PATCH] =?UTF-8?q?Topic=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81Timestamp=E6=8E=92=E5=BA=8F=EF=BC=8C=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E6=94=AF=E6=8C=81=E6=8C=89=E6=8C=87=E5=AE=9A=E6=97=A5?= =?UTF-8?q?=E6=9C=9F=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/topic/TopicStateManager.java | 2 +- .../biz/topic/impl/TopicStateManagerImpl.java | 29 ++++++++++++++----- .../common/bean/dto/topic/TopicRecordDTO.java | 9 +++++- .../src/pages/TopicDetail/Messages.tsx | 23 ++++++++++----- .../src/pages/TopicDetail/config.tsx | 3 +- .../api/v3/topic/TopicStateController.java | 5 ++-- 6 files changed, 50 insertions(+), 21 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java index e467253c..ec3a3207 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java @@ -16,7 +16,7 @@ import java.util.List; public interface TopicStateManager { TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException; - Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException; + Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException; Result getTopicState(Long clusterPhyId, String topicName); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 092fb571..0fb368e0 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; -import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; @@ -40,10 +39,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; import org.apache.commons.lang3.ObjectUtils; import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.springframework.beans.factory.annotation.Autowired; @@ -123,7 +119,7 @@ public class TopicStateManagerImpl implements TopicStateManager { } @Override - public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException { + public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException { long startTime = System.currentTimeMillis(); // 获取集群 @@ -163,10 +159,29 @@ public class TopicStateManagerImpl implements TopicStateManager { } maxMessage = Math.min(maxMessage, dto.getMaxRecords()); kafkaConsumer.assign(partitionList); + + Map partitionOffsetAndTimestampMap = new HashMap<>(); + // 获取指定时间每个分区的offset(按指定开始时间查询消息时) + if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + Map timestampsToSearch = new HashMap<>(); + partitionList.forEach(topicPartition -> { + timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()); + }); + partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); + } + for (TopicPartition partition : partitionList) { if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { + // 重置到最旧 kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); + } else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + // 重置到指定时间 + kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset()); + } else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { + // 重置到指定位置 + } else { + // 默认,重置到最新 kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); } } @@ -194,7 +209,7 @@ public class TopicStateManagerImpl implements TopicStateManager { // 排序 if (ObjectUtils.isNotEmpty(voList)) { - PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField()); + PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType()); } return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index 0a83ef01..74e5611b 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.topic; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -15,7 +16,7 @@ import javax.validation.constraints.NotNull; @Data @JsonIgnoreProperties(ignoreUnknown = true) @ApiModel(description = "Topic记录") -public class TopicRecordDTO extends BaseDTO { +public class TopicRecordDTO extends PaginationSortDTO { @NotNull(message = "truncate不允许为空") @ApiModelProperty(value = "是否截断", example = "true") private Boolean truncate; @@ -38,4 +39,10 @@ public class TopicRecordDTO extends BaseDTO { @ApiModelProperty(value = "offset", example = "") private Integer filterOffsetReset = 0; + @ApiModelProperty(value = "开始日期时间戳", example = "") + private Long startTimestampUnitMs; + + @ApiModelProperty(value = "结束日期时间戳", example = "") + private Long utilTimestampUnitMs; + } diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx index 220781f0..6862596e 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx @@ -32,8 +32,8 @@ const TopicMessages = (props: any) => { // 获取消息开始位置 const offsetResetList = [ - { 'label': 'latest', value: '0' }, - { 'label': 'earliest', value: '1' } + { 'label': 'latest', value: 0 }, + { 'label': 'earliest', value: 1 } ]; // 默认排序 @@ -42,6 +42,8 @@ const TopicMessages = (props: any) => { sortType: 'desc', }; + const [sorter, setSorter] = useState(defaultSorter); + // 请求接口获取数据 const genData = async () => { if (urlParams?.clusterId === undefined || hashData?.topicName === undefined) return; @@ -56,7 +58,7 @@ const TopicMessages = (props: any) => { }); setPartitionIdList(newPartitionIdList || []); }); - request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...defaultSorter }, method: 'POST' }) + request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...sorter }, method: 'POST' }) .then((res: any) => { // setPagination({ // current: res.pagination?.pageNo, @@ -94,11 +96,16 @@ const TopicMessages = (props: any) => { history.push(`/cluster/${urlParams?.clusterId}/testing/consumer`); }; - const onTableChange = (pagination: any, filters: any, sorter: any) => { - defaultSorter.sortField = sorter.field || ''; - defaultSorter.sortType = sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''; + const onTableChange = (pagination: any, filters: any, sorter: any, extra: any) => { setPagination(pagination); - genData(); + // 只有排序事件时,触发重新请求后端数据 + if(extra.action === 'sort') { + setSorter({ + sortField: sorter.field || '', + sortType: sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : '' + }); + genData(); + } // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false; // const sortColumn = sorter.field && toLine(sorter.field); // genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams }); @@ -106,7 +113,7 @@ const TopicMessages = (props: any) => { useEffect(() => { props.positionType === 'Messages' && genData(); - }, [props, params]); + }, [props, params, sorter]); return ( <> diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx index 5bb75eb0..085ad046 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx @@ -85,7 +85,8 @@ export const getTopicMessagesColmns = () => { title: 'Timestamp', dataIndex: 'timestampUnitMs', key: 'timestampUnitMs', - render: (t: number) => (t ? moment(t).format(timeFormat) : '-'), + sorter: true, + render: (t: number) => (t ? moment(t).format(timeFormat) + '.' + moment(t).millisecond() : '-'), }, { title: 'Key', diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java index 808c7a32..d1e09e66 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java @@ -92,9 +92,8 @@ public class TopicStateController { @ResponseBody public Result> getTopicMessages(@PathVariable Long clusterPhyId, @PathVariable String topicName, - @Validated @RequestBody TopicRecordDTO dto, - @Validated PaginationSortDTO sortDto) throws Exception { - return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto, sortDto); + @Validated @RequestBody TopicRecordDTO dto) throws Exception { + return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto); } @ApiOperation(value = "Topic-ACL信息", notes = "")