diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index 15aafdb5..1095d5ee 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedD import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; -import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; @@ -199,12 +199,12 @@ public class GroupManagerImpl implements GroupManager { return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(dto.getClusterId(), dto.getTopicName())); } - if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType() + if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType() && ValidateUtils.isEmptyList(dto.getOffsetList())) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定offset重置需传offset信息"); } - if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType() + if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType() && ValidateUtils.isNull(dto.getTimestamp())) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定时间重置需传时间信息"); } @@ -213,7 +213,7 @@ public class GroupManagerImpl implements GroupManager { } private Result> getPartitionOffset(GroupOffsetResetDTO dto) { - if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) { + if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) { return Result.buildSuc(dto.getOffsetList().stream().collect(Collectors.toMap( elem -> new TopicPartition(dto.getTopicName(), elem.getPartitionId()), PartitionOffsetDTO::getOffset, @@ -222,9 +222,9 @@ public class GroupManagerImpl implements GroupManager { } OffsetSpec offsetSpec = null; - if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) { + if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) { offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp()); - } else if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getResetType()) { + } else if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getResetType()) { offsetSpec = OffsetSpec.earliest(); } else { offsetSpec = OffsetSpec.latest(); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 1afb1210..9c03737a 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; -import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; @@ -164,7 +164,7 @@ public class TopicStateManagerImpl implements TopicStateManager { Map partitionOffsetAndTimestampMap = new HashMap<>(); // 获取指定时间每个分区的offset(按指定开始时间查询消息时) - if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { Map timestampsToSearch = new HashMap<>(); partitionList.forEach(topicPartition -> { timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()); @@ -173,13 +173,13 @@ public class TopicStateManagerImpl implements TopicStateManager { } for (TopicPartition partition : partitionList) { - if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { + if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { // 重置到最旧 kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); - } else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + } else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { // 重置到指定时间 kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset()); - } else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { + } else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { // 重置到指定位置 } else { diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java index 09d12bcd..378709dd 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.group; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.ClusterTopicDTO; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -23,7 +24,7 @@ public class GroupOffsetResetDTO extends ClusterTopicDTO { private String groupName; /** - * @see com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum + * @see OffsetTypeEnum */ @NotNull(message = "resetType不允许为空") @ApiModelProperty(value = "重置方式", example = "1") diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index 74e5611b..ca625f01 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -1,8 +1,8 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.topic; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -36,13 +36,12 @@ public class TopicRecordDTO extends PaginationSortDTO { @ApiModelProperty(value = "预览超时时间", example = "10000") private Long pullTimeoutUnitMs = 8000L; + /** + * @see OffsetTypeEnum + */ @ApiModelProperty(value = "offset", example = "") private Integer filterOffsetReset = 0; @ApiModelProperty(value = "开始日期时间戳", example = "") private Long startTimestampUnitMs; - - @ApiModelProperty(value = "结束日期时间戳", example = "") - private Long utilTimestampUnitMs; - } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/GroupOffsetResetEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/OffsetTypeEnum.java similarity index 50% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/GroupOffsetResetEnum.java rename to km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/OffsetTypeEnum.java index 178fb90e..6bdc8b80 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/GroupOffsetResetEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/OffsetTypeEnum.java @@ -3,19 +3,19 @@ package com.xiaojukeji.know.streaming.km.common.enums; import lombok.Getter; /** - * 重置offset + * offset类型 * @author zengqiao * @date 19/4/8 */ @Getter -public enum GroupOffsetResetEnum { - LATEST(0, "重置到最新"), +public enum OffsetTypeEnum { + LATEST(0, "最新"), - EARLIEST(1, "重置到最旧"), + EARLIEST(1, "最旧"), - PRECISE_TIMESTAMP(2, "按时间进行重置"), + PRECISE_TIMESTAMP(2, "指定时间"), - PRECISE_OFFSET(3, "重置到指定位置"), + PRECISE_OFFSET(3, "指定位置"), ; @@ -23,7 +23,7 @@ public enum GroupOffsetResetEnum { private final String message; - GroupOffsetResetEnum(int resetType, String message) { + OffsetTypeEnum(int resetType, String message) { this.resetType = resetType; this.message = message; }