diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java
index a77b7c39..7d8c81ff 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java
@@ -12,6 +12,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
@@ -42,7 +43,6 @@ import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.Group
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -274,16 +274,16 @@ public class GroupManagerImpl implements GroupManager {
             )));
         }
 
-        OffsetSpec offsetSpec = null;
+        KSOffsetSpec offsetSpec = null;
         if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) {
-            offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp());
+            offsetSpec = KSOffsetSpec.forTimestamp(dto.getTimestamp());
         } else if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getResetType()) {
-            offsetSpec = OffsetSpec.earliest();
+            offsetSpec = KSOffsetSpec.earliest();
         } else {
-            offsetSpec = OffsetSpec.latest();
+            offsetSpec = KSOffsetSpec.latest();
         }
 
-        return partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), offsetSpec, dto.getTimestamp());
+        return partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), offsetSpec);
     }
 
     private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(List<GroupMemberPO> poList, List<GroupMetrics> metricsList) {
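The hunk above is the pattern repeated throughout this patch: call sites pick a KSOffsetSpec and the trailing nullable `timestamp` argument disappears, because a timestamp is only meaningful for a timestamp-based spec and now travels inside it. A minimal sketch of that selection logic (class and parameter names here are illustrative, not part of the patch):

```java
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;

class OffsetSpecChooserSketch {
    // Mirrors the branch in GroupManagerImpl: the timestamp rides inside the
    // spec instead of being threaded through as a separate nullable parameter.
    static KSOffsetSpec choose(int resetType, int preciseTimestampType, int earliestType, Long timestamp) {
        if (resetType == preciseTimestampType) {
            return KSOffsetSpec.forTimestamp(timestamp);
        } else if (resetType == earliestType) {
            return KSOffsetSpec.earliest();
        }
        return KSOffsetSpec.latest();
    }
}
```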
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
index e4b8a147..cd970528 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
@@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -46,7 +47,6 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
 import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
@@ -143,12 +143,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
         }
 
         // Fetch partition begin offsets
-        Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null);
+        Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), KSOffsetSpec.earliest());
         if (beginOffsetsMapResult.failed()) {
             return Result.buildFromIgnoreData(beginOffsetsMapResult);
         }
 
         // Fetch partition end offsets
-        Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
+        Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), KSOffsetSpec.latest());
         if (endOffsetsMapResult.failed()) {
             return Result.buildFromIgnoreData(endOffsetsMapResult);
         }
diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ClusterMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ClusterMetricCollector.java
index 75ea6406..e70bf6f9 100644
--- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ClusterMetricCollector.java
+++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ClusterMetricCollector.java
@@ -47,7 +47,7 @@ public class ClusterMetricCollector extends AbstractMetricCollector
         try {
diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java
index 4e79d98e..c042ae1d 100644
--- a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java
+++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/kafka/ReplicaMetricCollector.java
@@ -43,7 +43,7 @@ public class ReplicaMetricCollector extends AbstractMetricCollector
     public void collectKafkaMetrics(ClusterPhy clusterPhy) {
         Long clusterPhyId = clusterPhy.getId();
         List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
 
-        List<Partition> partitions = partitionService.listPartitionByCluster(clusterPhyId);
+        List<Partition> partitions = partitionService.listPartitionFromCacheFirst(clusterPhyId);
 
         FutureWaitUtil future = this.getFutureUtilByClusterPhyId(clusterPhyId);
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/offset/KSOffsetSpec.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/offset/KSOffsetSpec.java
new file mode 100644
index 00000000..580e2f6f
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/offset/KSOffsetSpec.java
@@ -0,0 +1,50 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.offset;
+
+import org.apache.kafka.clients.admin.OffsetSpec;
+
+/**
+ * @see OffsetSpec
+ */
+public class KSOffsetSpec {
+    public static class KSEarliestSpec extends KSOffsetSpec { }
+
+    public static class KSLatestSpec extends KSOffsetSpec { }
+
+    public static class KSTimestampSpec extends KSOffsetSpec {
+        private final long timestamp;
+
+        public KSTimestampSpec(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public long timestamp() {
+            return timestamp;
+        }
+    }
+
+    /**
+     * Used to retrieve the latest offset of a partition
+     */
+    public static KSOffsetSpec latest() {
+        return new KSOffsetSpec.KSLatestSpec();
+    }
+
+    /**
+     * Used to retrieve the earliest offset of a partition
+     */
+    public static KSOffsetSpec earliest() {
+        return new KSOffsetSpec.KSEarliestSpec();
+    }
+
+    /**
+     * Used to retrieve the earliest offset whose timestamp is greater than
+     * or equal to the given timestamp in the corresponding partition
+     * @param timestamp in milliseconds
+     */
+    public static KSOffsetSpec forTimestamp(long timestamp) {
+        return new KSOffsetSpec.KSTimestampSpec(timestamp);
+    }
+
+    private KSOffsetSpec() {
+    }
+}
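KSOffsetSpec deliberately mirrors OffsetSpec's factory API, so only the lowest client-facing layer has to know about `org.apache.kafka.clients.admin`. A minimal sketch of the reverse mapping, assuming the same instanceof dispatch that PartitionServiceImpl uses later in this patch (the converter class name is hypothetical):

```java
import org.apache.kafka.clients.admin.OffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;

class KSOffsetSpecConverter {
    // Maps the version-agnostic spec back onto the AdminClient type; older
    // clusters dispatch the same KSOffsetSpec to a KafkaConsumer path instead.
    static OffsetSpec toKafkaOffsetSpec(KSOffsetSpec spec) {
        if (spec instanceof KSOffsetSpec.KSEarliestSpec) {
            return OffsetSpec.earliest();
        } else if (spec instanceof KSOffsetSpec.KSTimestampSpec) {
            return OffsetSpec.forTimestamp(((KSOffsetSpec.KSTimestampSpec) spec).timestamp());
        }
        return OffsetSpec.latest();
    }
}
```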
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java
index 02907a6c..09f81812 100644
--- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java
@@ -1,23 +1,39 @@
 package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
 
-import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
-import lombok.Data;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import com.xiaojukeji.know.streaming.km.common.utils.Triple;
+import lombok.Getter;
 import lombok.NoArgsConstructor;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
 
-@Data
+@Getter
 @NoArgsConstructor
-public class PartitionOffsetParam extends TopicParam {
-    private Map<TopicPartition, OffsetSpec> topicPartitionOffsets;
+public class PartitionOffsetParam extends ClusterPhyParam {
+    private List<Triple<String, KSOffsetSpec, List<TopicPartition>>> offsetSpecList;
 
-    private Long timestamp;
+    public PartitionOffsetParam(Long clusterPhyId, String topicName, KSOffsetSpec ksOffsetSpec, List<TopicPartition> partitionList) {
+        super(clusterPhyId);
+        this.offsetSpecList = Collections.singletonList(new Triple<>(topicName, ksOffsetSpec, partitionList));
+    }
 
-    public PartitionOffsetParam(Long clusterPhyId, String topicName, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
-        super(clusterPhyId, topicName);
-        this.topicPartitionOffsets = topicPartitionOffsets;
-        this.timestamp = timestamp;
+    public PartitionOffsetParam(Long clusterPhyId, String topicName, List<KSOffsetSpec> specList, List<TopicPartition> partitionList) {
+        super(clusterPhyId);
+        this.offsetSpecList = new ArrayList<>();
+        specList.forEach(elem -> offsetSpecList.add(new Triple<>(topicName, elem, partitionList)));
+    }
+
+    public PartitionOffsetParam(Long clusterPhyId, KSOffsetSpec offsetSpec, List<TopicPartition> partitionList) {
+        super(clusterPhyId);
+        Map<String, List<TopicPartition>> tpMap = new HashMap<>();
+        partitionList.forEach(elem -> {
+            tpMap.putIfAbsent(elem.topic(), new ArrayList<>());
+            tpMap.get(elem.topic()).add(elem);
+        });
+
+        this.offsetSpecList = tpMap.entrySet().stream().map(elem -> new Triple<>(elem.getKey(), offsetSpec, elem.getValue())).collect(Collectors.toList());
     }
 }
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/partition/PartitionPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/partition/PartitionPO.java
index c54ea851..8740668c 100644
--- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/partition/PartitionPO.java
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/partition/PartitionPO.java
@@ -5,6 +5,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
 import lombok.Data;
 
+import java.util.Objects;
+
 @Data
 @TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "partition")
 public class PartitionPO extends BasePO {
@@ -37,4 +39,31 @@ public class PartitionPO extends BasePO {
      * AR
      */
     private String assignReplicas;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        PartitionPO po = (PartitionPO) o;
+        return Objects.equals(clusterPhyId, po.clusterPhyId)
+                && Objects.equals(topicName, po.topicName)
+                && Objects.equals(partitionId, po.partitionId)
+                && Objects.equals(leaderBrokerId, po.leaderBrokerId)
+                && Objects.equals(inSyncReplicas, po.inSyncReplicas)
+                && Objects.equals(assignReplicas, po.assignReplicas);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), clusterPhyId, topicName, partitionId, leaderBrokerId, inSyncReplicas, assignReplicas);
+    }
 }
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java
index 16fd7921..9ab0bad1 100644
--- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java
@@ -33,7 +33,7 @@ public class KafkaConstant {
 
     public static final Integer DATA_VERSION_ONE = 1;
 
-    public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 5000;
+    public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 10000;
 
     public static final Integer KAFKA_SASL_SCRAM_ITERATIONS = 8192;
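The new PartitionPO.equals/hashCode pair exists so that `updatePartitions` in PartitionServiceImpl (later in this patch) can skip UPDATE statements for rows whose business fields have not changed. A minimal sketch of that write-skipping idiom, assuming a hypothetical `PartitionDAO` stand-in for the MyBatis-Plus mapper; note it also relies on `BasePO.equals` being value-based, since `super.equals(o)` is consulted first:

```java
import com.xiaojukeji.know.streaming.km.common.bean.po.partition.PartitionPO;

class PartitionWriteSkipSketch {
    interface PartitionDAO { void updateById(PartitionPO po); } // hypothetical

    static void updateIfChanged(PartitionDAO dao, PartitionPO fresh, PartitionPO stored) {
        if (fresh.equals(stored)) {
            return; // identical business fields: no DB write at all
        }
        fresh.setId(stored.getId()); // keep the existing primary key
        dao.updateById(fresh);
    }
}
```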
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java
new file mode 100644
index 00000000..84e2363b
--- /dev/null
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java
@@ -0,0 +1,43 @@
+package com.xiaojukeji.know.streaming.km.core.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class DataBaseDataLocalCache {
+    private static final Cache<Long, ClusterMetrics> clusterLatestMetricsCache = Caffeine.newBuilder()
+            .expireAfterWrite(180, TimeUnit.SECONDS)
+            .maximumSize(500)
+            .build();
+
+    private static final Cache<Long, Map<String, List<Partition>>> partitionsCache = Caffeine.newBuilder()
+            .expireAfterWrite(60, TimeUnit.SECONDS)
+            .maximumSize(500)
+            .build();
+
+    public static ClusterMetrics getClusterLatestMetrics(Long clusterPhyId) {
+        return clusterLatestMetricsCache.getIfPresent(clusterPhyId);
+    }
+
+    public static void putClusterLatestMetrics(Long clusterPhyId, ClusterMetrics metrics) {
+        clusterLatestMetricsCache.put(clusterPhyId, metrics);
+    }
+
+    public static Map<String, List<Partition>> getPartitions(Long clusterPhyId) {
+        return partitionsCache.getIfPresent(clusterPhyId);
+    }
+
+    public static void putPartitions(Long clusterPhyId, Map<String, List<Partition>> partitionMap) {
+        partitionsCache.put(clusterPhyId, partitionMap);
+    }
+
+    /**************************************************** private method ****************************************************/
+
+    private DataBaseDataLocalCache() {
+    }
+}
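Caffeine's `getIfPresent` returns null once an entry ages past `expireAfterWrite`, which is why every read path in this patch pairs the cache lookup with a DB fallback; with a 60-second partition TTL and a once-a-minute refresh cron, an entry can briefly expire between flushes. A small self-contained demo of that behavior:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;

public class ExpiryDemo {
    public static void main(String[] args) throws InterruptedException {
        Cache<Long, String> cache = Caffeine.newBuilder()
                .expireAfterWrite(1, TimeUnit.SECONDS) // entries age out after a write
                .maximumSize(500)
                .build();

        cache.put(1L, "partitions-of-cluster-1");
        System.out.println(cache.getIfPresent(1L)); // "partitions-of-cluster-1"

        Thread.sleep(1_500);
        System.out.println(cache.getIfPresent(1L)); // null -> caller must fall back to the DB
    }
}
```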
LOGGER.error("method=flushPartitionsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + } + }); + } + } + + @Scheduled(cron = "0 0/1 * * * ?") + private void flushClusterLatestMetricsCache() { + for (ClusterPhy clusterPhy: clusterPhyService.listAllClusters()) { + FutureUtil.quickStartupFutureUtil.submitTask(() -> { + try { + Result metricsResult = clusterMetricService.getLatestMetricsFromES(clusterPhy.getId(), Collections.emptyList()); + if (metricsResult.hasData()) { + DataBaseDataLocalCache.putClusterLatestMetrics(clusterPhy.getId(), metricsResult.getData()); + return; + } + + LOGGER.error("method=flushClusterLatestMetricsCache||clusterPhyId={}||result={}||msg=failed", clusterPhy.getId(), metricsResult); + } catch (Exception e) { + LOGGER.error("method=flushClusterLatestMetricsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + } + + DataBaseDataLocalCache.putClusterLatestMetrics(clusterPhy.getId(), new ClusterMetrics(clusterPhy.getId())); + }); + } + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java index 52a90ad2..a1320b90 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java @@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; @@ -51,7 +52,6 @@ import java.util.*; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*; -import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*; /** * @author didi @@ -365,7 +365,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker Long clusterId = param.getClusterId(); Integer brokerId = param.getBrokerId(); - List partitions = partitionService.listPartitionByBroker(clusterId, brokerId); + List partitions = partitionService.listPartitionFromCacheFirst(clusterId, brokerId); Float logSizeSum = 0f; for(Partition p : partitions) { @@ -387,7 +387,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker logSizeSum += (replicaLogSize == null? 
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
index 52a90ad2..a1320b90 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
@@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
+import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
 import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
 import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
@@ -51,7 +52,6 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
-import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
 
 /**
  * @author didi
@@ -365,7 +365,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         Long clusterId = param.getClusterId();
         Integer brokerId = param.getBrokerId();
 
-        List<Partition> partitions = partitionService.listPartitionByBroker(clusterId, brokerId);
+        List<Partition> partitions = partitionService.listPartitionFromCacheFirst(clusterId, brokerId);
 
         Float logSizeSum = 0f;
         for(Partition p : partitions) {
@@ -387,7 +387,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
                 logSizeSum += (replicaLogSize == null? 0.0f: replicaLogSize);
             } catch (Exception e) {
                 LOGGER.error(
-                        "class=BrokerMetricServiceImpl||method=getLogSize||clusterPhyId={}||brokerId={}||topicName={}||partitionId={}||metricName={}||errMsg=exception",
+                        "method=getLogSize||clusterPhyId={}||brokerId={}||topicName={}||partitionId={}||metricName={}||errMsg=exception",
                         clusterId, brokerId, p.getTopicName(), p.getPartitionId(), metric, e.getClass().getName()
                 );
             }
@@ -432,7 +432,9 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         Float brokerLeaderCount = metricsResult.getData().getMetric(
                 BrokerMetricVersionItems.BROKER_METRIC_LEADERS);
 
-        Integer globalLeaderCount = partitionService.getLeaderPartitionSizeByClusterId(clusterId);
+        Integer globalLeaderCount = (int) partitionService.listPartitionFromCacheFirst(clusterId)
+                .stream()
+                .filter(partition -> !partition.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)).count();
         Integer globalBrokerCount = brokerService.listAllBrokersFromDB(clusterId).size();
 
         if (globalLeaderCount <= 0 || globalBrokerCount <= 0) {
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
index 8cbfc8b8..9d2a7f70 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
@@ -2,8 +2,6 @@ package com.xiaojukeji.know.streaming.km.core.service.cluster.impl;
 
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.Table;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricsClusterPhyDTO;
@@ -13,6 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.Kafka
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ClusterMetricParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
@@ -24,6 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
+import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
 import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
 import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
@@ -33,11 +33,11 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
 import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
 import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
 import com.xiaojukeji.know.streaming.km.common.utils.*;
+import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
 import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
 import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
 import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
 import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
-import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
 import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
 import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
 import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
@@ -50,32 +50,28 @@ import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.ClusterMetricESDAO;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.resource.ResourceType;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
-import javax.annotation.PostConstruct;
 import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics.initWithMetrics;
 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
-import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*;
 import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*;
 
 /**
  * @author didi
 */
-@Service("clusterMetricService")
+@Service
 public class ClusterMetricServiceImpl extends BaseMetricService implements ClusterMetricService {
     private static final ILog LOGGER = LogFactory.getLog(ClusterMetricServiceImpl.class);
@@ -153,22 +149,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     @Autowired
     private JobService jobService;
 
-    @Autowired
-    private ClusterPhyService clusterPhyService;
-
-    private final Cache<Long, ClusterMetrics> clusterLatestMetricsCache = Caffeine.newBuilder()
-            .expireAfterWrite(180, TimeUnit.SECONDS)
-            .maximumSize(1000)
-            .build();
-
-    @PostConstruct
-    @Scheduled(cron = "0 0/1 * * * ?")
-    private void flushClusterLatestMetricsCache() {
-        for (ClusterPhy clusterPhy: clusterPhyService.listAllClusters()) {
-            FutureUtil.quickStartupFutureUtil.submitTask(() -> this.updateCacheAndGetMetrics(clusterPhy.getId()));
-        }
-    }
-
     @Override
     protected VersionItemTypeEnum getVersionItemType() {
         return VersionItemTypeEnum.METRIC_CLUSTER;
@@ -283,7 +263,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
 
     @Override
     public ClusterMetrics getLatestMetricsFromCache(Long clusterPhyId) {
-        ClusterMetrics metrics = clusterLatestMetricsCache.getIfPresent(clusterPhyId);
+        ClusterMetrics metrics = DataBaseDataLocalCache.getClusterLatestMetrics(clusterPhyId);
         if (metrics != null) {
             return metrics;
         }
@@ -335,24 +315,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
 
     /**************************************************** private method ****************************************************/
 
-    private ClusterMetrics updateCacheAndGetMetrics(Long clusterPhyId) {
-        try {
-            Result<ClusterMetrics> metricsResult = this.getLatestMetricsFromES(clusterPhyId, Arrays.asList());
-            if (metricsResult.hasData()) {
-                LOGGER.info("method=updateCacheAndGetMetrics||clusterPhyId={}||msg=success", clusterPhyId);
-                clusterLatestMetricsCache.put(clusterPhyId, metricsResult.getData());
-
-                return metricsResult.getData();
-            }
-        } catch (Exception e) {
-            LOGGER.error("method=updateCacheAndGetMetrics||clusterPhyId={}||errMsg=exception!", clusterPhyId, e);
-        }
-
-        ClusterMetrics clusterMetrics = new ClusterMetrics(clusterPhyId);
-        clusterLatestMetricsCache.put(clusterPhyId, clusterMetrics);
-        return clusterMetrics;
-    }
-
     /**
      * doNothing
     */
@@ -382,9 +344,28 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     /**
     * Total messageSize of the cluster
    */
-    private Result<ClusterMetrics> getMessageSize(VersionItemParam metricParam){
+    private Result<ClusterMetrics> getMessageSize(VersionItemParam metricParam) {
         ClusterMetricParam param = (ClusterMetricParam)metricParam;
-        return getMetricFromKafkaByTotalTopics(param.getClusterId(), param.getMetric(), TOPIC_METRIC_MESSAGES);
+
+        Result<Map<TopicPartition, Long>> beginOffsetMapResult = partitionService.getAllPartitionOffsetFromKafka(param.getClusterId(), KSOffsetSpec.earliest());
+
+        Result<Map<TopicPartition, Long>> endOffsetMapResult = partitionService.getAllPartitionOffsetFromKafka(param.getClusterId(), KSOffsetSpec.latest());
+        if (endOffsetMapResult.failed() || beginOffsetMapResult.failed()) {
+            // If either lookup failed, return the failed result directly
+            return Result.buildFromIgnoreData(endOffsetMapResult.failed() ? endOffsetMapResult : beginOffsetMapResult);
+        }
+
+        long msgCount = 0;
+        for (Map.Entry<TopicPartition, Long> entry: endOffsetMapResult.getData().entrySet()) {
+            Long beginOffset = beginOffsetMapResult.getData().get(entry.getKey());
+            if (beginOffset == null) {
+                continue;
+            }
+
+            msgCount += Math.max(0, entry.getValue() - beginOffset);
+        }
+
+        return Result.buildSuc(initWithMetrics(param.getClusterId(), param.getMetric(), (float)msgCount));
     }
 
     /**
@@ -406,9 +387,9 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     private Result<ClusterMetrics> getPartitionSize(VersionItemParam metricParam){
         ClusterMetricParam param = (ClusterMetricParam)metricParam;
 
-        String  metric      = param.getMetric();
-        Long    clusterId   = param.getClusterId();
-        Integer partitionNu = partitionService.getPartitionSizeByClusterId(clusterId);
+        String  metric      = param.getMetric();
+        Long    clusterId   = param.getClusterId();
+        Integer partitionNu = partitionService.listPartitionFromCacheFirst(clusterId).size();
 
         return Result.buildSuc(initWithMetrics(clusterId, metric, partitionNu.floatValue()));
     }
@@ -421,7 +402,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
         String metric = param.getMetric();
         Long clusterId = param.getClusterId();
 
-        Integer noLeaders = partitionService.getNoLeaderPartitionSizeByClusterId(clusterId);
+        Integer noLeaders = (int) partitionService.listPartitionFromCacheFirst(clusterId)
+                .stream()
+                .filter(partition -> partition.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
+                .count();
 
         return Result.buildSuc(initWithMetrics(clusterId, metric, noLeaders.floatValue()));
     }
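`getMessageSize` above now derives the cluster message count from log offsets rather than summing per-topic metrics: per partition, the retained count is `endOffset - beginOffset`, clamped at zero as a defense against transiently inconsistent snapshots. Note this counts messages currently retained in the log (after retention/compaction), not messages ever produced. A worked sketch of the arithmetic:

```java
import java.util.HashMap;
import java.util.Map;

public class MessageCountSketch {
    static long countMessages(Map<Integer, Long> beginOffsets, Map<Integer, Long> endOffsets) {
        long msgCount = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long begin = beginOffsets.get(entry.getKey());
            if (begin == null) {
                continue; // partition missing from one snapshot: skip it
            }
            msgCount += Math.max(0, entry.getValue() - begin);
        }
        return msgCount;
    }

    public static void main(String[] args) {
        Map<Integer, Long> begin = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        begin.put(0, 100L); end.put(0, 250L); // 150 retained messages
        begin.put(1, 40L);  end.put(1, 40L);  // empty partition
        System.out.println(countMessages(begin, end)); // prints 150
    }
}
```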
@@ -747,7 +731,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     /**
     * Aggregate a cluster metric by summing the metric over all topics
    */
-    private Result<ClusterMetrics> getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){
+    private Result<ClusterMetrics> getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric) {
         List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterId);
 
         float sumMetricValue = 0f;
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java
index 0299a67f..c9d65468 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java
@@ -6,6 +6,7 @@ import com.google.common.collect.Table;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricGroupPartitionDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.GroupMetricParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -24,13 +25,11 @@ import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateSer
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
 import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
-import java.util.stream.Collectors;
 
 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
 import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*;
@@ -192,31 +191,29 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe
             metricsList.add(metrics);
         }
 
-        for (String topicName: groupOffsetMap.keySet().stream().map(elem -> elem.topic()).collect(Collectors.toSet())) {
-            Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterId, topicName, OffsetSpec.latest(), null);
-            if (!offsetMapResult.hasData()) {
-                // Fetching this topic failed
+        Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterId, new ArrayList<>(groupOffsetMap.keySet()), KSOffsetSpec.latest());
+        if (!offsetMapResult.hasData()) {
+            // Fetch failed
+            return Result.buildSuc(metricsList);
+        }
+
+        for (Map.Entry<TopicPartition, Long> entry: offsetMapResult.getData().entrySet()) {
+            // Build the GROUP_METRIC_LOG_END_OFFSET metric
+            GroupMetrics metrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
+            metrics.putMetric(GROUP_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
+            metricsList.add(metrics);
+
+            Long groupOffset = groupOffsetMap.get(entry.getKey());
+            if (groupOffset == null) {
+                // Not present, skip it
                 continue;
             }
 
-            for (Map.Entry<TopicPartition, Long> entry: offsetMapResult.getData().entrySet()) {
-                // Build the GROUP_METRIC_LOG_END_OFFSET metric
-                GroupMetrics metrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
-                metrics.putMetric(GROUP_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
-                metricsList.add(metrics);
+            // Build the GROUP_METRIC_LAG metric
+            GroupMetrics groupMetrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
+            groupMetrics.putMetric(GROUP_METRIC_LAG, Math.max(0L, entry.getValue() - groupOffset) * 1.0f);
 
-                Long groupOffset = groupOffsetMap.get(entry.getKey());
-                if (groupOffset == null) {
-                    // Not present, skip it
-                    continue;
-                }
-
-                // Build the GROUP_METRIC_LAG metric
-                GroupMetrics groupMetrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
-                groupMetrics.putMetric(GROUP_METRIC_LAG, Math.max(0L, entry.getValue() - groupOffset) * 1.0f);
-
-                metricsList.add(groupMetrics);
-            }
+            metricsList.add(groupMetrics);
         }
 
         return Result.buildSuc(metricsList);
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionMetricService.java
index 6d4bd777..e48f3131 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionMetricService.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionMetricService.java
@@ -26,6 +26,5 @@ public interface PartitionMetricService {
      * Fetch metrics from ES
      */
     PartitionMetrics getLatestMetricsFromES(Long clusterPhyId, String topic, Integer brokerId, Integer partitionId, List<String> metricNameList);
-    Result<List<PartitionMetrics>> getLatestMetricsFromES(Long clusterPhyId, String topicName, List<String> metricNameList);
 }
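The group hunk above replaces one offset request per topic with a single batched call over all of the group's partitions, then computes lag as `max(0, logEndOffset - committedOffset)`; the clamp guards against a commit momentarily running ahead of a cached log-end offset. A minimal sketch of that lag computation:

```java
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;

class GroupLagSketch {
    // lag = how far the group's committed offset trails the log end offset.
    static Map<TopicPartition, Long> computeLag(Map<TopicPartition, Long> endOffsets,
                                                Map<TopicPartition, Long> committed) {
        Map<TopicPartition, Long> lag = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
            Long groupOffset = committed.get(e.getKey());
            if (groupOffset == null) {
                continue; // the group never committed on this partition
            }
            lag.put(e.getKey(), Math.max(0L, e.getValue() - groupOffset));
        }
        return lag;
    }
}
```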
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java
index ae68dccf..9b5b1e5a 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/PartitionService.java
@@ -1,10 +1,11 @@
 package com.xiaojukeji.know.streaming.km.core.service.partition;
 
 import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
 import com.xiaojukeji.know.streaming.km.common.bean.po.partition.PartitionPO;
-import org.apache.kafka.clients.admin.OffsetSpec;
+import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
@@ -12,49 +13,40 @@ import java.util.Map;
 import java.util.Set;
 
 public interface PartitionService {
+    /**
+     * Fetch partition info from Kafka
+     */
     Result<Map<String, List<Partition>>> listPartitionsFromKafka(ClusterPhy clusterPhy);
-
     Result<List<Partition>> listPartitionsFromKafka(ClusterPhy clusterPhy, String topicName);
 
+    /**
+     * Fetch partition info from the DB
+     */
     List<Partition> listPartitionByCluster(Long clusterPhyId);
-
     List<PartitionPO> listPartitionPOByCluster(Long clusterPhyId);
-
-    /**
-     * Partitions of a topic
-     */
     List<Partition> listPartitionByTopic(Long clusterPhyId, String topicName);
-
-
-    /**
-     * Partitions on a broker
-     */
-    List<Partition> listPartitionByBroker(Long clusterPhyId, Integer brokerId);
-
-    /**
-     * Get a specific partition
-     */
     Partition getPartitionByTopicAndPartitionId(Long clusterPhyId, String topicName, Integer partitionId);
-
-
-    /**************************************************** Prefer cache for partition info ****************************************************/
-
+
+    /**
+     * Prefer the local cache for partition info; fall back to the DB on a miss
+     */
+    List<Partition> listPartitionFromCacheFirst(Long clusterPhyId);
+    List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, Integer brokerId);
     List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, String topicName);
     Partition getPartitionFromCacheFirst(Long clusterPhyId, String topicName, Integer partitionId);
 
     /**
-     * Number of partitions in the cluster
+     * Fetch partition offset info
    */
-    Integer getPartitionSizeByClusterId(Long clusterPhyId);
-
-    Integer getLeaderPartitionSizeByClusterId(Long clusterPhyId);
-
-    Integer getNoLeaderPartitionSizeByClusterId(Long clusterPhyId);
-
-    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, OffsetSpec offsetSpec, Long timestamp);
-
-    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, OffsetSpec offsetSpec, Long timestamp);
+    Result<Map<TopicPartition, Long>> getAllPartitionOffsetFromKafka(Long clusterPhyId, KSOffsetSpec offsetSpec);
+    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec);
+    Result<Tuple<Map<TopicPartition, Long>/*begin offset*/, Map<TopicPartition, Long>/*end offset*/>> getPartitionBeginAndEndOffsetFromKafka(Long clusterPhyId, String topicName);
+    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, KSOffsetSpec offsetSpec);
+    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, List<TopicPartition> tpList, KSOffsetSpec offsetSpec);
 
+    /**
+     * Update partition info
+     */
     int updatePartitions(Long clusterPhyId, String topicName, List<Partition> kafkaPartitionList, List<PartitionPO> dbPartitionList);
-
     void deletePartitionsIfNotIn(Long clusterPhyId, Set<String> topicNameSet);
 }
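The offset getters above all funnel into one versioned handler that accepts a list of (topicName, KSOffsetSpec, partitions) triples, so a single call can carry several topics, or both the earliest and latest spec for one topic. A sketch of the three request shapes, built with the PartitionOffsetParam constructors shown earlier in this patch (topic names are placeholders):

```java
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.PartitionOffsetParam;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.List;

class OffsetParamShapes {
    static void build(Long clusterPhyId) {
        List<TopicPartition> tps = Arrays.asList(new TopicPartition("t1", 0), new TopicPartition("t1", 1));

        // One topic, one spec -> a single triple.
        new PartitionOffsetParam(clusterPhyId, "t1", KSOffsetSpec.latest(), tps);

        // One topic, several specs -> one triple per spec (begin + end in one request).
        new PartitionOffsetParam(clusterPhyId, "t1",
                Arrays.asList(KSOffsetSpec.earliest(), KSOffsetSpec.latest()), tps);

        // Mixed topics, one spec -> partitions are regrouped by topic internally.
        new PartitionOffsetParam(clusterPhyId, KSOffsetSpec.latest(),
                Arrays.asList(new TopicPartition("t1", 0), new TopicPartition("t2", 0)));
    }
}
```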
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java
index e6ce6a62..63e9bc36 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
 
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.TopicMetricParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
@@ -15,6 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept
 import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
 import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
+import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
 import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
@@ -22,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
 import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.PartitionMetricESDAO;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -176,50 +177,45 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
 
         Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();
 
-        // begin offset metric
-        Result<Map<TopicPartition, Long>> beginOffsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.earliest(), null);
-        if (beginOffsetMapResult.hasData()) {
-            for (Map.Entry<TopicPartition, Long> entry: beginOffsetMapResult.getData().entrySet()) {
-                Partition partition = partitionMap.get(entry.getKey().partition());
-                PartitionMetrics metrics = metricsMap.getOrDefault(
-                        entry.getKey().partition(),
-                        new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
-                );
-
-                metrics.putMetric(PARTITION_METRIC_LOG_START_OFFSET, entry.getValue().floatValue());
-                metricsMap.put(entry.getKey().partition(), metrics);
-            }
-        } else {
+        // offset metrics
+        Result<Tuple<Map<TopicPartition, Long>, Map<TopicPartition, Long>>> offsetResult = partitionService.getPartitionBeginAndEndOffsetFromKafka(clusterPhyId, topicName);
+        if (offsetResult.failed()) {
             LOGGER.warn(
-                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||resultMsg={}||msg=get begin offset failed",
-                    clusterPhyId, topicName, beginOffsetMapResult.getMessage()
+                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||result={}||msg=get offset failed",
+                    clusterPhyId, topicName, offsetResult
             );
+
+            return Result.buildFromIgnoreData(offsetResult);
+        }
+
+        // begin offset metric
+        for (Map.Entry<TopicPartition, Long> entry: offsetResult.getData().v1().entrySet()) {
+            Partition partition = partitionMap.get(entry.getKey().partition());
+            PartitionMetrics metrics = metricsMap.getOrDefault(
+                    entry.getKey().partition(),
+                    new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
+            );
+
+            metrics.putMetric(PARTITION_METRIC_LOG_START_OFFSET, entry.getValue().floatValue());
+            metricsMap.put(entry.getKey().partition(), metrics);
         }
 
         // end offset metric
-        Result<Map<TopicPartition, Long>> endOffsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.latest(), null);
-        if (endOffsetMapResult.hasData()) {
-            for (Map.Entry<TopicPartition, Long> entry: endOffsetMapResult.getData().entrySet()) {
-                Partition partition = partitionMap.get(entry.getKey().partition());
-                PartitionMetrics metrics = metricsMap.getOrDefault(
-                        entry.getKey().partition(),
-                        new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
-                );
-
-                metrics.putMetric(PARTITION_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
-                metricsMap.put(entry.getKey().partition(), metrics);
-            }
-        } else {
-            LOGGER.warn(
-                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||resultMsg={}||msg=get end offset failed",
-                    clusterPhyId, topicName, endOffsetMapResult.getMessage()
+        for (Map.Entry<TopicPartition, Long> entry: offsetResult.getData().v2().entrySet()) {
+            Partition partition = partitionMap.get(entry.getKey().partition());
+            PartitionMetrics metrics = metricsMap.getOrDefault(
+                    entry.getKey().partition(),
+                    new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
             );
+
+            metrics.putMetric(PARTITION_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
+            metricsMap.put(entry.getKey().partition(), metrics);
         }
 
         // messages metric
-        if (endOffsetMapResult.hasData() && beginOffsetMapResult.hasData()) {
-            for (Map.Entry<TopicPartition, Long> entry: endOffsetMapResult.getData().entrySet()) {
-                Long beginOffset = beginOffsetMapResult.getData().get(entry.getKey());
+        if (!ValidateUtils.isEmptyMap(offsetResult.getData().v1()) && !ValidateUtils.isEmptyMap(offsetResult.getData().v2())) {
+            for (Map.Entry<TopicPartition, Long> entry: offsetResult.getData().v2().entrySet()) {
+                Long beginOffset = offsetResult.getData().v1().get(entry.getKey());
                 if (beginOffset == null) {
                     continue;
                 }
@@ -235,8 +231,8 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
             }
         } else {
             LOGGER.warn(
-                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||endResultMsg={}||beginResultMsg={}||msg=get messages failed",
-                    clusterPhyId, topicName, endOffsetMapResult.getMessage(), beginOffsetMapResult.getMessage()
+                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||offsetData={}||msg=get messages failed",
+                    clusterPhyId, topicName, ConvertUtil.obj2Json(offsetResult.getData())
             );
         }
 
@@ -283,7 +279,6 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
 
             } catch (InstanceNotFoundException e) {
                 // ignore
-                continue;
             } catch (Exception e) {
                 LOGGER.error(
                         "method=getMetricFromJmx||clusterPhyId={}||topicName={}||partitionId={}||leaderBrokerId={}||metricName={}||msg={}",
@@ -326,7 +321,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
             // 4. Fetch the JMX metric
             String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName() + ",topic=" + topicName), jmxInfo.getJmxAttribute()).toString();
 
-            Long leaderCount = partitionList.stream().filter(elem -> elem.getLeaderBrokerId().equals(partition.getLeaderBrokerId())).count();
+            long leaderCount = partitionList.stream().filter(elem -> elem.getLeaderBrokerId().equals(partition.getLeaderBrokerId())).count();
             if (leaderCount <= 0) {
                 // The leader has already moved away
                 continue;
@@ -338,7 +333,6 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
 
         } catch (InstanceNotFoundException e) {
             // ignore
-            continue;
         } catch (Exception e) {
             LOGGER.error(
                     "method=getTopicAvgMetricFromJmx||clusterPhyId={}||topicName={}||partitionId={}||leaderBrokerId={}||metricName={}||msg={}",
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java
index 62d24feb..a2094028 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java
@@ -3,9 +3,8 @@ package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.PartitionOffsetParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
@@ -20,7 +19,10 @@ import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum
 import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
 import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
 import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
+import com.xiaojukeji.know.streaming.km.common.utils.Triple;
+import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
+import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState;
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
@@ -44,7 +46,6 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -55,7 +56,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
 /**
  * @author didi
 */
-@Service("partitionService")
+@Service
 public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
 
     private static final ILog log = LogFactory.getLog(PartitionServiceImpl.class);
@@ -78,15 +79,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
         return SERVICE_OP_PARTITION;
     }
 
-    private final Cache<String, List<Partition>> partitionsCache = Caffeine.newBuilder()
-            .expireAfterWrite(90, TimeUnit.SECONDS)
-            .maximumSize(1000)
-            .build();
-
     @PostConstruct
     private void init() {
-        registerVCHandler(PARTITION_OFFSET_GET, V_0_10_0_0, V_0_11_0_0, "getPartitionOffsetFromKafkaConsumerClient", this::getPartitionOffsetFromKafkaConsumerClient);
-        registerVCHandler(PARTITION_OFFSET_GET, V_0_11_0_0, V_MAX, "getPartitionOffsetFromKafkaAdminClient", this::getPartitionOffsetFromKafkaAdminClient);
+        registerVCHandler(PARTITION_OFFSET_GET, V_0_10_0_0, V_0_11_0_0, "batchGetPartitionOffsetFromKafkaConsumerClient", this::batchGetPartitionOffsetFromKafkaConsumerClient);
+        registerVCHandler(PARTITION_OFFSET_GET, V_0_11_0_0, V_MAX, "batchGetPartitionOffsetFromKafkaAdminClient", this::batchGetPartitionOffsetFromKafkaAdminClient);
     }
@@ -133,17 +129,32 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
     }
 
     @Override
-    public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, String topicName) {
-        String clusterPhyIdAndTopicKey = MsgConstant.getClusterTopicKey(clusterPhyId, topicName);
-        List<Partition> partitionList = partitionsCache.getIfPresent(clusterPhyIdAndTopicKey);
+    public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId) {
+        Map<String, List<Partition>> partitionMap = DataBaseDataLocalCache.getPartitions(clusterPhyId);
 
-        if (!ValidateUtils.isNull(partitionList)) {
-            return partitionList;
+        if (partitionMap != null) {
+            return partitionMap.values().stream().collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
         }
 
-        partitionList = this.listPartitionByTopic(clusterPhyId, topicName);
-        partitionsCache.put(clusterPhyIdAndTopicKey, partitionList);
-        return partitionList;
+        return this.listPartitionByCluster(clusterPhyId);
+    }
+
+    @Override
+    public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, Integer brokerId) {
+        List<Partition> partitionList = this.listPartitionFromCacheFirst(clusterPhyId);
+
+        return partitionList.stream().filter(elem -> elem.getAssignReplicaList().contains(brokerId)).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, String topicName) {
+        Map<String, List<Partition>> partitionMap = DataBaseDataLocalCache.getPartitions(clusterPhyId);
+
+        if (partitionMap != null) {
+            return partitionMap.getOrDefault(topicName, new ArrayList<>());
+        }
+
+        return this.listPartitionByTopic(clusterPhyId, topicName);
     }
 
     @Override
@@ -162,16 +173,6 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
         return null;
     }
 
-    @Override
-    public List<Partition> listPartitionByBroker(Long clusterPhyId, Integer brokerId) {
-        LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
-        lambdaQueryWrapper.eq(PartitionPO::getClusterPhyId, clusterPhyId);
-
-        List<Partition> partitionList = this.convert2PartitionList(partitionDAO.selectList(lambdaQueryWrapper));
-
-        return partitionList.stream().filter(elem -> elem.getAssignReplicaList().contains(brokerId)).collect(Collectors.toList());
-    }
-
     @Override
     public Partition getPartitionByTopicAndPartitionId(Long clusterPhyId, String topicName, Integer partitionId) {
         LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
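The offset getters in the hunk below all filter out leaderless partitions before building the request: a partition without a leader cannot answer an offset lookup, so including it would fail or stall the whole batched `listOffsets` call. A minimal sketch of that pre-filter (the helper class is hypothetical; the filter itself matches the patch):

```java
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.stream.Collectors;

class LeaderFilterSketch {
    // Drop leaderless partitions up front instead of letting them fail the batch.
    static List<TopicPartition> onlyWithLeader(List<Partition> partitions) {
        return partitions.stream()
                .filter(p -> !p.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
                .map(p -> new TopicPartition(p.getTopicName(), p.getPartitionId()))
                .collect(Collectors.toList());
    }
}
```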
topicName)); - } + .map(elem -> new TopicPartition(elem.getTopicName(), elem.getPartitionId())) + .collect(Collectors.toList()); try { - return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); + Result>>> listResult = + (Result>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, offsetSpec, tpList)); + + return this.convert2OffsetMapResult(listResult); } catch (VCHandlerNotExistException e) { return Result.buildFailure(VC_HANDLE_NOT_EXIST); } } @Override - public Result> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, OffsetSpec offsetSpec, Long timestamp) { - if (partitionId == null) { - return this.getPartitionOffsetFromKafka(clusterPhyId, topicName, offsetSpec, timestamp); + public Result> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec) { + List tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream() + .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)) + .map(elem -> new TopicPartition(topicName, elem.getPartitionId())) + .collect(Collectors.toList()); + + if (tpList.isEmpty()) { + // 所有分区no-leader + return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FAILED, MsgConstant.getPartitionNoLeader(clusterPhyId, topicName)); } - Map topicPartitionOffsets = new HashMap<>(); - this.listPartitionByTopic(clusterPhyId, topicName) - .stream() - .filter(elem -> elem.getPartitionId().equals(partitionId)) - .forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec)); + try { + Result>>> listResult = + (Result>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, offsetSpec, tpList)); + + return this.convert2OffsetMapResult(listResult); + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + @Override + public Result, Map>> getPartitionBeginAndEndOffsetFromKafka(Long clusterPhyId, String topicName) { + List tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream() + .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)) + .map(elem -> new TopicPartition(topicName, elem.getPartitionId())) + .collect(Collectors.toList()); + + if (tpList.isEmpty()) { + // 所有分区no-leader + return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FAILED, MsgConstant.getPartitionNoLeader(clusterPhyId, topicName)); + } try { - return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); + Result>>> listResult = + (Result>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, Arrays.asList(KSOffsetSpec.earliest(), KSOffsetSpec.latest()), tpList)); + if (listResult.failed()) { + return Result.buildFromIgnoreData(listResult); + } else if (ValidateUtils.isEmptyList(listResult.getData())) { + return Result.buildSuc(new Tuple, Map>(new HashMap<>(0), new HashMap<>(0))); + } + + Tuple, Map> tuple = new Tuple<>(new HashMap<>(0), new HashMap<>(0)); + listResult.getData().forEach(elem -> { + if (elem.getV1() instanceof KSOffsetSpec.KSEarliestSpec) { + tuple.setV1(elem.v2()); + } else if (elem.v1() instanceof KSOffsetSpec.KSLatestSpec) { + tuple.setV2(elem.v2()); + } + }); + + return Result.buildSuc(tuple); + } catch (VCHandlerNotExistException e) { + return 
+
+    @Override
+    public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, KSOffsetSpec offsetSpec) {
+        if (partitionId == null) {
+            return this.getPartitionOffsetFromKafka(clusterPhyId, topicName, offsetSpec);
+        }
+
+        List<TopicPartition> tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream()
+                .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
+                .map(elem -> new TopicPartition(topicName, elem.getPartitionId()))
+                .collect(Collectors.toList());
+
+        try {
+            Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult =
+                    (Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, offsetSpec, tpList));
+
+            return this.convert2OffsetMapResult(listResult);
+        } catch (VCHandlerNotExistException e) {
+            return Result.buildFailure(VC_HANDLE_NOT_EXIST);
+        }
+    }
+
+    @Override
+    public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, List<TopicPartition> tpList, KSOffsetSpec offsetSpec) {
+        // partitions in the cluster that currently have a leader
+        Set<TopicPartition> existLeaderTPSet = this.listPartitionFromCacheFirst(clusterPhyId).stream()
+                .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
+                .map(elem -> new TopicPartition(elem.getTopicName(), elem.getPartitionId()))
+                .collect(Collectors.toSet());
+
+        List<TopicPartition> existLeaderTPList = tpList.stream().filter(elem -> existLeaderTPSet.contains(elem)).collect(Collectors.toList());
+        if (existLeaderTPList.isEmpty()) {
+            return Result.buildSuc(new HashMap<>(0));
+        }
+
+        try {
+            Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult = (Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>>) doVCHandler(
+                    clusterPhyId,
+                    PARTITION_OFFSET_GET,
+                    new PartitionOffsetParam(clusterPhyId, offsetSpec, existLeaderTPList)
+            );
+
+            return this.convert2OffsetMapResult(listResult);
         } catch (VCHandlerNotExistException e) {
             return Result.buildFailure(VC_HANDLE_NOT_EXIST);
         }
@@ -267,6 +319,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
         }
 
         PartitionPO presentPartitionPO = this.convert2PartitionPO(partition);
+        if (presentPartitionPO.equals(dbPartitionPO)) {
+            // identical to the DB record, skip the update
+            continue;
+        }
         presentPartitionPO.setId(dbPartitionPO.getId());
         partitionDAO.updateById(presentPartitionPO);
     }
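
One precondition worth spelling out for the skip above: presentPartitionPO.equals(dbPartitionPO) runs before the DB id is copied over, so PartitionPO's equals() must ignore DB-managed fields for the comparison to ever succeed. This diff does not show PartitionPO; with Lombok that could look like the following sketch (field names illustrative, not the actual class):

    import lombok.Data;
    import lombok.EqualsAndHashCode;

    // Hypothetical sketch: equality based on business fields only.
    @Data
    @EqualsAndHashCode(exclude = {"id"})
    public class PartitionPO {
        private Long id;                  // primary key, excluded from equals()
        private Long clusterPhyId;
        private String topicName;
        private Integer partitionId;
        private Integer leaderBrokerId;
        private String assignReplicas;
    }
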
@@ -306,64 +362,137 @@ public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
 
     /**************************************************** private method ****************************************************/
-
-    private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaAdminClient(VersionItemParam itemParam) {
+    private Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> batchGetPartitionOffsetFromKafkaAdminClient(VersionItemParam itemParam) {
         PartitionOffsetParam offsetParam = (PartitionOffsetParam) itemParam;
+        if (offsetParam.getOffsetSpecList().isEmpty()) {
+            return Result.buildSuc(Collections.emptyList());
+        }
+
+        List<Triple<String, KSOffsetSpec, ListOffsetsResult>> resultList = new ArrayList<>();
+        for (Triple<String, KSOffsetSpec, List<TopicPartition>> elem: offsetParam.getOffsetSpecList()) {
+            Result<ListOffsetsResult> offsetsResult = this.getPartitionOffsetFromKafkaAdminClient(
+                    offsetParam.getClusterPhyId(),
+                    elem.v1(),
+                    elem.v2(),
+                    elem.v3()
+            );
+
+            if (offsetsResult.failed() && offsetParam.getOffsetSpecList().size() == 1) {
+                return Result.buildFromIgnoreData(offsetsResult);
+            }
+
+            if (offsetsResult.hasData()) {
+                resultList.add(new Triple<>(elem.v1(), elem.v2(), offsetsResult.getData()));
+            }
+        }
+
+        List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>> offsetMapList = new ArrayList<>();
+        for (Triple<String, KSOffsetSpec, ListOffsetsResult> triple: resultList) {
+            try {
+                Map<TopicPartition, Long> offsetMap = new HashMap<>();
+                triple.v3().all().get().entrySet().stream().forEach(elem -> offsetMap.put(elem.getKey(), elem.getValue().offset()));
+
+                offsetMapList.add(new Tuple<>(triple.v2(), offsetMap));
+            } catch (Exception e) {
+                log.error(
+                        "method=batchGetPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||offsetSpec={}||errMsg=exception!",
+                        offsetParam.getClusterPhyId(), triple.v1(), triple.v2(), e
+                );
+            }
+        }
+
+        return Result.buildSuc(offsetMapList);
+    }
+
+    private Result<ListOffsetsResult> getPartitionOffsetFromKafkaAdminClient(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec, List<TopicPartition> tpList) {
         try {
-            AdminClient adminClient = kafkaAdminClient.getClient(offsetParam.getClusterPhyId());
+            AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId);
 
-            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(offsetParam.getTopicPartitionOffsets(), new ListOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
+            Map<TopicPartition, OffsetSpec> kafkaOffsetSpecMap = new HashMap<>(tpList.size());
+            tpList.forEach(elem -> {
+                if (offsetSpec instanceof KSOffsetSpec.KSEarliestSpec) {
+                    kafkaOffsetSpecMap.put(elem, OffsetSpec.earliest());
+                } else if (offsetSpec instanceof KSOffsetSpec.KSLatestSpec) {
+                    kafkaOffsetSpecMap.put(elem, OffsetSpec.latest());
+                } else if (offsetSpec instanceof KSOffsetSpec.KSTimestampSpec) {
+                    kafkaOffsetSpecMap.put(elem, OffsetSpec.forTimestamp(((KSOffsetSpec.KSTimestampSpec) offsetSpec).timestamp()));
+                }
+            });
 
-            Map<TopicPartition, Long> offsetMap = new HashMap<>();
-            listOffsetsResult.all().get().entrySet().stream().forEach(elem -> offsetMap.put(elem.getKey(), elem.getValue().offset()));
+            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(kafkaOffsetSpecMap, new ListOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
 
-            return Result.buildSuc(offsetMap);
+            return Result.buildSuc(listOffsetsResult);
         } catch (NotExistException nee) {
-            return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
+            return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
         } catch (Exception e) {
             log.error(
-                    "class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!",
-                    offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
+                    "method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!",
+                    clusterPhyId, topicName, e
             );
 
             return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
         }
     }
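
The batch helpers above carry per-spec results in the project's Tuple and Triple holders. Only the members below are exercised in this diff, so a minimal sketch consistent with those call sites (the real classes live in km-common and may differ, e.g. via Lombok-generated getters):

    // Pair holder: v1/v2 accessors plus the setters used by
    // getPartitionBeginAndEndOffsetFromKafka above.
    class Tuple<V1, V2> {
        private V1 v1;
        private V2 v2;

        Tuple(V1 v1, V2 v2) { this.v1 = v1; this.v2 = v2; }

        V1 v1() { return v1; }
        V2 v2() { return v2; }

        void setV1(V1 v1) { this.v1 = v1; }
        void setV2(V2 v2) { this.v2 = v2; }
    }

    // Triple holder: (topicName, offsetSpec, payload) as used by the batch helpers.
    class Triple<V1, V2, V3> {
        private final V1 v1;
        private final V2 v2;
        private final V3 v3;

        Triple(V1 v1, V2 v2, V3 v3) { this.v1 = v1; this.v2 = v2; this.v3 = v3; }

        V1 v1() { return v1; }
        V2 v2() { return v2; }
        V3 v3() { return v3; }
    }
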
-    private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaConsumerClient(VersionItemParam itemParam) {
+    private Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> batchGetPartitionOffsetFromKafkaConsumerClient(VersionItemParam itemParam) {
+        PartitionOffsetParam offsetParam = (PartitionOffsetParam) itemParam;
+        if (offsetParam.getOffsetSpecList().isEmpty()) {
+            return Result.buildSuc(Collections.emptyList());
+        }
+
+        List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>> offsetMapList = new ArrayList<>();
+        for (Triple<String, KSOffsetSpec, List<TopicPartition>> triple: offsetParam.getOffsetSpecList()) {
+            Result<Map<TopicPartition, Long>> subOffsetMapResult = this.getPartitionOffsetFromKafkaConsumerClient(
+                    offsetParam.getClusterPhyId(),
+                    triple.v1(),
+                    triple.v2(),
+                    triple.v3()
+            );
+
+            if (subOffsetMapResult.failed() && offsetParam.getOffsetSpecList().size() == 1) {
+                return Result.buildFromIgnoreData(subOffsetMapResult);
+            }
+
+            if (subOffsetMapResult.hasData()) {
+                offsetMapList.add(new Tuple<>(triple.v2(), subOffsetMapResult.getData()));
+            }
+        }
+
+        return Result.buildSuc(offsetMapList);
+    }
+
+    private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaConsumerClient(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec, List<TopicPartition> tpList) {
         KafkaConsumer<String, String> kafkaConsumer = null;
-        PartitionOffsetParam offsetParam = (PartitionOffsetParam) itemParam;
 
         try {
-            if (ValidateUtils.isEmptyMap(offsetParam.getTopicPartitionOffsets())) {
+            if (ValidateUtils.isEmptyList(tpList)) {
                 return Result.buildSuc(new HashMap<>());
             }
 
-            kafkaConsumer = kafkaConsumerClient.getClient(offsetParam.getClusterPhyId());
+            kafkaConsumer = kafkaConsumerClient.getClient(clusterPhyId);
 
-            OffsetSpec offsetSpec = new ArrayList<>(offsetParam.getTopicPartitionOffsets().values()).get(0);
-            if (offsetSpec instanceof OffsetSpec.LatestSpec) {
+            if (offsetSpec instanceof KSOffsetSpec.KSLatestSpec) {
                 return Result.buildSuc(
                         kafkaConsumer.endOffsets(
-                                offsetParam.getTopicPartitionOffsets().keySet(),
+                                tpList,
                                 Duration.ofMillis(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
                         )
                 );
             }
 
-            if (offsetSpec instanceof OffsetSpec.EarliestSpec) {
+            if (offsetSpec instanceof KSOffsetSpec.KSEarliestSpec) {
                 return Result.buildSuc(
                         kafkaConsumer.beginningOffsets(
-                                offsetParam.getTopicPartitionOffsets().keySet(),
+                                tpList,
                                 Duration.ofMillis(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
                         )
                 );
             }
 
-            if (offsetSpec instanceof OffsetSpec.TimestampSpec) {
+            if (offsetSpec instanceof KSOffsetSpec.KSTimestampSpec) {
                 // look up offsets by timestamp
                 Map<TopicPartition, Long> timestampMap = new HashMap<>();
-                offsetParam.getTopicPartitionOffsets().entrySet().stream().forEach(elem -> timestampMap.put(elem.getKey(), offsetParam.getTimestamp()));
+                tpList.forEach(elem -> timestampMap.put(elem, ((KSOffsetSpec.KSTimestampSpec) offsetSpec).timestamp()));
 
                 Map<TopicPartition, OffsetAndTimestamp> offsetMetadataMap = kafkaConsumer.offsetsForTimes(
                         timestampMap,
@@ -377,17 +506,17 @@ public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
             return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "OffsetSpec type illegal");
         } catch (NotExistException nee) {
-            return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
+            return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
         } catch (Exception e) {
             log.error(
-                    "class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!",
-                    offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
+                    "method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!",
+                    clusterPhyId, topicName, e
            );
 
            return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
        } finally {
            if (kafkaConsumer != null) {
-                kafkaConsumerClient.returnClient(offsetParam.getClusterPhyId(), kafkaConsumer);
+                kafkaConsumerClient.returnClient(clusterPhyId, kafkaConsumer);
            }
        }
    }
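
One caveat on the timestamp path (an illustration, not part of this change): KafkaConsumer.offsetsForTimes returns a null value for any partition that has no message at or after the requested timestamp, so the conversion elided between the two hunks above has to tolerate null entries. For example:

    // Illustrative only: defensive handling of offsetsForTimes() results.
    // Partitions with no message at/after the timestamp map to null and are
    // skipped here; falling back to endOffsets() would be an alternative policy.
    Map<TopicPartition, OffsetAndTimestamp> offsetMetadataMap =
            kafkaConsumer.offsetsForTimes(timestampMap, Duration.ofMillis(30_000L));

    Map<TopicPartition, Long> offsetMap = new HashMap<>();
    offsetMetadataMap.forEach((tp, offsetAndTimestamp) -> {
        if (offsetAndTimestamp != null) {
            offsetMap.put(tp, offsetAndTimestamp.offset());
        }
    });
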
log.error("method=getPartitionsFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); } @@ -447,7 +576,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P TopicDescription description = describeTopicsResult.all().get().get(topicName); return Result.buildSuc(PartitionConverter.convert2PartitionList(clusterPhy.getId(), description)); }catch (Exception e) { - log.error("class=PartitionServiceImpl||method=getPartitionsFromAdminClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); + log.error("method=getPartitionsFromAdminClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); return Result.buildFailure(ResultStatus.KAFKA_OPERATE_FAILED); } } @@ -470,7 +599,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P } return Result.buildSuc(partitionList); } catch (Exception e) { - log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); + log.error("method=getPartitionsFromZKClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); } } @@ -482,21 +611,24 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P List partitionList = new ArrayList<>(); for (PartitionPO po: poList) { - if(null != po){partitionList.add(convert2Partition(po));} + if(null != po) { + partitionList.add(this.convert2Partition(po)); + } } return partitionList; } - private List convert2PartitionPOList(List partitionList) { - if (partitionList == null) { - return new ArrayList<>(); + private Result> convert2OffsetMapResult(Result>>> listResult) { + if (listResult.failed()) { + return Result.buildFromIgnoreData(listResult); + } else if (ValidateUtils.isEmptyList(listResult.getData())) { + return Result.buildSuc(new HashMap<>(0)); } - List poList = new ArrayList<>(); - for (Partition partition: partitionList) { - poList.add(this.convert2PartitionPO(partition)); - } - return poList; + Map offsetMap = new HashMap<>(); + listResult.getData().forEach(elem -> offsetMap.putAll(elem.v2())); + + return Result.buildSuc(offsetMap); } private PartitionPO convert2PartitionPO(Partition partition) { diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metrics/ReplicaMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metrics/ReplicaMetricCollectorTask.java deleted file mode 100644 index 80cc2644..00000000 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metrics/ReplicaMetricCollectorTask.java +++ /dev/null @@ -1,32 +0,0 @@ -//package com.xiaojukeji.know.streaming.km.task.kafka.metrics; -// -//import com.didiglobal.logi.job.annotation.Task; -//import com.didiglobal.logi.job.common.TaskResult; -//import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -//import com.xiaojukeji.know.streaming.km.collector.metric.kafka.ReplicaMetricCollector; -//import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.beans.factory.annotation.Autowired; -// -///** -// * @author didi -// */ -//@Slf4j -//@Task(name = 
"ReplicaMetricCollectorTask", -// description = "Replica指标采集任务", -// cron = "0 0/1 * * * ? *", -// autoRegister = true, -// consensual = ConsensualEnum.BROADCAST, -// timeout = 2 * 60) -//public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { -// -// @Autowired -// private ReplicaMetricCollector replicaMetricCollector; -// -// @Override -// public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { -// replicaMetricCollector.collectMetrics(clusterPhy); -// -// return TaskResult.SUCCESS; -// } -//}