mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Optimize]Topic-Messages页面后端增加按照Partition和Offset纬度的排序 (#1075)
This commit is contained in:
@@ -28,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart
|
|||||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.constant.PaginationConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
|
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
|
||||||
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
|
||||||
@@ -46,8 +47,6 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
|||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
|
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
|
||||||
import org.apache.commons.lang3.ObjectUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.kafka.clients.consumer.*;
|
import org.apache.kafka.clients.consumer.*;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.config.TopicConfig;
|
import org.apache.kafka.common.config.TopicConfig;
|
||||||
@@ -105,7 +104,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
|||||||
TopicBrokerAllVO allVO = new TopicBrokerAllVO();
|
TopicBrokerAllVO allVO = new TopicBrokerAllVO();
|
||||||
|
|
||||||
allVO.setTotal(topic.getBrokerIdSet().size());
|
allVO.setTotal(topic.getBrokerIdSet().size());
|
||||||
allVO.setLive((int)brokerMap.values().stream().filter(elem -> elem.alive()).count());
|
allVO.setLive((int)brokerMap.values().stream().filter(Broker::alive).count());
|
||||||
allVO.setDead(allVO.getTotal() - allVO.getLive());
|
allVO.setDead(allVO.getTotal() - allVO.getLive());
|
||||||
|
|
||||||
allVO.setPartitionCount(topic.getPartitionNum());
|
allVO.setPartitionCount(topic.getPartitionNum());
|
||||||
@@ -157,95 +156,28 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
|||||||
return Result.buildFromIgnoreData(endOffsetsMapResult);
|
return Result.buildFromIgnoreData(endOffsetsMapResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<TopicRecordVO> voList = new ArrayList<>();
|
// 数据采集
|
||||||
|
List<TopicRecordVO> voList = this.getTopicMessages(clusterPhy, topicName, beginOffsetsMapResult.getData(), endOffsetsMapResult.getData(), startTime, dto);
|
||||||
|
|
||||||
KafkaConsumer<String, String> kafkaConsumer = null;
|
// 排序
|
||||||
try {
|
if (ValidateUtils.isBlank(dto.getSortType())) {
|
||||||
// 创建kafka-consumer
|
// 默认按时间倒序排序
|
||||||
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
|
dto.setSortType(SortTypeEnum.DESC.getSortType());
|
||||||
|
|
||||||
List<TopicPartition> partitionList = new ArrayList<>();
|
|
||||||
long maxMessage = 0;
|
|
||||||
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
|
|
||||||
long begin = beginOffsetsMapResult.getData().get(entry.getKey());
|
|
||||||
long end = entry.getValue();
|
|
||||||
if (begin == end){
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
maxMessage += end - begin;
|
|
||||||
partitionList.add(entry.getKey());
|
|
||||||
}
|
|
||||||
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
|
|
||||||
kafkaConsumer.assign(partitionList);
|
|
||||||
|
|
||||||
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
|
|
||||||
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
|
|
||||||
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
|
||||||
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
|
||||||
partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()));
|
|
||||||
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (TopicPartition partition : partitionList) {
|
|
||||||
if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
|
|
||||||
// 重置到最旧
|
|
||||||
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
|
|
||||||
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
|
||||||
// 重置到指定时间
|
|
||||||
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
|
|
||||||
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
|
|
||||||
// 重置到指定位置
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// 默认,重置到最新
|
|
||||||
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
|
||||||
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
|
|
||||||
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
|
||||||
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
|
|
||||||
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord));
|
|
||||||
if (voList.size() >= dto.getMaxRecords()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 超时则返回
|
|
||||||
if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs()
|
|
||||||
|| voList.size() > dto.getMaxRecords()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 排序
|
|
||||||
if (ObjectUtils.isNotEmpty(voList)) {
|
|
||||||
// 默认按时间倒序排序
|
|
||||||
if (StringUtils.isBlank(dto.getSortType())) {
|
|
||||||
dto.setSortType(SortTypeEnum.DESC.getSortType());
|
|
||||||
}
|
|
||||||
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
|
|
||||||
}
|
|
||||||
|
|
||||||
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);
|
|
||||||
|
|
||||||
throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
|
|
||||||
} finally {
|
|
||||||
if (kafkaConsumer != null) {
|
|
||||||
try {
|
|
||||||
kafkaConsumer.close(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
|
||||||
} catch (Exception e) {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if (ValidateUtils.isBlank(dto.getSortField())) {
|
||||||
|
// 默认按照timestampUnitMs字段排序
|
||||||
|
dto.setSortField(PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD.equals(dto.getSortField())) {
|
||||||
|
// 如果是时间类型,则第二排序规则是offset
|
||||||
|
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_OFFSET_SORTED_FIELD, dto.getSortType());
|
||||||
|
} else {
|
||||||
|
// 如果是非时间类型,则第二排序规则是时间
|
||||||
|
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD, dto.getSortType());
|
||||||
|
}
|
||||||
|
|
||||||
|
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -338,7 +270,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
|||||||
|
|
||||||
// Broker统计信息
|
// Broker统计信息
|
||||||
vo.setBrokerCount(brokerMap.size());
|
vo.setBrokerCount(brokerMap.size());
|
||||||
vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(elem -> elem.alive()).count());
|
vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(Broker::alive).count());
|
||||||
vo.setDeadBrokerCount(vo.getBrokerCount() - vo.getLiveBrokerCount());
|
vo.setDeadBrokerCount(vo.getBrokerCount() - vo.getLiveBrokerCount());
|
||||||
|
|
||||||
// Partition统计信息
|
// Partition统计信息
|
||||||
@@ -394,11 +326,8 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
|||||||
// ignore
|
// ignore
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
return (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue));
|
||||||
}
|
}
|
||||||
|
|
||||||
private TopicBrokerSingleVO getTopicBrokerSingle(Long clusterPhyId,
|
private TopicBrokerSingleVO getTopicBrokerSingle(Long clusterPhyId,
|
||||||
@@ -458,4 +387,90 @@ public class TopicStateManagerImpl implements TopicStateManager {
|
|||||||
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Math.max(2, Math.min(5, maxPollRecords)));
|
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Math.max(2, Math.min(5, maxPollRecords)));
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<TopicRecordVO> getTopicMessages(ClusterPhy clusterPhy,
|
||||||
|
String topicName,
|
||||||
|
Map<TopicPartition, Long> beginOffsetsMap,
|
||||||
|
Map<TopicPartition, Long> endOffsetsMap,
|
||||||
|
long startTime,
|
||||||
|
TopicRecordDTO dto) throws AdminOperateException {
|
||||||
|
List<TopicRecordVO> voList = new ArrayList<>();
|
||||||
|
|
||||||
|
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()))) {
|
||||||
|
// 移动到指定位置
|
||||||
|
long maxMessage = this.assignAndSeekToSpecifiedOffset(kafkaConsumer, beginOffsetsMap, endOffsetsMap, dto);
|
||||||
|
|
||||||
|
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
|
||||||
|
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
|
||||||
|
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
|
||||||
|
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
|
||||||
|
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord));
|
||||||
|
if (voList.size() >= dto.getMaxRecords()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 超时则返回
|
||||||
|
if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs()
|
||||||
|
|| voList.size() > dto.getMaxRecords()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return voList;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e);
|
||||||
|
|
||||||
|
throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long assignAndSeekToSpecifiedOffset(KafkaConsumer<String, String> kafkaConsumer,
|
||||||
|
Map<TopicPartition, Long> beginOffsetsMap,
|
||||||
|
Map<TopicPartition, Long> endOffsetsMap,
|
||||||
|
TopicRecordDTO dto) {
|
||||||
|
List<TopicPartition> partitionList = new ArrayList<>();
|
||||||
|
long maxMessage = 0;
|
||||||
|
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMap.entrySet()) {
|
||||||
|
long begin = beginOffsetsMap.get(entry.getKey());
|
||||||
|
long end = entry.getValue();
|
||||||
|
if (begin == end){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
maxMessage += end - begin;
|
||||||
|
partitionList.add(entry.getKey());
|
||||||
|
}
|
||||||
|
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
|
||||||
|
kafkaConsumer.assign(partitionList);
|
||||||
|
|
||||||
|
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
|
||||||
|
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
|
||||||
|
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||||
|
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
|
||||||
|
partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()));
|
||||||
|
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (TopicPartition partition : partitionList) {
|
||||||
|
if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
|
||||||
|
// 重置到最旧
|
||||||
|
kafkaConsumer.seek(partition, beginOffsetsMap.get(partition));
|
||||||
|
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
|
||||||
|
// 重置到指定时间
|
||||||
|
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
|
||||||
|
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
|
||||||
|
// 重置到指定位置
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// 默认,重置到最新
|
||||||
|
kafkaConsumer.seek(partition, Math.max(beginOffsetsMap.get(partition), endOffsetsMap.get(partition) - dto.getMaxRecords()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return maxMessage;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,5 +27,8 @@ public class PaginationConstant {
|
|||||||
/**
|
/**
|
||||||
* groupTopic列表的默认排序规则
|
* groupTopic列表的默认排序规则
|
||||||
*/
|
*/
|
||||||
public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName";
|
public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName";
|
||||||
|
|
||||||
|
public static final String TOPIC_RECORDS_TIME_SORTED_FIELD = "timestampUnitMs";
|
||||||
|
public static final String TOPIC_RECORDS_OFFSET_SORTED_FIELD = "offset";
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user