[Optimize] Metrics collection performance optimization - part1 (#726)

zengqiao
2022-12-04 15:31:46 +08:00
committed by EricZeng
parent 921161d6d0
commit 2c82baf9fc
18 changed files with 616 additions and 326 deletions

View File

@@ -12,6 +12,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.group.Group;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopicMember;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
@@ -42,7 +43,6 @@ import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.Group
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -274,16 +274,16 @@ public class GroupManagerImpl implements GroupManager {
             )));
         }

-        OffsetSpec offsetSpec = null;
+        KSOffsetSpec offsetSpec = null;
         if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) {
-            offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp());
+            offsetSpec = KSOffsetSpec.forTimestamp(dto.getTimestamp());
         } else if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getResetType()) {
-            offsetSpec = OffsetSpec.earliest();
+            offsetSpec = KSOffsetSpec.earliest();
         } else {
-            offsetSpec = OffsetSpec.latest();
+            offsetSpec = KSOffsetSpec.latest();
         }

-        return partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), offsetSpec, dto.getTimestamp());
+        return partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), offsetSpec);
     }

     private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(List<GroupMemberPO> poList, List<GroupMetrics> metricsList) {

View File

@@ -10,6 +10,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -46,7 +47,6 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
 import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
@@ -143,12 +143,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
         }

         // Get the partitions' beginOffset
-        Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null);
+        Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), KSOffsetSpec.earliest());
         if (beginOffsetsMapResult.failed()) {
             return Result.buildFromIgnoreData(beginOffsetsMapResult);
         }

         // Get the partitions' endOffset
-        Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
+        Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), KSOffsetSpec.latest());
         if (endOffsetsMapResult.failed()) {
             return Result.buildFromIgnoreData(endOffsetsMapResult);
         }

View File

@@ -47,7 +47,7 @@ public class ClusterMetricCollector extends AbstractMetricCollector<ClusterMetri
         for(VersionControlItem v : items) {
             future.runnableTask(
-                    String.format("class=ClusterMetricCollector||clusterPhyId=%d", clusterPhyId),
+                    String.format("class=ClusterMetricCollector||clusterPhyId=%d||metricName=%s", clusterPhyId, v.getName()),
                     30000,
                     () -> {
                         try {

View File

@@ -43,7 +43,7 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationM
     public List<ReplicationMetrics> collectKafkaMetrics(ClusterPhy clusterPhy) {
         Long clusterPhyId = clusterPhy.getId();
         List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
-        List<Partition> partitions = partitionService.listPartitionByCluster(clusterPhyId);
+        List<Partition> partitions = partitionService.listPartitionFromCacheFirst(clusterPhyId);

         FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(clusterPhyId);

View File

@@ -0,0 +1,50 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.offset;

import org.apache.kafka.clients.admin.OffsetSpec;

/**
 * @see OffsetSpec
 */
public class KSOffsetSpec {
    public static class KSEarliestSpec extends KSOffsetSpec { }

    public static class KSLatestSpec extends KSOffsetSpec { }

    public static class KSTimestampSpec extends KSOffsetSpec {
        private final long timestamp;

        public KSTimestampSpec(long timestamp) {
            this.timestamp = timestamp;
        }

        public long timestamp() {
            return timestamp;
        }
    }

    /**
     * Used to retrieve the latest offset of a partition
     */
    public static KSOffsetSpec latest() {
        return new KSOffsetSpec.KSLatestSpec();
    }

    /**
     * Used to retrieve the earliest offset of a partition
     */
    public static KSOffsetSpec earliest() {
        return new KSOffsetSpec.KSEarliestSpec();
    }

    /**
     * Used to retrieve the earliest offset whose timestamp is greater than
     * or equal to the given timestamp in the corresponding partition
     * @param timestamp in milliseconds
     */
    public static KSOffsetSpec forTimestamp(long timestamp) {
        return new KSOffsetSpec.KSTimestampSpec(timestamp);
    }

    private KSOffsetSpec() {
    }
}
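KSOffsetSpec mirrors the admin client's OffsetSpec so callers no longer depend on a Kafka-version-specific type. As a minimal sketch of how a version-specific handler might map it back to the admin-client type (the OffsetSpecConverter helper below is hypothetical and not part of this commit; it assumes the classes above are on the classpath):

    import org.apache.kafka.clients.admin.OffsetSpec;
    import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;

    public class OffsetSpecConverter {
        // Hypothetical helper: translate the version-agnostic KSOffsetSpec into the
        // admin-client OffsetSpec used on newer Kafka code paths.
        public static OffsetSpec toKafkaOffsetSpec(KSOffsetSpec spec) {
            if (spec instanceof KSOffsetSpec.KSTimestampSpec) {
                return OffsetSpec.forTimestamp(((KSOffsetSpec.KSTimestampSpec) spec).timestamp());
            }
            if (spec instanceof KSOffsetSpec.KSEarliestSpec) {
                return OffsetSpec.earliest();
            }
            return OffsetSpec.latest();
        }
    }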

View File

@@ -1,23 +1,39 @@
 package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;

-import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
-import lombok.Data;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import com.xiaojukeji.know.streaming.km.common.utils.Triple;
+import lombok.Getter;
 import lombok.NoArgsConstructor;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.TopicPartition;

-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;

-@Data
+@Getter
 @NoArgsConstructor
-public class PartitionOffsetParam extends TopicParam {
-    private Map<TopicPartition, OffsetSpec> topicPartitionOffsets;
-
-    private Long timestamp;
-
-    public PartitionOffsetParam(Long clusterPhyId, String topicName, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
-        super(clusterPhyId, topicName);
-        this.topicPartitionOffsets = topicPartitionOffsets;
-        this.timestamp = timestamp;
+public class PartitionOffsetParam extends ClusterPhyParam {
+    private List<Triple<String, KSOffsetSpec, List<TopicPartition>>> offsetSpecList;
+
+    public PartitionOffsetParam(Long clusterPhyId, String topicName, KSOffsetSpec ksOffsetSpec, List<TopicPartition> partitionList) {
+        super(clusterPhyId);
+        this.offsetSpecList = Collections.singletonList(new Triple<>(topicName, ksOffsetSpec, partitionList));
+    }
+
+    public PartitionOffsetParam(Long clusterPhyId, String topicName, List<KSOffsetSpec> specList, List<TopicPartition> partitionList) {
+        super(clusterPhyId);
+        this.offsetSpecList = new ArrayList<>();
+        specList.forEach(elem -> offsetSpecList.add(new Triple<>(topicName, elem, partitionList)));
+    }
+
+    public PartitionOffsetParam(Long clusterPhyId, KSOffsetSpec offsetSpec, List<TopicPartition> partitionList) {
+        super(clusterPhyId);
+        Map<String, List<TopicPartition>> tpMap = new HashMap<>();
+        partitionList.forEach(elem -> {
+            tpMap.putIfAbsent(elem.topic(), new ArrayList<>());
+            tpMap.get(elem.topic()).add(elem);
+        });
+
+        this.offsetSpecList = tpMap.entrySet().stream().map(elem -> new Triple<>(elem.getKey(), offsetSpec, elem.getValue())).collect(Collectors.toList());
     }
 }
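As a rough usage sketch of the new parameter (the values below are invented for illustration and the example assumes the project classes are on the classpath), the third constructor buckets an arbitrary partition list by topic so that one parameter object can drive a single batched offset lookup:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;
    import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
    import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.PartitionOffsetParam;

    public class PartitionOffsetParamExample {
        public static void main(String[] args) {
            // Partitions spanning two topics; the constructor groups them per topic.
            List<TopicPartition> partitions = Arrays.asList(
                    new TopicPartition("topicA", 0),
                    new TopicPartition("topicA", 1),
                    new TopicPartition("topicB", 0));

            PartitionOffsetParam param = new PartitionOffsetParam(1L, KSOffsetSpec.latest(), partitions);
            // One Triple per topic, each carrying the same KSOffsetSpec.
            System.out.println(param.getOffsetSpecList().size());   // prints 2
        }
    }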

View File

@@ -5,6 +5,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
 import lombok.Data;

+import java.util.Objects;
+
 @Data
 @TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "partition")
 public class PartitionPO extends BasePO {
@@ -37,4 +39,31 @@ public class PartitionPO extends BasePO {
      * AR
      */
     private String assignReplicas;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        PartitionPO po = (PartitionPO) o;
+        return Objects.equals(clusterPhyId, po.clusterPhyId)
+                && Objects.equals(topicName, po.topicName)
+                && Objects.equals(partitionId, po.partitionId)
+                && Objects.equals(leaderBrokerId, po.leaderBrokerId)
+                && Objects.equals(inSyncReplicas, po.inSyncReplicas)
+                && Objects.equals(assignReplicas, po.assignReplicas);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), clusterPhyId, topicName, partitionId, leaderBrokerId, inSyncReplicas, assignReplicas);
+    }
 }

View File

@@ -33,7 +33,7 @@ public class KafkaConstant {
     public static final Integer DATA_VERSION_ONE = 1;

-    public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 5000;
+    public static final Integer ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS = 10000;

     public static final Integer KAFKA_SASL_SCRAM_ITERATIONS = 8192;

View File

@@ -0,0 +1,43 @@
package com.xiaojukeji.know.streaming.km.core.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class DataBaseDataLocalCache {
    private static final Cache<Long, ClusterMetrics> clusterLatestMetricsCache = Caffeine.newBuilder()
            .expireAfterWrite(180, TimeUnit.SECONDS)
            .maximumSize(500)
            .build();

    private static final Cache<Long, Map<String, List<Partition>>> partitionsCache = Caffeine.newBuilder()
            .expireAfterWrite(60, TimeUnit.SECONDS)
            .maximumSize(500)
            .build();

    public static ClusterMetrics getClusterLatestMetrics(Long clusterPhyId) {
        return clusterLatestMetricsCache.getIfPresent(clusterPhyId);
    }

    public static void putClusterLatestMetrics(Long clusterPhyId, ClusterMetrics metrics) {
        clusterLatestMetricsCache.put(clusterPhyId, metrics);
    }

    public static Map<String, List<Partition>> getPartitions(Long clusterPhyId) {
        return partitionsCache.getIfPresent(clusterPhyId);
    }

    public static void putPartitions(Long clusterPhyId, Map<String, List<Partition>> partitionMap) {
        partitionsCache.put(clusterPhyId, partitionMap);
    }

    /**************************************************** private method ****************************************************/

    private DataBaseDataLocalCache() {
    }
}
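A minimal sketch of the intended cache-first read path, assuming a caller that falls back to the database when the entry has expired (the DbLoader interface below is a hypothetical stand-in for the DB query, e.g. partitionService.listPartitionByCluster; it is not code from this commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
    import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;

    public class PartitionCacheReadSketch {
        // Hypothetical stand-in for the DB query used as a fallback.
        public interface DbLoader {
            List<Partition> loadFromDb(Long clusterPhyId);
        }

        // Return cached partitions while the entry is alive (60s TTL),
        // otherwise fall back to the DB and let the flusher repopulate the cache.
        public static List<Partition> listPartitionsCacheFirst(Long clusterPhyId, DbLoader loader) {
            Map<String, List<Partition>> cached = DataBaseDataLocalCache.getPartitions(clusterPhyId);
            if (cached != null) {
                List<Partition> all = new ArrayList<>();
                cached.values().forEach(all::addAll);
                return all;
            }
            return loader.loadFromDb(clusterPhyId);
        }
    }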

View File

@@ -0,0 +1,84 @@
package com.xiaojukeji.know.streaming.km.core.flusher;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class DatabaseDataFlusher {
    private static final ILog LOGGER = LogFactory.getLog(DatabaseDataFlusher.class);

    @Autowired
    private ClusterPhyService clusterPhyService;

    @Autowired
    private ClusterMetricService clusterMetricService;

    @Autowired
    private PartitionService partitionService;

    @PostConstruct
    public void init() {
        this.flushPartitionsCache();
        this.flushClusterLatestMetricsCache();
    }

    @Scheduled(cron="0 0/1 * * * ?")
    public void flushPartitionsCache() {
        for (ClusterPhy clusterPhy: clusterPhyService.listAllClusters()) {
            FutureUtil.quickStartupFutureUtil.submitTask(() -> {
                try {
                    // Refresh the cache
                    Map<String, List<Partition>> newPartitionMap = new ConcurrentHashMap<>();

                    List<Partition> partitionList = partitionService.listPartitionByCluster(clusterPhy.getId());
                    partitionList.forEach(partition -> {
                        newPartitionMap.putIfAbsent(partition.getTopicName(), new ArrayList<>());
                        newPartitionMap.get(partition.getTopicName()).add(partition);
                    });

                    DataBaseDataLocalCache.putPartitions(clusterPhy.getId(), newPartitionMap);
                } catch (Exception e) {
                    LOGGER.error("method=flushPartitionsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
                }
            });
        }
    }

    @Scheduled(cron = "0 0/1 * * * ?")
    private void flushClusterLatestMetricsCache() {
        for (ClusterPhy clusterPhy: clusterPhyService.listAllClusters()) {
            FutureUtil.quickStartupFutureUtil.submitTask(() -> {
                try {
                    Result<ClusterMetrics> metricsResult = clusterMetricService.getLatestMetricsFromES(clusterPhy.getId(), Collections.emptyList());
                    if (metricsResult.hasData()) {
                        DataBaseDataLocalCache.putClusterLatestMetrics(clusterPhy.getId(), metricsResult.getData());
                        return;
                    }

                    LOGGER.error("method=flushClusterLatestMetricsCache||clusterPhyId={}||result={}||msg=failed", clusterPhy.getId(), metricsResult);
                } catch (Exception e) {
                    LOGGER.error("method=flushClusterLatestMetricsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e);
                }

                DataBaseDataLocalCache.putClusterLatestMetrics(clusterPhy.getId(), new ClusterMetrics(clusterPhy.getId()));
            });
        }
    }
}

View File

@@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.BrokerMetricPO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
+import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
 import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
 import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
@@ -51,7 +52,6 @@ import java.util.*;
 import java.util.stream.Collectors;

 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
-import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;

 /**
  * @author didi
@@ -365,7 +365,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         Long clusterId = param.getClusterId();
         Integer brokerId = param.getBrokerId();

-        List<Partition> partitions = partitionService.listPartitionByBroker(clusterId, brokerId);
+        List<Partition> partitions = partitionService.listPartitionFromCacheFirst(clusterId, brokerId);

         Float logSizeSum = 0f;
         for(Partition p : partitions) {
@@ -387,7 +387,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
                 logSizeSum += (replicaLogSize == null? 0.0f: replicaLogSize);
             } catch (Exception e) {
                 LOGGER.error(
-                        "class=BrokerMetricServiceImpl||method=getLogSize||clusterPhyId={}||brokerId={}||topicName={}||partitionId={}||metricName={}||errMsg=exception",
+                        "method=getLogSize||clusterPhyId={}||brokerId={}||topicName={}||partitionId={}||metricName={}||errMsg=exception",
                         clusterId, brokerId, p.getTopicName(), p.getPartitionId(), metric, e.getClass().getName()
                 );
             }
@@ -432,7 +432,9 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
         Float brokerLeaderCount = metricsResult.getData().getMetric( BrokerMetricVersionItems.BROKER_METRIC_LEADERS);

-        Integer globalLeaderCount = partitionService.getLeaderPartitionSizeByClusterId(clusterId);
+        Integer globalLeaderCount = (int) partitionService.listPartitionFromCacheFirst(clusterId)
+                .stream()
+                .filter(partition -> !partition.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)).count();
         Integer globalBrokerCount = brokerService.listAllBrokersFromDB(clusterId).size();

         if (globalLeaderCount <= 0 || globalBrokerCount <= 0) {
View File

@@ -2,8 +2,6 @@ package com.xiaojukeji.know.streaming.km.core.service.cluster.impl;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.Table;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricsClusterPhyDTO;
@@ -13,6 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.Kafka
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ClusterMetricParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
@@ -24,6 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ClusterMetricPO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
 import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
+import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
 import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
 import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
@@ -33,11 +33,11 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
 import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
 import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
 import com.xiaojukeji.know.streaming.km.common.utils.*;
+import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
 import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
 import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
 import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
 import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
-import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
 import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
 import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
 import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
@@ -50,32 +50,28 @@ import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.ClusterMetricESDAO;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.resource.ResourceType;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;

-import javax.annotation.PostConstruct;
 import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 import static com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics.initWithMetrics;
 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
-import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*;
 import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*;

 /**
  * @author didi
  */
-@Service("clusterMetricService")
+@Service
 public class ClusterMetricServiceImpl extends BaseMetricService implements ClusterMetricService {
     private static final ILog LOGGER = LogFactory.getLog(ClusterMetricServiceImpl.class);
@@ -153,22 +149,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     @Autowired
     private JobService jobService;

-    @Autowired
-    private ClusterPhyService clusterPhyService;
-
-    private final Cache<Long, ClusterMetrics> clusterLatestMetricsCache = Caffeine.newBuilder()
-            .expireAfterWrite(180, TimeUnit.SECONDS)
-            .maximumSize(1000)
-            .build();
-
-    @PostConstruct
-    @Scheduled(cron = "0 0/1 * * * ?")
-    private void flushClusterLatestMetricsCache() {
-        for (ClusterPhy clusterPhy: clusterPhyService.listAllClusters()) {
-            FutureUtil.quickStartupFutureUtil.submitTask(() -> this.updateCacheAndGetMetrics(clusterPhy.getId()));
-        }
-    }
-
     @Override
     protected VersionItemTypeEnum getVersionItemType() {
         return VersionItemTypeEnum.METRIC_CLUSTER;
@@ -283,7 +263,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     @Override
     public ClusterMetrics getLatestMetricsFromCache(Long clusterPhyId) {
-        ClusterMetrics metrics = clusterLatestMetricsCache.getIfPresent(clusterPhyId);
+        ClusterMetrics metrics = DataBaseDataLocalCache.getClusterLatestMetrics(clusterPhyId);
         if (metrics != null) {
             return metrics;
         }
@@ -335,24 +315,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     /**************************************************** private method ****************************************************/

-    private ClusterMetrics updateCacheAndGetMetrics(Long clusterPhyId) {
-        try {
-            Result<ClusterMetrics> metricsResult = this.getLatestMetricsFromES(clusterPhyId, Arrays.asList());
-            if (metricsResult.hasData()) {
-                LOGGER.info("method=updateCacheAndGetMetrics||clusterPhyId={}||msg=success", clusterPhyId);
-                clusterLatestMetricsCache.put(clusterPhyId, metricsResult.getData());
-                return metricsResult.getData();
-            }
-        } catch (Exception e) {
-            LOGGER.error("method=updateCacheAndGetMetrics||clusterPhyId={}||errMsg=exception!", clusterPhyId, e);
-        }
-
-        ClusterMetrics clusterMetrics = new ClusterMetrics(clusterPhyId);
-        clusterLatestMetricsCache.put(clusterPhyId, clusterMetrics);
-        return clusterMetrics;
-    }
-
     /**
      * doNothing
      */
@@ -382,9 +344,28 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     /**
      * Get the cluster's messageSize
      */
-    private Result<ClusterMetrics> getMessageSize(VersionItemParam metricParam){
+    private Result<ClusterMetrics> getMessageSize(VersionItemParam metricParam) {
         ClusterMetricParam param = (ClusterMetricParam)metricParam;
-        return getMetricFromKafkaByTotalTopics(param.getClusterId(), param.getMetric(), TOPIC_METRIC_MESSAGES);
+
+        Result<Map<TopicPartition, Long>> beginOffsetMapResult = partitionService.getAllPartitionOffsetFromKafka(param.getClusterId(), KSOffsetSpec.earliest());
+        Result<Map<TopicPartition, Long>> endOffsetMapResult = partitionService.getAllPartitionOffsetFromKafka(param.getClusterId(), KSOffsetSpec.latest());
+        if (endOffsetMapResult.failed() || beginOffsetMapResult.failed()) {
+            // If either lookup failed, return a failure directly
+            return Result.buildFromIgnoreData(endOffsetMapResult);
+        }
+
+        long msgCount = 0;
+        for (Map.Entry<TopicPartition, Long> entry: endOffsetMapResult.getData().entrySet()) {
+            Long beginOffset = beginOffsetMapResult.getData().get(entry.getKey());
+            if (beginOffset == null) {
+                continue;
+            }
+
+            msgCount += Math.max(0, entry.getValue() - beginOffset);
+        }
+
+        return Result.buildSuc(initWithMetrics(param.getClusterId(), param.getMetric(), (float)msgCount));
     }

     /**
@@ -406,9 +387,9 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     private Result<ClusterMetrics> getPartitionSize(VersionItemParam metricParam){
         ClusterMetricParam param = (ClusterMetricParam)metricParam;
         String metric = param.getMetric();
         Long clusterId = param.getClusterId();

-        Integer partitionNu = partitionService.getPartitionSizeByClusterId(clusterId);
+        Integer partitionNu = partitionService.listPartitionFromCacheFirst(clusterId).size();
         return Result.buildSuc(initWithMetrics(clusterId, metric, partitionNu.floatValue()));
     }
@@ -421,7 +402,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
         String metric = param.getMetric();
         Long clusterId = param.getClusterId();

-        Integer noLeaders = partitionService.getNoLeaderPartitionSizeByClusterId(clusterId);
+        Integer noLeaders = (int) partitionService.listPartitionFromCacheFirst(clusterId)
+                .stream()
+                .filter(partition -> partition.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
+                .count();

         return Result.buildSuc(initWithMetrics(clusterId, metric, noLeaders.floatValue()));
     }
@@ -747,7 +731,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     /**
     * Aggregate the cluster-level metric by summing the metric over all Topics
     */
-    private Result<ClusterMetrics> getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){
+    private Result<ClusterMetrics> getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric) {
     List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterId);

     float sumMetricValue = 0f;
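To make the reworked getMessageSize arithmetic concrete: a partition whose earliest offset is 1,000 and whose latest offset is 1,750 contributes max(0, 1750 - 1000) = 750 messages, and the cluster-level value is simply that difference summed over every partition returned by the two batched offset lookups.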

View File

@@ -6,6 +6,7 @@ import com.google.common.collect.Table;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricGroupPartitionDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.group.GroupTopic;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.GroupMetricParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -24,13 +25,11 @@ import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateSer
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
 import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;

 import java.util.*;
-import java.util.stream.Collectors;

 import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
 import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*;
@@ -192,31 +191,29 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe
             metricsList.add(metrics);
         }

-        for (String topicName: groupOffsetMap.keySet().stream().map(elem -> elem.topic()).collect(Collectors.toSet())) {
-            Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterId, topicName, OffsetSpec.latest(), null);
-            if (!offsetMapResult.hasData()) {
-                // Fetching offsets for this Topic failed, skip it
-                continue;
-            }
-
-            for (Map.Entry<TopicPartition, Long> entry: offsetMapResult.getData().entrySet()) {
-                // Build the GROUP_METRIC_LOG_END_OFFSET metric
-                GroupMetrics metrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
-                metrics.putMetric(GROUP_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
-                metricsList.add(metrics);
-
-                Long groupOffset = groupOffsetMap.get(entry.getKey());
-                if (groupOffset == null) {
-                    // Not present, skip it
-                    continue;
-                }
-
-                // Build the GROUP_METRIC_LAG metric
-                GroupMetrics groupMetrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
-                groupMetrics.putMetric(GROUP_METRIC_LAG, Math.max(0L, entry.getValue() - groupOffset) * 1.0f);
-                metricsList.add(groupMetrics);
-            }
+        Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterId, new ArrayList<>(groupOffsetMap.keySet()), KSOffsetSpec.latest());
+        if (!offsetMapResult.hasData()) {
+            // Fetching offsets failed
+            return Result.buildSuc(metricsList);
+        }
+
+        for (Map.Entry<TopicPartition, Long> entry: offsetMapResult.getData().entrySet()) {
+            // Build the GROUP_METRIC_LOG_END_OFFSET metric
+            GroupMetrics metrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
+            metrics.putMetric(GROUP_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
+            metricsList.add(metrics);
+
+            Long groupOffset = groupOffsetMap.get(entry.getKey());
+            if (groupOffset == null) {
+                // Not present, skip it
+                continue;
+            }
+
+            // Build the GROUP_METRIC_LAG metric
+            GroupMetrics groupMetrics = new GroupMetrics(clusterId, entry.getKey().partition(), entry.getKey().topic(), groupName, false);
+            groupMetrics.putMetric(GROUP_METRIC_LAG, Math.max(0L, entry.getValue() - groupOffset) * 1.0f);
+
+            metricsList.add(groupMetrics);
         }

         return Result.buildSuc(metricsList);
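As a concrete reading of the lag calculation above: if a partition's log end offset is 5,000 and the group's committed offset is 4,940, GROUP_METRIC_LAG is 60; a committed offset at or beyond the end offset is clamped to 0 by the Math.max guard.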

View File

@@ -26,6 +26,5 @@ public interface PartitionMetricService {
      * Fetch metrics from ES
      */
     PartitionMetrics getLatestMetricsFromES(Long clusterPhyId, String topic, Integer brokerId, Integer partitionId, List<String> metricNameList);
     Result<List<PartitionMetrics>> getLatestMetricsFromES(Long clusterPhyId, String topicName, List<String> metricNameList);
 }

View File

@@ -1,10 +1,11 @@
 package com.xiaojukeji.know.streaming.km.core.service.partition;

 import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
 import com.xiaojukeji.know.streaming.km.common.bean.po.partition.PartitionPO;
-import org.apache.kafka.clients.admin.OffsetSpec;
+import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
 import org.apache.kafka.common.TopicPartition;

 import java.util.List;
@@ -12,49 +13,40 @@ import java.util.Map;
 import java.util.Set;

 public interface PartitionService {
+    /**
+     * Fetch partition info from Kafka
+     */
     Result<Map<String, List<Partition>>> listPartitionsFromKafka(ClusterPhy clusterPhy);
     Result<List<Partition>> listPartitionsFromKafka(ClusterPhy clusterPhy, String topicName);

+    /**
+     * Fetch partition info from the DB
+     */
     List<Partition> listPartitionByCluster(Long clusterPhyId);
     List<PartitionPO> listPartitionPOByCluster(Long clusterPhyId);
-    /**
-     * Partition list of a Topic
-     */
     List<Partition> listPartitionByTopic(Long clusterPhyId, String topicName);
-    /**
-     * Partition list of a Broker
-     */
-    List<Partition> listPartitionByBroker(Long clusterPhyId, Integer brokerId);
-    /**
-     * Get a specific partition
-     */
     Partition getPartitionByTopicAndPartitionId(Long clusterPhyId, String topicName, Integer partitionId);

-    /**************************************************** Prefer the cache for partition info ****************************************************/
+    /**
+     * Prefer the cache for partition info; fall back to the DB when the cache is empty
+     */
+    List<Partition> listPartitionFromCacheFirst(Long clusterPhyId);
+    List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, Integer brokerId);
     List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, String topicName);
     Partition getPartitionFromCacheFirst(Long clusterPhyId, String topicName, Integer partitionId);

     /**
-     * Number of partitions in the cluster
+     * Fetch partition offset info
      */
-    Integer getPartitionSizeByClusterId(Long clusterPhyId);
-    Integer getLeaderPartitionSizeByClusterId(Long clusterPhyId);
-    Integer getNoLeaderPartitionSizeByClusterId(Long clusterPhyId);
-    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, OffsetSpec offsetSpec, Long timestamp);
-    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, OffsetSpec offsetSpec, Long timestamp);
+    Result<Map<TopicPartition, Long>> getAllPartitionOffsetFromKafka(Long clusterPhyId, KSOffsetSpec offsetSpec);
+    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec);
+    Result<Tuple<Map<TopicPartition, Long>/*begin offset*/, Map<TopicPartition, Long>/*end offset*/>> getPartitionBeginAndEndOffsetFromKafka(Long clusterPhyId, String topicName);
+    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, KSOffsetSpec offsetSpec);
+    Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, List<TopicPartition> tpList, KSOffsetSpec offsetSpec);

     /**
      * Update partition info
      */
     int updatePartitions(Long clusterPhyId, String topicName, List<Partition> kafkaPartitionList, List<PartitionPO> dbPartitionList);
     void deletePartitionsIfNotIn(Long clusterPhyId, Set<String> topicNameSet);
 }

View File

@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.TopicMetricParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
@@ -15,6 +16,7 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept
 import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
 import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
 import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
+import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
 import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
 import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
@@ -22,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
 import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
 import com.xiaojukeji.know.streaming.km.persistence.es.dao.PartitionMetricESDAO;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -176,50 +177,45 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
         Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();

-        // begin offset metric
-        Result<Map<TopicPartition, Long>> beginOffsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.earliest(), null);
-        if (beginOffsetMapResult.hasData()) {
-            for (Map.Entry<TopicPartition, Long> entry: beginOffsetMapResult.getData().entrySet()) {
-                Partition partition = partitionMap.get(entry.getKey().partition());
-                PartitionMetrics metrics = metricsMap.getOrDefault(
-                        entry.getKey().partition(),
-                        new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
-                );
-                metrics.putMetric(PARTITION_METRIC_LOG_START_OFFSET, entry.getValue().floatValue());
-                metricsMap.put(entry.getKey().partition(), metrics);
-            }
-        } else {
+        // offset metrics
+        Result<Tuple<Map<TopicPartition, Long>, Map<TopicPartition, Long>>> offsetResult = partitionService.getPartitionBeginAndEndOffsetFromKafka(clusterPhyId, topicName);
+        if (offsetResult.failed()) {
             LOGGER.warn(
-                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||resultMsg={}||msg=get begin offset failed",
-                    clusterPhyId, topicName, beginOffsetMapResult.getMessage()
+                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||result={}||msg=get offset failed",
+                    clusterPhyId, topicName, offsetResult
             );
+
+            return Result.buildFromIgnoreData(offsetResult);
+        }
+
+        // begin offset metric
+        for (Map.Entry<TopicPartition, Long> entry: offsetResult.getData().v1().entrySet()) {
+            Partition partition = partitionMap.get(entry.getKey().partition());
+            PartitionMetrics metrics = metricsMap.getOrDefault(
+                    entry.getKey().partition(),
+                    new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
+            );
+            metrics.putMetric(PARTITION_METRIC_LOG_START_OFFSET, entry.getValue().floatValue());
+            metricsMap.put(entry.getKey().partition(), metrics);
         }

         // end offset metric
-        Result<Map<TopicPartition, Long>> endOffsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.latest(), null);
-        if (endOffsetMapResult.hasData()) {
-            for (Map.Entry<TopicPartition, Long> entry: endOffsetMapResult.getData().entrySet()) {
-                Partition partition = partitionMap.get(entry.getKey().partition());
-                PartitionMetrics metrics = metricsMap.getOrDefault(
-                        entry.getKey().partition(),
-                        new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
-                );
-                metrics.putMetric(PARTITION_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
-                metricsMap.put(entry.getKey().partition(), metrics);
-            }
-        } else {
-            LOGGER.warn(
-                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||resultMsg={}||msg=get end offset failed",
-                    clusterPhyId, topicName, endOffsetMapResult.getMessage()
-            );
+        for (Map.Entry<TopicPartition, Long> entry: offsetResult.getData().v2().entrySet()) {
+            Partition partition = partitionMap.get(entry.getKey().partition());
+            PartitionMetrics metrics = metricsMap.getOrDefault(
+                    entry.getKey().partition(),
+                    new PartitionMetrics(clusterPhyId, topicName, partition != null? partition.getLeaderBrokerId(): KafkaConstant.NO_LEADER, entry.getKey().partition())
+            );
+            metrics.putMetric(PARTITION_METRIC_LOG_END_OFFSET, entry.getValue().floatValue());
+            metricsMap.put(entry.getKey().partition(), metrics);
         }

         // messages metric
-        if (endOffsetMapResult.hasData() && beginOffsetMapResult.hasData()) {
-            for (Map.Entry<TopicPartition, Long> entry: endOffsetMapResult.getData().entrySet()) {
-                Long beginOffset = beginOffsetMapResult.getData().get(entry.getKey());
+        if (!ValidateUtils.isEmptyMap(offsetResult.getData().v1()) && !ValidateUtils.isEmptyMap(offsetResult.getData().v2())) {
+            for (Map.Entry<TopicPartition, Long> entry: offsetResult.getData().v2().entrySet()) {
+                Long beginOffset = offsetResult.getData().v1().get(entry.getKey());
                 if (beginOffset == null) {
                     continue;
                 }
@@ -235,8 +231,8 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
             }
         } else {
             LOGGER.warn(
-                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||endResultMsg={}||beginResultMsg={}||msg=get messages failed",
-                    clusterPhyId, topicName, endOffsetMapResult.getMessage(), beginOffsetMapResult.getMessage()
+                    "method=getOffsetRelevantMetrics||clusterPhyId={}||topicName={}||offsetData={}||msg=get messages failed",
+                    clusterPhyId, topicName, ConvertUtil.obj2Json(offsetResult.getData())
             );
         }
@@ -283,7 +279,6 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
             } catch (InstanceNotFoundException e) {
                 // ignore
-                continue;
             } catch (Exception e) {
                 LOGGER.error(
                         "method=getMetricFromJmx||clusterPhyId={}||topicName={}||partitionId={}||leaderBrokerId={}||metricName={}||msg={}",
@@ -326,7 +321,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
             // 4. Fetch the JMX metric
             String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName() + ",topic=" + topicName), jmxInfo.getJmxAttribute()).toString();

-            Long leaderCount = partitionList.stream().filter(elem -> elem.getLeaderBrokerId().equals(partition.getLeaderBrokerId())).count();
+            long leaderCount = partitionList.stream().filter(elem -> elem.getLeaderBrokerId().equals(partition.getLeaderBrokerId())).count();
             if (leaderCount <= 0) {
                 // The leader has already moved away
                 continue;
@@ -338,7 +333,6 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
             } catch (InstanceNotFoundException e) {
                 // ignore
-                continue;
             } catch (Exception e) {
                 LOGGER.error(
                         "method=getTopicAvgMetricFromJmx||clusterPhyId={}||topicName={}||partitionId={}||leaderBrokerId={}||metricName={}||msg={}",

View File

@@ -3,9 +3,8 @@ package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.log.LogFactory;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.PartitionOffsetParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.PartitionOffsetParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
@@ -20,7 +19,10 @@ import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap; import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState; import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
@@ -44,7 +46,6 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -55,7 +56,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemT
/** /**
* @author didi * @author didi
*/ */
@Service("partitionService") @Service
public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService { public class PartitionServiceImpl extends BaseVersionControlService implements PartitionService {
private static final ILog log = LogFactory.getLog(PartitionServiceImpl.class); private static final ILog log = LogFactory.getLog(PartitionServiceImpl.class);
@@ -78,15 +79,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
return SERVICE_OP_PARTITION; return SERVICE_OP_PARTITION;
} }
private final Cache<String, List<Partition>> partitionsCache = Caffeine.newBuilder()
.expireAfterWrite(90, TimeUnit.SECONDS)
.maximumSize(1000)
.build();
@PostConstruct @PostConstruct
private void init() { private void init() {
registerVCHandler(PARTITION_OFFSET_GET, V_0_10_0_0, V_0_11_0_0, "getPartitionOffsetFromKafkaConsumerClient", this::getPartitionOffsetFromKafkaConsumerClient); registerVCHandler(PARTITION_OFFSET_GET, V_0_10_0_0, V_0_11_0_0, "batchGetPartitionOffsetFromKafkaConsumerClient", this::batchGetPartitionOffsetFromKafkaConsumerClient);
registerVCHandler(PARTITION_OFFSET_GET, V_0_11_0_0, V_MAX, "getPartitionOffsetFromKafkaAdminClient", this::getPartitionOffsetFromKafkaAdminClient); registerVCHandler(PARTITION_OFFSET_GET, V_0_11_0_0, V_MAX, "batchGetPartitionOffsetFromKafkaAdminClient", this::batchGetPartitionOffsetFromKafkaAdminClient);
} }
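
KSOffsetSpec itself is not part of this diff; judging only from how it is used here (the earliest()/latest()/forTimestamp() factories and the KSEarliestSpec/KSLatestSpec/KSTimestampSpec subtypes it is matched against), a minimal sketch of the abstraction could look roughly like the following. Treat it as an illustration, not the committed class.

package com.xiaojukeji.know.streaming.km.common.bean.entity.offset;

// Illustrative sketch only: a version-agnostic offset spec that decouples callers from
// org.apache.kafka.clients.admin.OffsetSpec, so the consumer-client path (Kafka < 0.11)
// and the admin-client path can share one parameter type.
public class KSOffsetSpec {
    public static class KSEarliestSpec extends KSOffsetSpec { }
    public static class KSLatestSpec extends KSOffsetSpec { }
    public static class KSTimestampSpec extends KSOffsetSpec {
        private final long timestamp;
        KSTimestampSpec(long timestamp) { this.timestamp = timestamp; }
        public long timestamp() { return timestamp; }
    }

    public static KSOffsetSpec earliest() { return new KSEarliestSpec(); }
    public static KSOffsetSpec latest() { return new KSLatestSpec(); }
    public static KSOffsetSpec forTimestamp(long timestamp) { return new KSTimestampSpec(timestamp); }
}
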
@Override @Override
@@ -133,17 +129,32 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
} }
@Override @Override
public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, String topicName) { public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId) {
String clusterPhyIdAndTopicKey = MsgConstant.getClusterTopicKey(clusterPhyId, topicName); Map<String, List<Partition>> partitionMap = DataBaseDataLocalCache.getPartitions(clusterPhyId);
List<Partition> partitionList = partitionsCache.getIfPresent(clusterPhyIdAndTopicKey);
if (!ValidateUtils.isNull(partitionList)) { if (partitionMap != null) {
return partitionList; return partitionMap.values().stream().collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
} }
partitionList = this.listPartitionByTopic(clusterPhyId, topicName); return this.listPartitionByCluster(clusterPhyId);
partitionsCache.put(clusterPhyIdAndTopicKey, partitionList); }
return partitionList;
@Override
public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, Integer brokerId) {
List<Partition> partitionList = this.listPartitionFromCacheFirst(clusterPhyId);
return partitionList.stream().filter(elem -> elem.getAssignReplicaList().contains(brokerId)).collect(Collectors.toList());
}
@Override
public List<Partition> listPartitionFromCacheFirst(Long clusterPhyId, String topicName) {
Map<String, List<Partition>> partitionMap = DataBaseDataLocalCache.getPartitions(clusterPhyId);
if (partitionMap != null) {
return partitionMap.getOrDefault(topicName, new ArrayList<>());
}
return this.listPartitionByTopic(clusterPhyId, topicName);
} }
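
DataBaseDataLocalCache is also outside this hunk; a rough sketch, assuming a Caffeine cache keyed by cluster id that holds a topicName -> partitions map (mirroring the per-topic cache removed above, but with one entry per cluster), might be:

package com.xiaojukeji.know.streaming.km.core.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: one cache entry per cluster, refreshed by the metadata sync task,
// so per-topic lookups no longer hit the DB and no longer maintain one cache key per topic.
public class DataBaseDataLocalCache {
    private static final Cache<Long, Map<String, List<Partition>>> partitionsCache = Caffeine.newBuilder()
            .expireAfterWrite(90, TimeUnit.SECONDS)
            .maximumSize(1000)
            .build();

    public static Map<String, List<Partition>> getPartitions(Long clusterPhyId) {
        return partitionsCache.getIfPresent(clusterPhyId);
    }

    public static void putPartitions(Long clusterPhyId, Map<String, List<Partition>> partitionMap) {
        partitionsCache.put(clusterPhyId, partitionMap);
    }

    private DataBaseDataLocalCache() {
    }
}
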
@Override @Override
@@ -162,16 +173,6 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
return null; return null;
} }
@Override
public List<Partition> listPartitionByBroker(Long clusterPhyId, Integer brokerId) {
LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(PartitionPO::getClusterPhyId, clusterPhyId);
List<Partition> partitionList = this.convert2PartitionList(partitionDAO.selectList(lambdaQueryWrapper));
return partitionList.stream().filter(elem -> elem.getAssignReplicaList().contains(brokerId)).collect(Collectors.toList());
}
@Override @Override
public Partition getPartitionByTopicAndPartitionId(Long clusterPhyId, String topicName, Integer partitionId) { public Partition getPartitionByTopicAndPartitionId(Long clusterPhyId, String topicName, Integer partitionId) {
LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
@@ -183,71 +184,122 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
} }
@Override @Override
public Integer getPartitionSizeByClusterId(Long clusterPhyId) { public Result<Map<TopicPartition, Long>> getAllPartitionOffsetFromKafka(Long clusterPhyId, KSOffsetSpec offsetSpec) {
LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>(); List<TopicPartition> tpList = this.listPartitionFromCacheFirst(clusterPhyId).stream()
lambdaQueryWrapper.eq(PartitionPO::getClusterPhyId, clusterPhyId);
return partitionDAO.selectCount(lambdaQueryWrapper);
}
@Override
public Integer getLeaderPartitionSizeByClusterId(Long clusterPhyId) {
LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(PartitionPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.ne(PartitionPO::getLeaderBrokerId, -1);
return partitionDAO.selectCount(lambdaQueryWrapper);
}
@Override
public Integer getNoLeaderPartitionSizeByClusterId(Long clusterPhyId) {
LambdaQueryWrapper<PartitionPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(PartitionPO::getClusterPhyId, clusterPhyId);
lambdaQueryWrapper.eq(PartitionPO::getLeaderBrokerId, -1);
return partitionDAO.selectCount(lambdaQueryWrapper);
}
@Override
public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, OffsetSpec offsetSpec, Long timestamp) {
Map<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>();
List<Partition> partitionList = this.listPartitionByTopic(clusterPhyId, topicName);
if (partitionList == null || partitionList.isEmpty()) {
// Topic does not exist
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(clusterPhyId, topicName));
}
partitionList.stream()
.filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)) .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
.forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec)); .map(elem -> new TopicPartition(elem.getTopicName(), elem.getPartitionId()))
.collect(Collectors.toList());
if (topicPartitionOffsets.isEmpty()) {
// all partitions have no leader
return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FAILED, MsgConstant.getPartitionNoLeader(clusterPhyId, topicName));
}
try { try {
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult =
(Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, offsetSpec, tpList));
return this.convert2OffsetMapResult(listResult);
} catch (VCHandlerNotExistException e) { } catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST); return Result.buildFailure(VC_HANDLE_NOT_EXIST);
} }
} }
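
PartitionOffsetParam is likewise not shown in this diff; its constructors and the getOffsetSpecList() usage below suggest a shape along these lines. This is a hypothetical reconstruction based only on the call sites, not the committed class.

package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;

import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: carries one or more (topicName, offsetSpec, partitions) requests so that a
// single version-routed call can resolve several offset specs (e.g. earliest + latest) in one pass.
// VersionItemParam is assumed to have a no-arg constructor.
public class PartitionOffsetParam extends VersionItemParam {
    private final Long clusterPhyId;
    private final List<Triple<String, KSOffsetSpec, List<TopicPartition>>> offsetSpecList = new ArrayList<>();

    public PartitionOffsetParam(Long clusterPhyId, KSOffsetSpec offsetSpec, List<TopicPartition> tpList) {
        this(clusterPhyId, null, offsetSpec, tpList);
    }

    public PartitionOffsetParam(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec, List<TopicPartition> tpList) {
        this.clusterPhyId = clusterPhyId;
        this.offsetSpecList.add(new Triple<>(topicName, offsetSpec, tpList));
    }

    public PartitionOffsetParam(Long clusterPhyId, String topicName, List<KSOffsetSpec> offsetSpecs, List<TopicPartition> tpList) {
        this.clusterPhyId = clusterPhyId;
        offsetSpecs.forEach(spec -> this.offsetSpecList.add(new Triple<>(topicName, spec, tpList)));
    }

    public Long getClusterPhyId() { return clusterPhyId; }

    public List<Triple<String, KSOffsetSpec, List<TopicPartition>>> getOffsetSpecList() { return offsetSpecList; }
}
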
@Override @Override
public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, OffsetSpec offsetSpec, Long timestamp) { public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec) {
if (partitionId == null) { List<TopicPartition> tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream()
return this.getPartitionOffsetFromKafka(clusterPhyId, topicName, offsetSpec, timestamp); .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
.map(elem -> new TopicPartition(topicName, elem.getPartitionId()))
.collect(Collectors.toList());
if (tpList.isEmpty()) {
// all partitions have no leader
return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FAILED, MsgConstant.getPartitionNoLeader(clusterPhyId, topicName));
} }
Map<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<>(); try {
this.listPartitionByTopic(clusterPhyId, topicName) Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult =
.stream() (Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, offsetSpec, tpList));
.filter(elem -> elem.getPartitionId().equals(partitionId))
.forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec)); return this.convert2OffsetMapResult(listResult);
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}
@Override
public Result<Tuple<Map<TopicPartition, Long>, Map<TopicPartition, Long>>> getPartitionBeginAndEndOffsetFromKafka(Long clusterPhyId, String topicName) {
List<TopicPartition> tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream()
.filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
.map(elem -> new TopicPartition(topicName, elem.getPartitionId()))
.collect(Collectors.toList());
if (tpList.isEmpty()) {
// all partitions have no leader
return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FAILED, MsgConstant.getPartitionNoLeader(clusterPhyId, topicName));
}
try { try {
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult =
(Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, Arrays.asList(KSOffsetSpec.earliest(), KSOffsetSpec.latest()), tpList));
if (listResult.failed()) {
return Result.buildFromIgnoreData(listResult);
} else if (ValidateUtils.isEmptyList(listResult.getData())) {
return Result.buildSuc(new Tuple<Map<TopicPartition, Long>, Map<TopicPartition, Long>>(new HashMap<>(0), new HashMap<>(0)));
}
Tuple<Map<TopicPartition, Long>, Map<TopicPartition, Long>> tuple = new Tuple<>(new HashMap<>(0), new HashMap<>(0));
listResult.getData().forEach(elem -> {
if (elem.v1() instanceof KSOffsetSpec.KSEarliestSpec) {
tuple.setV1(elem.v2());
} else if (elem.v1() instanceof KSOffsetSpec.KSLatestSpec) {
tuple.setV2(elem.v2());
}
});
return Result.buildSuc(tuple);
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}
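
As a usage illustration (assumed caller code, not taken from this commit), the begin/end tuple returned above can be folded into an approximate per-topic message count:

// Hypothetical helper: approximates a topic's message count from the begin/end offsets
// fetched by the single batched earliest+latest call above.
private long approxTopicMessageCount(Long clusterPhyId, String topicName) {
    Result<Tuple<Map<TopicPartition, Long>, Map<TopicPartition, Long>>> offsetsResult =
            this.getPartitionBeginAndEndOffsetFromKafka(clusterPhyId, topicName);
    if (!offsetsResult.hasData()) {
        return 0L;
    }
    Map<TopicPartition, Long> beginOffsets = offsetsResult.getData().v1();
    Map<TopicPartition, Long> endOffsets = offsetsResult.getData().v2();
    // end - begin per partition, summed; partitions without a begin offset contribute 0
    return endOffsets.entrySet().stream()
            .mapToLong(e -> e.getValue() - beginOffsets.getOrDefault(e.getKey(), e.getValue()))
            .sum();
}
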
@Override
public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, Integer partitionId, KSOffsetSpec offsetSpec) {
if (partitionId == null) {
return this.getPartitionOffsetFromKafka(clusterPhyId, topicName, offsetSpec);
}
List<TopicPartition> tpList = this.listPartitionFromCacheFirst(clusterPhyId, topicName).stream()
.filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
.map(elem -> new TopicPartition(topicName, elem.getPartitionId()))
.collect(Collectors.toList());
try {
Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult =
(Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, offsetSpec, tpList));
return this.convert2OffsetMapResult(listResult);
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}
@Override
public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long clusterPhyId, List<TopicPartition> tpList, KSOffsetSpec offsetSpec) {
// partitions in the cluster that currently have a leader
Set<TopicPartition> existLeaderTPSet = this.listPartitionFromCacheFirst(clusterPhyId).stream()
.filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER))
.map(elem -> new TopicPartition(elem.getTopicName(), elem.getPartitionId()))
.collect(Collectors.toSet());
List<TopicPartition> existLeaderTPList = tpList.stream().filter(elem -> existLeaderTPSet.contains(elem)).collect(Collectors.toList());
if (existLeaderTPList.isEmpty()) {
return Result.buildSuc(new HashMap<>(0));
}
try {
Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult = (Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>>) doVCHandler(
clusterPhyId,
PARTITION_OFFSET_GET,
new PartitionOffsetParam(clusterPhyId, offsetSpec, existLeaderTPList)
);
return this.convert2OffsetMapResult(listResult);
} catch (VCHandlerNotExistException e) { } catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST); return Result.buildFailure(VC_HANDLE_NOT_EXIST);
} }
@@ -267,6 +319,10 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
} }
PartitionPO presentPartitionPO = this.convert2PartitionPO(partition); PartitionPO presentPartitionPO = this.convert2PartitionPO(partition);
if (presentPartitionPO.equals(dbPartitionPO)) {
// data unchanged, skip the DB update
continue;
}
presentPartitionPO.setId(dbPartitionPO.getId()); presentPartitionPO.setId(dbPartitionPO.getId());
partitionDAO.updateById(presentPartitionPO); partitionDAO.updateById(presentPartitionPO);
} }
@@ -306,64 +362,137 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
/**************************************************** private method ****************************************************/ /**************************************************** private method ****************************************************/
private Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> batchGetPartitionOffsetFromKafkaAdminClient(VersionItemParam itemParam) {
private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaAdminClient(VersionItemParam itemParam) {
PartitionOffsetParam offsetParam = (PartitionOffsetParam) itemParam; PartitionOffsetParam offsetParam = (PartitionOffsetParam) itemParam;
if (offsetParam.getOffsetSpecList().isEmpty()) {
return Result.buildSuc(Collections.emptyList());
}
List<Triple<String, KSOffsetSpec, ListOffsetsResult>> resultList = new ArrayList<>();
for (Triple<String, KSOffsetSpec, List<TopicPartition>> elem: offsetParam.getOffsetSpecList()) {
Result<ListOffsetsResult> offsetsResult = this.getPartitionOffsetFromKafkaAdminClient(
offsetParam.getClusterPhyId(),
elem.v1(),
elem.v2(),
elem.v3()
);
if (offsetsResult.failed() && offsetParam.getOffsetSpecList().size() == 1) {
return Result.buildFromIgnoreData(offsetsResult);
}
if (offsetsResult.hasData()) {
resultList.add(new Triple<>(elem.v1(), elem.v2(), offsetsResult.getData()));
}
}
List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>> offsetMapList = new ArrayList<>();
for (Triple<String, KSOffsetSpec, ListOffsetsResult> triple: resultList) {
try {
Map<TopicPartition, Long> offsetMap = new HashMap<>();
triple.v3().all().get().entrySet().stream().forEach(elem -> offsetMap.put(elem.getKey(), elem.getValue().offset()));
offsetMapList.add(new Tuple<>(triple.v2(), offsetMap));
} catch (Exception e) {
log.error(
"method=batchGetPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||offsetSpec={}||errMsg=exception!",
offsetParam.getClusterPhyId(), triple.v1(), triple.v2(), e
);
}
}
return Result.buildSuc(offsetMapList);
}
private Result<ListOffsetsResult> getPartitionOffsetFromKafkaAdminClient(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec, List<TopicPartition> tpList) {
try { try {
AdminClient adminClient = kafkaAdminClient.getClient(offsetParam.getClusterPhyId()); AdminClient adminClient = kafkaAdminClient.getClient(clusterPhyId);
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(offsetParam.getTopicPartitionOffsets(), new ListOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)); Map<TopicPartition, OffsetSpec> kafkaOffsetSpecMap = new HashMap<>(tpList.size());
tpList.forEach(elem -> {
if (offsetSpec instanceof KSOffsetSpec.KSEarliestSpec) {
kafkaOffsetSpecMap.put(elem, OffsetSpec.earliest());
} else if (offsetSpec instanceof KSOffsetSpec.KSLatestSpec) {
kafkaOffsetSpecMap.put(elem, OffsetSpec.latest());
} else if (offsetSpec instanceof KSOffsetSpec.KSTimestampSpec) {
kafkaOffsetSpecMap.put(elem, OffsetSpec.forTimestamp(((KSOffsetSpec.KSTimestampSpec) offsetSpec).timestamp()));
}
});
Map<TopicPartition, Long> offsetMap = new HashMap<>(); ListOffsetsResult listOffsetsResult = adminClient.listOffsets(kafkaOffsetSpecMap, new ListOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
listOffsetsResult.all().get().entrySet().stream().forEach(elem -> offsetMap.put(elem.getKey(), elem.getValue().offset()));
return Result.buildSuc(offsetMap); return Result.buildSuc(listOffsetsResult);
} catch (NotExistException nee) { } catch (NotExistException nee) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId())); return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
} catch (Exception e) { } catch (Exception e) {
log.error( log.error(
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!", "method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!",
offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e clusterPhyId, topicName, e
); );
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
} }
} }
private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaConsumerClient(VersionItemParam itemParam) { private Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> batchGetPartitionOffsetFromKafkaConsumerClient(VersionItemParam itemParam) {
PartitionOffsetParam offsetParam = (PartitionOffsetParam) itemParam;
if (offsetParam.getOffsetSpecList().isEmpty()) {
return Result.buildSuc(Collections.emptyList());
}
List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>> offsetMapList = new ArrayList<>();
for (Triple<String, KSOffsetSpec, List<TopicPartition>> triple: offsetParam.getOffsetSpecList()) {
Result<Map<TopicPartition, Long>> subOffsetMapResult = this.getPartitionOffsetFromKafkaConsumerClient(
offsetParam.getClusterPhyId(),
triple.v1(),
triple.v2(),
triple.v3()
);
if (subOffsetMapResult.failed() && offsetParam.getOffsetSpecList().size() == 1) {
return Result.buildFromIgnoreData(subOffsetMapResult);
}
if (subOffsetMapResult.hasData()) {
offsetMapList.add(new Tuple<>(triple.v2(), subOffsetMapResult.getData()));
}
}
return Result.buildSuc(offsetMapList);
}
private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaConsumerClient(Long clusterPhyId, String topicName, KSOffsetSpec offsetSpec, List<TopicPartition> tpList) {
KafkaConsumer<String, String> kafkaConsumer = null; KafkaConsumer<String, String> kafkaConsumer = null;
PartitionOffsetParam offsetParam = (PartitionOffsetParam) itemParam;
try { try {
if (ValidateUtils.isEmptyMap(offsetParam.getTopicPartitionOffsets())) { if (ValidateUtils.isEmptyList(tpList)) {
return Result.buildSuc(new HashMap<>()); return Result.buildSuc(new HashMap<>());
} }
kafkaConsumer = kafkaConsumerClient.getClient(offsetParam.getClusterPhyId()); kafkaConsumer = kafkaConsumerClient.getClient(clusterPhyId);
OffsetSpec offsetSpec = new ArrayList<>(offsetParam.getTopicPartitionOffsets().values()).get(0); if (offsetSpec instanceof KSOffsetSpec.KSLatestSpec) {
if (offsetSpec instanceof OffsetSpec.LatestSpec) {
return Result.buildSuc( return Result.buildSuc(
kafkaConsumer.endOffsets( kafkaConsumer.endOffsets(
offsetParam.getTopicPartitionOffsets().keySet(), tpList,
Duration.ofMillis(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) Duration.ofMillis(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
) )
); );
} }
if (offsetSpec instanceof OffsetSpec.EarliestSpec) { if (offsetSpec instanceof KSOffsetSpec.KSEarliestSpec) {
return Result.buildSuc( return Result.buildSuc(
kafkaConsumer.beginningOffsets( kafkaConsumer.beginningOffsets(
offsetParam.getTopicPartitionOffsets().keySet(), tpList,
Duration.ofMillis(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) Duration.ofMillis(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
) )
); );
} }
if (offsetSpec instanceof OffsetSpec.TimestampSpec) { if (offsetSpec instanceof KSOffsetSpec.KSTimestampSpec) {
// look up offsets by timestamp // look up offsets by timestamp
Map<TopicPartition, Long> timestampMap = new HashMap<>(); Map<TopicPartition, Long> timestampMap = new HashMap<>();
offsetParam.getTopicPartitionOffsets().entrySet().stream().forEach(elem -> timestampMap.put(elem.getKey(), offsetParam.getTimestamp())); tpList.forEach(elem -> timestampMap.put(elem, ((KSOffsetSpec.KSTimestampSpec) offsetSpec).timestamp()));
Map<TopicPartition, OffsetAndTimestamp> offsetMetadataMap = kafkaConsumer.offsetsForTimes( Map<TopicPartition, OffsetAndTimestamp> offsetMetadataMap = kafkaConsumer.offsetsForTimes(
timestampMap, timestampMap,
@@ -377,17 +506,17 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "OffsetSpec type illegal"); return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "OffsetSpec type illegal");
} catch (NotExistException nee) { } catch (NotExistException nee) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId())); return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
} catch (Exception e) { } catch (Exception e) {
log.error( log.error(
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!", "method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!",
offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e clusterPhyId, topicName, e
); );
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
} finally { } finally {
if (kafkaConsumer != null) { if (kafkaConsumer != null) {
kafkaConsumerClient.returnClient(offsetParam.getClusterPhyId(), kafkaConsumer); kafkaConsumerClient.returnClient(clusterPhyId, kafkaConsumer);
} }
} }
} }
@@ -411,7 +540,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
return Result.buildSuc(partitionMap); return Result.buildSuc(partitionMap);
} catch (Exception e) { } catch (Exception e) {
log.error("class=PartitionServiceImpl||method=getPartitionsFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); log.error("method=getPartitionsFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
} }
@@ -430,7 +559,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
} }
return Result.buildSuc(partitionMap); return Result.buildSuc(partitionMap);
} catch (Exception e) { } catch (Exception e) {
log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); log.error("method=getPartitionsFromZKClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
} }
@@ -447,7 +576,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
TopicDescription description = describeTopicsResult.all().get().get(topicName); TopicDescription description = describeTopicsResult.all().get().get(topicName);
return Result.buildSuc(PartitionConverter.convert2PartitionList(clusterPhy.getId(), description)); return Result.buildSuc(PartitionConverter.convert2PartitionList(clusterPhy.getId(), description));
}catch (Exception e) { }catch (Exception e) {
log.error("class=PartitionServiceImpl||method=getPartitionsFromAdminClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); log.error("method=getPartitionsFromAdminClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e);
return Result.buildFailure(ResultStatus.KAFKA_OPERATE_FAILED); return Result.buildFailure(ResultStatus.KAFKA_OPERATE_FAILED);
} }
} }
@@ -470,7 +599,7 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
} }
return Result.buildSuc(partitionList); return Result.buildSuc(partitionList);
} catch (Exception e) { } catch (Exception e) {
log.error("class=PartitionServiceImpl||method=getPartitionsFromZKClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e); log.error("method=getPartitionsFromZKClientByClusterTopicName||clusterPhyId={}||topicName={}||errMsg=exception", clusterPhy.getId(),topicName, e);
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
} }
} }
@@ -482,21 +611,24 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P
List<Partition> partitionList = new ArrayList<>(); List<Partition> partitionList = new ArrayList<>();
for (PartitionPO po: poList) { for (PartitionPO po: poList) {
if(null != po){partitionList.add(convert2Partition(po));} if(null != po) {
partitionList.add(this.convert2Partition(po));
}
} }
return partitionList; return partitionList;
} }
private List<PartitionPO> convert2PartitionPOList(List<Partition> partitionList) { private Result<Map<TopicPartition, Long>> convert2OffsetMapResult(Result<List<Tuple<KSOffsetSpec, Map<TopicPartition, Long>>>> listResult) {
if (partitionList == null) { if (listResult.failed()) {
return new ArrayList<>(); return Result.buildFromIgnoreData(listResult);
} else if (ValidateUtils.isEmptyList(listResult.getData())) {
return Result.buildSuc(new HashMap<>(0));
} }
List<PartitionPO> poList = new ArrayList<>(); Map<TopicPartition, Long> offsetMap = new HashMap<>();
for (Partition partition: partitionList) { listResult.getData().forEach(elem -> offsetMap.putAll(elem.v2()));
poList.add(this.convert2PartitionPO(partition));
} return Result.buildSuc(offsetMap);
return poList;
} }
private PartitionPO convert2PartitionPO(Partition partition) { private PartitionPO convert2PartitionPO(Partition partition) {


@@ -1,32 +0,0 @@
//package com.xiaojukeji.know.streaming.km.task.kafka.metrics;
//
//import com.didiglobal.logi.job.annotation.Task;
//import com.didiglobal.logi.job.common.TaskResult;
//import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
//import com.xiaojukeji.know.streaming.km.collector.metric.kafka.ReplicaMetricCollector;
//import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//
///**
// * @author didi
// */
//@Slf4j
//@Task(name = "ReplicaMetricCollectorTask",
// description = "Replica指标采集任务",
// cron = "0 0/1 * * * ? *",
// autoRegister = true,
// consensual = ConsensualEnum.BROADCAST,
// timeout = 2 * 60)
//public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
//
// @Autowired
// private ReplicaMetricCollector replicaMetricCollector;
//
// @Override
// public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
// replicaMetricCollector.collectMetrics(clusterPhy);
//
// return TaskResult.SUCCESS;
// }
//}