From fa7ad6414083b7a5bfcdd62ce1921bab11f139cc Mon Sep 17 00:00:00 2001 From: yanweiwen <525390802@qq.com> Date: Mon, 5 Sep 2022 14:46:40 +0800 Subject: [PATCH 01/15] =?UTF-8?q?Topic=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81Timestamp=E6=8E=92=E5=BA=8F=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=9F=A5=E8=AF=A2=E6=9C=80=E6=96=B0=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=88=96=E6=9C=80=E6=97=A9=E6=B6=88=E6=81=AF=20#534?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../biz/topic/impl/TopicStateManagerImpl.java | 16 +++++++++++++- .../common/bean/dto/topic/TopicRecordDTO.java | 7 ++++++ .../km/common/constant/Constant.java | 13 +++++++++++ .../src/pages/TopicDetail/Messages.tsx | 22 ++++++++++++++++++- 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index a0418bb2..59e91b1c 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -36,6 +36,7 @@ import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; +import org.apache.commons.lang3.ObjectUtils; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -161,7 +162,11 @@ public class TopicStateManagerImpl implements TopicStateManager { maxMessage = Math.min(maxMessage, dto.getMaxRecords()); kafkaConsumer.assign(partitionList); for (TopicPartition partition : partitionList) { - kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); + if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) { + kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); + } else { + kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); + } } // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间 @@ -185,6 +190,15 @@ public class TopicStateManagerImpl implements TopicStateManager { } } + // 排序 + if (ObjectUtils.isNotEmpty(voList)) { + if (Constant.ASC.equals(dto.getSortType())) { + voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs())); + } else if (Constant.DESC.equals(dto.getSortType())) { + voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs())); + } + } + return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); } catch (Exception e) { log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index b16f2e52..972e2492 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -34,4 +34,11 @@ public class TopicRecordDTO extends BaseDTO { @ApiModelProperty(value = "预览超时时间", example = "10000") private Long pullTimeoutUnitMs = 8000L; + + @ApiModelProperty(value = "排序", example = "desc") + private String sortType; + + @ApiModelProperty(value = "offset", example = "latest") + private String filterOffsetReset; + } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index fae5db21..2ef958b0 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -64,4 +64,17 @@ public class Constant { public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F; public static final Integer DEFAULT_RETRY_TIME = 3; + + /** + * 排序 + */ + public static final String ASC = "asc"; + public static final String DESC = "desc"; + + /** + * 消费策略 + */ + public static final String LATEST = "latest"; + public static final String EARLIEST = "earliest"; + } diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx index 4771e0c7..4e7fa8ff 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx @@ -10,6 +10,7 @@ const defaultParams: any = { maxRecords: 100, pullTimeoutUnitMs: 5000, // filterPartitionId: 1, + filterOffsetReset: 'latest' }; const defaultpaPagination = { current: 1, @@ -29,6 +30,12 @@ const TopicMessages = (props: any) => { const [pagination, setPagination] = useState(defaultpaPagination); const [form] = Form.useForm(); + // 获取消息开始位置 + const offsetResetList = [ + { 'label': 'latest', value: 'latest' }, + { 'label': 'earliest', value: 'earliest' } + ]; + // 默认排序 const defaultSorter = { sortField: 'timestampUnitMs', @@ -88,7 +95,10 @@ const TopicMessages = (props: any) => { }; const onTableChange = (pagination: any, filters: any, sorter: any) => { + defaultSorter.sortField = sorter.field || ''; + defaultSorter.sortType = sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''; setPagination(pagination); + genData(); // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false; // const sortColumn = sorter.field && toLine(sorter.field); // genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams }); @@ -119,6 +129,15 @@ const TopicMessages = (props: any) => {
+ + { showQueryForm={false} tableProps={{ showHeader: false, - rowKey: 'path', + rowKey: 'offset', loading: loading, columns: getTopicMessagesColmns(), dataSource: data, @@ -169,6 +188,7 @@ const TopicMessages = (props: any) => { bordered: false, onChange: onTableChange, scroll: { x: 'max-content' }, + sortDirections: ['descend', 'ascend', 'default'] }, }} /> From 8e50d145d57fb054b61af49d31c033119529e069 Mon Sep 17 00:00:00 2001 From: superspeedone <525390802@qq.com> Date: Wed, 7 Sep 2022 11:17:59 +0800 Subject: [PATCH 02/15] =?UTF-8?q?Topic=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81Timestamp=E6=8E=92=E5=BA=8F=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=9F=A5=E8=AF=A2=E6=9C=80=E6=96=B0=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=88=96=E6=9C=80=E6=97=A9=E6=B6=88=E6=81=AF=20#534?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/topic/TopicStateManager.java | 3 ++- .../km/biz/topic/impl/TopicStateManagerImpl.java | 16 +++++++--------- .../km/common/bean/dto/topic/TopicRecordDTO.java | 7 ++----- .../streaming/km/common/constant/Constant.java | 12 ------------ .../src/pages/TopicDetail/Messages.tsx | 6 +++--- .../rest/api/v3/topic/TopicStateController.java | 6 ++++-- 6 files changed, 18 insertions(+), 32 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java index 0e8436d9..e467253c 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java @@ -1,5 +1,6 @@ package com.xiaojukeji.know.streaming.km.biz.topic; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; @@ -15,7 +16,7 @@ import java.util.List; public interface TopicStateManager { TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException; - Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException; + Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException; Result getTopicState(Long clusterPhyId, String topicName); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 59e91b1c..092fb571 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; @@ -22,17 +23,18 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; -import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; +import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; -import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; @@ -121,7 +123,7 @@ public class TopicStateManagerImpl implements TopicStateManager { } @Override - public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException { + public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException { long startTime = System.currentTimeMillis(); // 获取集群 @@ -162,7 +164,7 @@ public class TopicStateManagerImpl implements TopicStateManager { maxMessage = Math.min(maxMessage, dto.getMaxRecords()); kafkaConsumer.assign(partitionList); for (TopicPartition partition : partitionList) { - if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) { + if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); } else { kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); @@ -192,11 +194,7 @@ public class TopicStateManagerImpl implements TopicStateManager { // 排序 if (ObjectUtils.isNotEmpty(voList)) { - if (Constant.ASC.equals(dto.getSortType())) { - voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs())); - } else if (Constant.DESC.equals(dto.getSortType())) { - voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs())); - } + PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField()); } return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index 972e2492..0a83ef01 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -35,10 +35,7 @@ public class TopicRecordDTO extends BaseDTO { @ApiModelProperty(value = "预览超时时间", example = "10000") private Long pullTimeoutUnitMs = 8000L; - @ApiModelProperty(value = "排序", example = "desc") - private String sortType; - - @ApiModelProperty(value = "offset", example = "latest") - private String filterOffsetReset; + @ApiModelProperty(value = "offset", example = "") + private Integer filterOffsetReset = 0; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index 2ef958b0..36575938 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -65,16 +65,4 @@ public class Constant { public static final Integer DEFAULT_RETRY_TIME = 3; - /** - * 排序 - */ - public static final String ASC = "asc"; - public static final String DESC = "desc"; - - /** - * 消费策略 - */ - public static final String LATEST = "latest"; - public static final String EARLIEST = "earliest"; - } diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx index 4e7fa8ff..220781f0 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx @@ -10,7 +10,7 @@ const defaultParams: any = { maxRecords: 100, pullTimeoutUnitMs: 5000, // filterPartitionId: 1, - filterOffsetReset: 'latest' + filterOffsetReset: 0 }; const defaultpaPagination = { current: 1, @@ -32,8 +32,8 @@ const TopicMessages = (props: any) => { // 获取消息开始位置 const offsetResetList = [ - { 'label': 'latest', value: 'latest' }, - { 'label': 'earliest', value: 'earliest' } + { 'label': 'latest', value: '0' }, + { 'label': 'earliest', value: '1' } ]; // 默认排序 diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java index 52a279e9..808c7a32 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; @@ -91,8 +92,9 @@ public class TopicStateController { @ResponseBody public Result> getTopicMessages(@PathVariable Long clusterPhyId, @PathVariable String topicName, - @Validated @RequestBody TopicRecordDTO dto) throws Exception { - return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto); + @Validated @RequestBody TopicRecordDTO dto, + @Validated PaginationSortDTO sortDto) throws Exception { + return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto, sortDto); } @ApiOperation(value = "Topic-ACL信息", notes = "") From 0e49002f426d102eda69965c9676b1cf7f99b043 Mon Sep 17 00:00:00 2001 From: superspeedone <525390802@qq.com> Date: Fri, 9 Sep 2022 15:45:31 +0800 Subject: [PATCH 03/15] =?UTF-8?q?Topic=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81Timestamp=E6=8E=92=E5=BA=8F=EF=BC=8C=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E6=94=AF=E6=8C=81=E6=8C=89=E6=8C=87=E5=AE=9A=E6=97=A5?= =?UTF-8?q?=E6=9C=9F=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/topic/TopicStateManager.java | 2 +- .../biz/topic/impl/TopicStateManagerImpl.java | 29 ++++++++++++++----- .../common/bean/dto/topic/TopicRecordDTO.java | 9 +++++- .../src/pages/TopicDetail/Messages.tsx | 23 ++++++++++----- .../src/pages/TopicDetail/config.tsx | 3 +- .../api/v3/topic/TopicStateController.java | 5 ++-- 6 files changed, 50 insertions(+), 21 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java index e467253c..ec3a3207 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java @@ -16,7 +16,7 @@ import java.util.List; public interface TopicStateManager { TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException; - Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException; + Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException; Result getTopicState(Long clusterPhyId, String topicName); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 092fb571..0fb368e0 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -3,7 +3,6 @@ package com.xiaojukeji.know.streaming.km.biz.topic.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; -import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; @@ -40,10 +39,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; import org.apache.commons.lang3.ObjectUtils; import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.springframework.beans.factory.annotation.Autowired; @@ -123,7 +119,7 @@ public class TopicStateManagerImpl implements TopicStateManager { } @Override - public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException { + public Result> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException { long startTime = System.currentTimeMillis(); // 获取集群 @@ -163,10 +159,29 @@ public class TopicStateManagerImpl implements TopicStateManager { } maxMessage = Math.min(maxMessage, dto.getMaxRecords()); kafkaConsumer.assign(partitionList); + + Map partitionOffsetAndTimestampMap = new HashMap<>(); + // 获取指定时间每个分区的offset(按指定开始时间查询消息时) + if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + Map timestampsToSearch = new HashMap<>(); + partitionList.forEach(topicPartition -> { + timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()); + }); + partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); + } + for (TopicPartition partition : partitionList) { if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { + // 重置到最旧 kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); + } else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + // 重置到指定时间 + kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset()); + } else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { + // 重置到指定位置 + } else { + // 默认,重置到最新 kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); } } @@ -194,7 +209,7 @@ public class TopicStateManagerImpl implements TopicStateManager { // 排序 if (ObjectUtils.isNotEmpty(voList)) { - PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField()); + PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType()); } return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index 0a83ef01..74e5611b 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.topic; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -15,7 +16,7 @@ import javax.validation.constraints.NotNull; @Data @JsonIgnoreProperties(ignoreUnknown = true) @ApiModel(description = "Topic记录") -public class TopicRecordDTO extends BaseDTO { +public class TopicRecordDTO extends PaginationSortDTO { @NotNull(message = "truncate不允许为空") @ApiModelProperty(value = "是否截断", example = "true") private Boolean truncate; @@ -38,4 +39,10 @@ public class TopicRecordDTO extends BaseDTO { @ApiModelProperty(value = "offset", example = "") private Integer filterOffsetReset = 0; + @ApiModelProperty(value = "开始日期时间戳", example = "") + private Long startTimestampUnitMs; + + @ApiModelProperty(value = "结束日期时间戳", example = "") + private Long utilTimestampUnitMs; + } diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx index 220781f0..6862596e 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx @@ -32,8 +32,8 @@ const TopicMessages = (props: any) => { // 获取消息开始位置 const offsetResetList = [ - { 'label': 'latest', value: '0' }, - { 'label': 'earliest', value: '1' } + { 'label': 'latest', value: 0 }, + { 'label': 'earliest', value: 1 } ]; // 默认排序 @@ -42,6 +42,8 @@ const TopicMessages = (props: any) => { sortType: 'desc', }; + const [sorter, setSorter] = useState(defaultSorter); + // 请求接口获取数据 const genData = async () => { if (urlParams?.clusterId === undefined || hashData?.topicName === undefined) return; @@ -56,7 +58,7 @@ const TopicMessages = (props: any) => { }); setPartitionIdList(newPartitionIdList || []); }); - request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...defaultSorter }, method: 'POST' }) + request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...sorter }, method: 'POST' }) .then((res: any) => { // setPagination({ // current: res.pagination?.pageNo, @@ -94,11 +96,16 @@ const TopicMessages = (props: any) => { history.push(`/cluster/${urlParams?.clusterId}/testing/consumer`); }; - const onTableChange = (pagination: any, filters: any, sorter: any) => { - defaultSorter.sortField = sorter.field || ''; - defaultSorter.sortType = sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''; + const onTableChange = (pagination: any, filters: any, sorter: any, extra: any) => { setPagination(pagination); - genData(); + // 只有排序事件时,触发重新请求后端数据 + if(extra.action === 'sort') { + setSorter({ + sortField: sorter.field || '', + sortType: sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : '' + }); + genData(); + } // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false; // const sortColumn = sorter.field && toLine(sorter.field); // genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams }); @@ -106,7 +113,7 @@ const TopicMessages = (props: any) => { useEffect(() => { props.positionType === 'Messages' && genData(); - }, [props, params]); + }, [props, params, sorter]); return ( <> diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx index 5bb75eb0..085ad046 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx @@ -85,7 +85,8 @@ export const getTopicMessagesColmns = () => { title: 'Timestamp', dataIndex: 'timestampUnitMs', key: 'timestampUnitMs', - render: (t: number) => (t ? moment(t).format(timeFormat) : '-'), + sorter: true, + render: (t: number) => (t ? moment(t).format(timeFormat) + '.' + moment(t).millisecond() : '-'), }, { title: 'Key', diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java index 808c7a32..d1e09e66 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java @@ -92,9 +92,8 @@ public class TopicStateController { @ResponseBody public Result> getTopicMessages(@PathVariable Long clusterPhyId, @PathVariable String topicName, - @Validated @RequestBody TopicRecordDTO dto, - @Validated PaginationSortDTO sortDto) throws Exception { - return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto, sortDto); + @Validated @RequestBody TopicRecordDTO dto) throws Exception { + return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto); } @ApiOperation(value = "Topic-ACL信息", notes = "") From 0d227aef498d8f53c49eda2d2bb3b54675712bc6 Mon Sep 17 00:00:00 2001 From: superspeedone <525390802@qq.com> Date: Fri, 9 Sep 2022 17:29:22 +0800 Subject: [PATCH 04/15] =?UTF-8?q?Topic=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81Timestamp=E6=8E=92=E5=BA=8F=EF=BC=8C=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E6=94=AF=E6=8C=81=E6=8C=89=E6=8C=87=E5=AE=9A=E6=97=A5?= =?UTF-8?q?=E6=9C=9F=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../layout-clusters-fe/src/pages/TopicDetail/Messages.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx index 6862596e..eecf792a 100644 --- a/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx +++ b/km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx @@ -104,7 +104,6 @@ const TopicMessages = (props: any) => { sortField: sorter.field || '', sortType: sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : '' }); - genData(); } // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false; // const sortColumn = sorter.field && toLine(sorter.field); From 405e6e0c1d73ceabe44e907d2b4371c2bbb3b412 Mon Sep 17 00:00:00 2001 From: superspeedone <525390802@qq.com> Date: Fri, 9 Sep 2022 18:56:45 +0800 Subject: [PATCH 05/15] =?UTF-8?q?Topic=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81Timestamp=E6=8E=92=E5=BA=8F=EF=BC=8C=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E6=94=AF=E6=8C=81=E6=8C=89=E6=8C=87=E5=AE=9A=E6=97=A5?= =?UTF-8?q?=E6=9C=9F=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../streaming/km/biz/topic/impl/TopicStateManagerImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 0fb368e0..1afb1210 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -24,6 +24,7 @@ import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; +import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; @@ -38,6 +39,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; @@ -209,6 +211,10 @@ public class TopicStateManagerImpl implements TopicStateManager { // 排序 if (ObjectUtils.isNotEmpty(voList)) { + // 默认按时间倒序排序 + if (StringUtils.isBlank(dto.getSortType())) { + dto.setSortType(SortTypeEnum.DESC.getSortType()); + } PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType()); } From d4f416de144fce95eea4556b5bfdadf68176b746 Mon Sep 17 00:00:00 2001 From: pokemeng Date: Fri, 16 Sep 2022 11:34:03 +0800 Subject: [PATCH 06/15] fix: adjust os judgment method with uname --- bin/startup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/startup.sh b/bin/startup.sh index 8081f2dd..cbde7c56 100644 --- a/bin/startup.sh +++ b/bin/startup.sh @@ -9,7 +9,7 @@ error_exit () [ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME if [ -z "$JAVA_HOME" ]; then - if $darwin; then + if [ "Darwin" = "$(uname -s)" ]; then if [ -x '/usr/libexec/java_home' ] ; then export JAVA_HOME=`/usr/libexec/java_home` From 85e3f2a94693cae690bfeac951bb99a1b67e6431 Mon Sep 17 00:00:00 2001 From: wangdongfang-aden <49342405+wangdongfang-aden@users.noreply.github.com> Date: Fri, 16 Sep 2022 14:40:34 +0800 Subject: [PATCH 07/15] helm update 3.0.0-beta.2 --- km-dist/helm/Chart.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/km-dist/helm/Chart.yaml b/km-dist/helm/Chart.yaml index bd9b7d98..4b764af4 100644 --- a/km-dist/helm/Chart.yaml +++ b/km-dist/helm/Chart.yaml @@ -4,13 +4,13 @@ description: knowstreaming-manager Helm chart type: application -version: 0.1.3 +version: 0.1.4 maintainers: - email: didicloud@didiglobal.com name: didicloud -appVersion: "3.0.0-beta.1" +appVersion: "3.0.0-beta.2" dependencies: - name: knowstreaming-web From a6f1fe07b351d3341a6ec73c09aca1aff4e17069 Mon Sep 17 00:00:00 2001 From: wangdongfang-aden <49342405+wangdongfang-aden@users.noreply.github.com> Date: Fri, 16 Sep 2022 14:41:02 +0800 Subject: [PATCH 08/15] helm update 3.0.0-beta.2 --- km-dist/helm/values.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/km-dist/helm/values.yaml b/km-dist/helm/values.yaml index 036f5cc8..2c422f61 100644 --- a/km-dist/helm/values.yaml +++ b/km-dist/helm/values.yaml @@ -3,7 +3,7 @@ replicaCount: 2 image: repository: knowstreaming/knowstreaming-manager pullPolicy: IfNotPresent - tag: "0.1.0" + tag: "0.2.0" imagePullSecrets: [] nameOverride: "" @@ -73,7 +73,7 @@ knowstreaming-web: image: repository: knowstreaming/knowstreaming-ui pullPolicy: IfNotPresent - tag: "0.1.0" + tag: "0.2.0" service: type: NodePort From 12b5acd073dc8356955a467e4aac1a6db244ea98 Mon Sep 17 00:00:00 2001 From: wangdongfang-aden <49342405+wangdongfang-aden@users.noreply.github.com> Date: Fri, 16 Sep 2022 14:41:40 +0800 Subject: [PATCH 09/15] helm update 3.0.0-beta.2 --- km-dist/helm/charts/ksmysql/templates/statefulset.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/km-dist/helm/charts/ksmysql/templates/statefulset.yaml b/km-dist/helm/charts/ksmysql/templates/statefulset.yaml index 15ab2858..a536f2f8 100644 --- a/km-dist/helm/charts/ksmysql/templates/statefulset.yaml +++ b/km-dist/helm/charts/ksmysql/templates/statefulset.yaml @@ -21,7 +21,7 @@ spec: {{- include "ksmysql.selectorLabels" . | nindent 8 }} spec: containers: - - image: knowstreaming/knowstreaming-mysql:0.1.0 + - image: knowstreaming/knowstreaming-mysql:0.2.0 name: {{ .Chart.Name }} env: - name: MYSQL_DATABASE From fe6ddebc493a99f7f4bd764ae149896d7379286d Mon Sep 17 00:00:00 2001 From: wyb <1164642317@qq.com> Date: Fri, 16 Sep 2022 14:41:45 +0800 Subject: [PATCH 10/15] =?UTF-8?q?=E6=96=87=E6=A1=A3=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/install_guide/版本升级手册.md | 25 +++++++++++-------------- docs/user_guide/用户使用手册.md | 2 +- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md index 48417bb4..2af3f69a 100644 --- a/docs/install_guide/版本升级手册.md +++ b/docs/install_guide/版本升级手册.md @@ -40,8 +40,7 @@ thread-pool: ``` - -**SQL变更** +**SQL 变更** ```sql -- 多集群管理权限2022-09-06新增 @@ -80,12 +79,11 @@ ALTER TABLE `logi_security_oplog` ### 6.2.2、升级至 `v3.0.0-beta.1`版本 - -**SQL变更** +**SQL 变更** 1、在`ks_km_broker`表增加了一个监听信息字段。 -2、为`logi_security_oplog`表operation_methods字段设置默认值''。 -因此需要执行下面的sql对数据库表进行更新。 +2、为`logi_security_oplog`表 operation_methods 字段设置默认值''。 +因此需要执行下面的 sql 对数据库表进行更新。 ```sql ALTER TABLE `ks_km_broker` @@ -98,7 +96,6 @@ ALTER COLUMN `operation_methods` set default ''; --- - ### 6.2.3、`2.x`版本 升级至 `v3.0.0-beta.0`版本 **升级步骤:** @@ -123,14 +120,14 @@ ALTER COLUMN `operation_methods` set default ''; UPDATE ks_km_topic INNER JOIN (SELECT - topic.cluster_id AS cluster_id, - topic.topic_name AS topic_name, - topic.description AS description + topic.cluster_id AS cluster_id, + topic.topic_name AS topic_name, + topic.description AS description FROM topic WHERE description != '' ) AS t - ON ks_km_topic.cluster_phy_id = t.cluster_id - AND ks_km_topic.topic_name = t.topic_name - AND ks_km_topic.id > 0 -SET ks_km_topic.description = t.description; +ON ks_km_topic.cluster_phy_id = t.cluster_id + AND ks_km_topic.topic_name = t.topic_name + AND ks_km_topic.id > 0 + SET ks_km_topic.description = t.description; ``` \ No newline at end of file diff --git a/docs/user_guide/用户使用手册.md b/docs/user_guide/用户使用手册.md index f2efc936..6179cb3b 100644 --- a/docs/user_guide/用户使用手册.md +++ b/docs/user_guide/用户使用手册.md @@ -11,7 +11,7 @@ 下面是用户第一次使用我们产品的典型体验路径: -![text](http://img-ys011.didistatic.com/static/dc2img/do1_YehqxqmsVaqU5gf3XphI) +![text](http://img-ys011.didistatic.com/static/dc2img/do1_qgqPsAY46sZeBaPUCwXY) ## 5.3、常用功能 From 68f76f2daf3d9f7f0e83f7ed4bfbde7b38ad2c58 Mon Sep 17 00:00:00 2001 From: wangdongfang-aden <49342405+wangdongfang-aden@users.noreply.github.com> Date: Fri, 16 Sep 2022 14:42:34 +0800 Subject: [PATCH 11/15] helm update 3.0.0-beta.2 --- km-dist/helm/charts/elasticsearch/values.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/km-dist/helm/charts/elasticsearch/values.yaml b/km-dist/helm/charts/elasticsearch/values.yaml index fbbc599d..e325ec68 100644 --- a/km-dist/helm/charts/elasticsearch/values.yaml +++ b/km-dist/helm/charts/elasticsearch/values.yaml @@ -173,8 +173,8 @@ antiAffinityTopologyKey: "kubernetes.io/hostname" # Hard means that by default pods will only be scheduled if there are enough nodes for them # and that they will never end up on the same node. Setting this to soft will do this "best effort" -antiAffinity: "hard" - +antiAffinity: "" +#antiAffinity: "hard" # This is the node affinity settings as defined in # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#node-affinity-beta-feature nodeAffinity: {} From 7288874d72e40883152fc9dabc4201ab46da4e81 Mon Sep 17 00:00:00 2001 From: wangdongfang-aden <49342405+wangdongfang-aden@users.noreply.github.com> Date: Fri, 16 Sep 2022 14:44:14 +0800 Subject: [PATCH 12/15] helm update 3.0.0-beta.2 --- km-dist/helm/templates/configmap.yaml | 32 +++++++++++++++------------ 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/km-dist/helm/templates/configmap.yaml b/km-dist/helm/templates/configmap.yaml index ae09f6b9..6395aff1 100644 --- a/km-dist/helm/templates/configmap.yaml +++ b/km-dist/helm/templates/configmap.yaml @@ -71,6 +71,7 @@ data: driver-class-name: org.mariadb.jdbc.Driver app-name: know-streaming resource-extend-bean-name: myResourceExtendImpl + login-extend-bean-name: logiSecurityDefaultLoginExtendImpl logging: config: classpath:logback-spring.xml @@ -85,11 +86,16 @@ data: queue-size: 10000 # 每个线程池队列大小 select-suitable-enable: true # 任务是否自动选择合适的线程池,非主要,可不修改 suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改 - task: # 任务模块的配置 - heaven: # 采集任务配置 - thread-num: 20 # 采集任务线程池核心线程数 - queue-size: 1000 # 采集任务线程池队列大小 - + task: # 任务模块的配置 + metrics: # metrics采集任务配置 + thread-num: 18 # metrics采集任务线程池核心线程数 + queue-size: 180 # metrics采集任务线程池队列大小 + metadata: # metadata同步任务配置 + thread-num: 27 # metadata同步任务线程池核心线程数 + queue-size: 270 # metadata同步任务线程池队列大小 + common: # 剩余其他任务配置 + thread-num: 15 # 剩余其他任务线程池核心线程数 + queue-size: 150 # 剩余其他任务线程池队列大小 client-pool: @@ -99,17 +105,16 @@ data: max-total-client-num: 20 # 最大客户端数 borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒 + es: + client: {{ if .Values.elasticsearch.enabled }} - es.client.address: elasticsearch-master:9200 - #es.client.address: {{ .Release.Name }}-elasticsearch:9200 + address: elasticsearch-master:9200 {{- else }} - es.client.address: {{ .Values.elasticsearch.esClientAddress }}:{{ .Values.elasticsearch.esProt }} + address: {{ .Values.elasticsearch.esClientAddress }}:{{ .Values.elasticsearch.esProt }} {{- end }} - # es.client.pass: knowstreaming-manager - # 集群自动均衡相关配置 - cluster-balance: - ignored-topics: - time-second: 300 + client-cnt: 10 + io-thread-cnt: 2 + max-retry-cnt: 5 # 普罗米修斯指标导出相关配置 management: @@ -158,4 +163,3 @@ data: curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_topic_metric${logdate} || \ exit 2 done - From 375c6f56c9aa4533f7b13419e2b44a1fe4da56be Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 19 Sep 2022 13:55:59 +0800 Subject: [PATCH 13/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9GroupOffsetResetEnum?= =?UTF-8?q?=E7=B1=BB=E5=90=8D=E4=B8=BAOffsetTypeEnum?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/group/impl/GroupManagerImpl.java | 12 ++++++------ .../km/biz/topic/impl/TopicStateManagerImpl.java | 10 +++++----- .../common/bean/dto/group/GroupOffsetResetDTO.java | 3 ++- .../km/common/bean/dto/topic/TopicRecordDTO.java | 9 ++++----- ...oupOffsetResetEnum.java => OffsetTypeEnum.java} | 14 +++++++------- 5 files changed, 24 insertions(+), 24 deletions(-) rename km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/{GroupOffsetResetEnum.java => OffsetTypeEnum.java} (50%) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index 15aafdb5..1095d5ee 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedD import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; -import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; @@ -199,12 +199,12 @@ public class GroupManagerImpl implements GroupManager { return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(dto.getClusterId(), dto.getTopicName())); } - if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType() + if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType() && ValidateUtils.isEmptyList(dto.getOffsetList())) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定offset重置需传offset信息"); } - if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType() + if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType() && ValidateUtils.isNull(dto.getTimestamp())) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定时间重置需传时间信息"); } @@ -213,7 +213,7 @@ public class GroupManagerImpl implements GroupManager { } private Result> getPartitionOffset(GroupOffsetResetDTO dto) { - if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) { + if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) { return Result.buildSuc(dto.getOffsetList().stream().collect(Collectors.toMap( elem -> new TopicPartition(dto.getTopicName(), elem.getPartitionId()), PartitionOffsetDTO::getOffset, @@ -222,9 +222,9 @@ public class GroupManagerImpl implements GroupManager { } OffsetSpec offsetSpec = null; - if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) { + if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) { offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp()); - } else if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getResetType()) { + } else if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getResetType()) { offsetSpec = OffsetSpec.earliest(); } else { offsetSpec = OffsetSpec.latest(); diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 1afb1210..9c03737a 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; -import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; @@ -164,7 +164,7 @@ public class TopicStateManagerImpl implements TopicStateManager { Map partitionOffsetAndTimestampMap = new HashMap<>(); // 获取指定时间每个分区的offset(按指定开始时间查询消息时) - if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { Map timestampsToSearch = new HashMap<>(); partitionList.forEach(topicPartition -> { timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()); @@ -173,13 +173,13 @@ public class TopicStateManagerImpl implements TopicStateManager { } for (TopicPartition partition : partitionList) { - if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { + if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { // 重置到最旧 kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); - } else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + } else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { // 重置到指定时间 kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset()); - } else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { + } else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { // 重置到指定位置 } else { diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java index 09d12bcd..378709dd 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.group; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.ClusterTopicDTO; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -23,7 +24,7 @@ public class GroupOffsetResetDTO extends ClusterTopicDTO { private String groupName; /** - * @see com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum + * @see OffsetTypeEnum */ @NotNull(message = "resetType不允许为空") @ApiModelProperty(value = "重置方式", example = "1") diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java index 74e5611b..ca625f01 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java @@ -1,8 +1,8 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.topic; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; +import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @@ -36,13 +36,12 @@ public class TopicRecordDTO extends PaginationSortDTO { @ApiModelProperty(value = "预览超时时间", example = "10000") private Long pullTimeoutUnitMs = 8000L; + /** + * @see OffsetTypeEnum + */ @ApiModelProperty(value = "offset", example = "") private Integer filterOffsetReset = 0; @ApiModelProperty(value = "开始日期时间戳", example = "") private Long startTimestampUnitMs; - - @ApiModelProperty(value = "结束日期时间戳", example = "") - private Long utilTimestampUnitMs; - } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/GroupOffsetResetEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/OffsetTypeEnum.java similarity index 50% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/GroupOffsetResetEnum.java rename to km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/OffsetTypeEnum.java index 178fb90e..6bdc8b80 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/GroupOffsetResetEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/OffsetTypeEnum.java @@ -3,19 +3,19 @@ package com.xiaojukeji.know.streaming.km.common.enums; import lombok.Getter; /** - * 重置offset + * offset类型 * @author zengqiao * @date 19/4/8 */ @Getter -public enum GroupOffsetResetEnum { - LATEST(0, "重置到最新"), +public enum OffsetTypeEnum { + LATEST(0, "最新"), - EARLIEST(1, "重置到最旧"), + EARLIEST(1, "最旧"), - PRECISE_TIMESTAMP(2, "按时间进行重置"), + PRECISE_TIMESTAMP(2, "指定时间"), - PRECISE_OFFSET(3, "重置到指定位置"), + PRECISE_OFFSET(3, "指定位置"), ; @@ -23,7 +23,7 @@ public enum GroupOffsetResetEnum { private final String message; - GroupOffsetResetEnum(int resetType, String message) { + OffsetTypeEnum(int resetType, String message) { this.resetType = resetType; this.message = message; } From b6c6df7ffc138657267acbc3b8d85ce8c1bd9111 Mon Sep 17 00:00:00 2001 From: Richard <49510754+f1558@users.noreply.github.com> Date: Tue, 20 Sep 2022 09:42:42 +0800 Subject: [PATCH 14/15] fix issue * SQL specification comments to avoid direct operation failure --- km-dist/init/sql/dml-logi.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/km-dist/init/sql/dml-logi.sql b/km-dist/init/sql/dml-logi.sql index 86eb9a69..6d6e8159 100644 --- a/km-dist/init/sql/dml-logi.sql +++ b/km-dist/init/sql/dml-logi.sql @@ -48,7 +48,7 @@ INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `l -- 初始化用户 ---INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOSGxOUkVsNVdETjBRVlp0Y0V0T1IwWnlaVEZ6YWxGRVJrRkpNVEU1VTJwYVUySkhlRzlSU0RBOWUwQldha28wWVd0N1d5TkFNa0FqWFgxS05sSnNiR2hBZlE9PXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming'); +-- INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOSGxOUkVsNVdETjBRVlp0Y0V0T1IwWnlaVEZ6YWxGRVJrRkpNVEU1VTJwYVUySkhlRzlSU0RBOWUwQldha28wWVd0N1d5TkFNa0FqWFgxS05sSnNiR2hBZlE9PXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming'); INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOVGRSUmxweFUycFNhR0V6ZEdKSk1FRjRVVU5PWkdaVmJ6SlZiWGh6WVVWQ09YdEFWbXBLTkdGcmUxc2pRREpBSTExOVNqWlNiR3hvUUgwPXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming'); -- 初始化角色 @@ -96,4 +96,4 @@ INSERT INTO `logi_security_user_role` (`id`, `user_id`, `role_id`, `is_delete`, INSERT INTO `logi_security_config` (`value_group`,`value_name`,`value`,`edit`,`status`,`memo`,`is_delete`,`app_name`,`operator`) VALUES -('SECURITY.LOGIN','SECURITY.TRICK_USERS','[\n \"admin\"\n]',1,1,'允许跳过登录的用户',0,'know-streaming','admin'); \ No newline at end of file +('SECURITY.LOGIN','SECURITY.TRICK_USERS','[\n \"admin\"\n]',1,1,'允许跳过登录的用户',0,'know-streaming','admin'); From e06712397e8ba1f984233f4317babd70f2905f03 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 20 Sep 2022 10:27:30 +0800 Subject: [PATCH 15/15] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9B=A0DB=E4=B8=ADBro?= =?UTF-8?q?ker=E4=BF=A1=E6=81=AF=E4=B8=8D=E5=AD=98=E5=9C=A8=E5=AF=BC?= =?UTF-8?q?=E8=87=B4TotalLogSize=E6=8C=87=E6=A0=87=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=97=B6=E6=8A=9B=E7=A9=BA=E6=8C=87=E9=92=88=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cluster/impl/ClusterMetricServiceImpl.java | 15 ++++++++------- .../km/persistence/kafka/KafkaJMXClient.java | 4 ++++ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java index 075c53c2..9fdd9ec0 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java @@ -751,8 +751,8 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust private Result getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){ List topics = topicService.listTopicsFromCacheFirst(clusterId); - float metricsSum = 0f; - for(Topic topic : topics){ + float sumMetricValue = 0f; + for(Topic topic : topics) { Result> ret = topicMetricService.collectTopicMetricsFromKafkaWithCacheFirst( clusterId, topic.getTopicName(), @@ -763,14 +763,15 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust continue; } - List topicMetrics = ret.getData(); - for (TopicMetrics metrics : topicMetrics) { - if(metrics.isBBrokerAgg()){ - metricsSum += Double.valueOf(metrics.getMetrics().get(topicMetric)); + for (TopicMetrics metrics : ret.getData()) { + if(metrics.isBBrokerAgg()) { + Float metricValue = metrics.getMetric(topicMetric); + sumMetricValue += (metricValue == null? 0f: metricValue); + break; } } } - return Result.buildSuc(initWithMetrics(clusterId, metric, metricsSum)); + return Result.buildSuc(initWithMetrics(clusterId, metric, sumMetricValue)); } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java index 68d1011e..39ae1ebe 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java @@ -191,6 +191,10 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE); BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper); + if (brokerPO == null) { + return null; + } + return Broker.buildFrom(brokerPO); } }