From 8e50d145d57fb054b61af49d31c033119529e069 Mon Sep 17 00:00:00 2001 From: superspeedone <525390802@qq.com> Date: Wed, 7 Sep 2022 11:17:59 +0800 Subject: [PATCH] =?UTF-8?q?Topic=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81Timestamp=E6=8E=92=E5=BA=8F=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=9F=A5=E8=AF=A2=E6=9C=80=E6=96=B0=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=88=96=E6=9C=80=E6=97=A9=E6=B6=88=E6=81=AF=20#534?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/topic/TopicStateManager.java | 3 ++- .../km/biz/topic/impl/TopicStateManagerImpl.java | 16 +++++++--------- .../km/common/bean/dto/topic/TopicRecordDTO.java | 7 ++----- .../streaming/km/common/constant/Constant.java | 12 ------------ .../src/pages/TopicDetail/Messages.tsx | 6 +++--- .../rest/api/v3/topic/TopicStateController.java | 6 ++++-- 6 files changed, 18 insertions(+), 32 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java index 0e8436d9..e467253c 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java @@ -1,5 +1,6 @@ package com.xiaojukeji.know.streaming.km.biz.topic; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; @@ -15,7 +16,7 @@ import java.util.List; public interface TopicStateManager { TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException; - Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException; + Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException; Result getTopicState(Long clusterPhyId, String topicName); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 59e91b1c..092fb571 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; @@ -22,17 +23,18 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; -import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; +import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; -import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; @@ -121,7 +123,7 @@ public class TopicStateManagerImpl implements TopicStateManager { } @Override - public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException { + public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException { long startTime = System.currentTimeMillis(); // 获取集群 @@ -162,7 +164,7 @@ public class TopicStateManagerImpl implements TopicStateManager { maxMessage = Math.min(maxMessage, dto.getMaxRecords()); kafkaConsumer.assign(partitionList); for (TopicPartition partition : partitionList) { - if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) { + if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); } else { kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); @@ -192,11 +194,7 @@ public class TopicStateManagerImpl implements TopicStateManager { // 排序 if (ObjectUtils.isNotEmpty(voList)) { - if (Constant.ASC.equals(dto.getSortType())) { - voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs())); - } else if (Constant.DESC.equals(dto.getSortType())) { - voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs())); - } + PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField()); } return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index 972e2492..0a83ef01 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -35,10 +35,7 @@ public class TopicRecordDTO extends BaseDTO { @ApiModelProperty(value = "预览超时时间", example = "10000") private Long pullTimeoutUnitMs = 8000L; - @ApiModelProperty(value = "排序", example = "desc") - private String sortType; - - @ApiModelProperty(value = "offset", example = "latest") - private String filterOffsetReset; + @ApiModelProperty(value = "offset", example = "") + private Integer filterOffsetReset = 0; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index 2ef958b0..36575938 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -65,16 +65,4 @@ public class Constant { public static final Integer DEFAULT_RETRY_TIME = 3; - /** - * 排序 - */ - public static final String ASC = "asc"; - public static final String DESC = "desc"; - - /** - * 消费策略 - */ - public static final String LATEST = "latest"; - public static final String EARLIEST = "earliest"; - } diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx index 4e7fa8ff..220781f0 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx @@ -10,7 +10,7 @@ const defaultParams: any = { maxRecords: 100, pullTimeoutUnitMs: 5000, // filterPartitionId: 1, - filterOffsetReset: 'latest' + filterOffsetReset: 0 }; const defaultpaPagination = { current: 1, @@ -32,8 +32,8 @@ const TopicMessages = (props: any) => { // 获取消息开始位置 const offsetResetList = [ - { 'label': 'latest', value: 'latest' }, - { 'label': 'earliest', value: 'earliest' } + { 'label': 'latest', value: '0' }, + { 'label': 'earliest', value: '1' } ]; // 默认排序 diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java index 52a279e9..808c7a32 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; @@ -91,8 +92,9 @@ public class TopicStateController { @ResponseBody public Result> getTopicMessages(@PathVariable Long clusterPhyId, @PathVariable String topicName, - @Validated @RequestBody TopicRecordDTO dto) throws Exception { - return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto); + @Validated @RequestBody TopicRecordDTO dto, + @Validated PaginationSortDTO sortDto) throws Exception { + return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto, sortDto); } @ApiOperation(value = "Topic-ACL信息", notes = "")