mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-05 13:08:48 +08:00
[Bugfix]修复采集副本指标时,参数传递错误问题(#867)
This commit is contained in:
@@ -373,8 +373,8 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
|
|||||||
Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafka(
|
Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafka(
|
||||||
clusterId,
|
clusterId,
|
||||||
p.getTopicName(),
|
p.getTopicName(),
|
||||||
brokerId,
|
|
||||||
p.getPartitionId(),
|
p.getPartitionId(),
|
||||||
|
brokerId,
|
||||||
ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE
|
ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -78,8 +78,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
|
|||||||
Result<ReplicationMetrics> ret = this.collectReplicaMetricsFromKafka(
|
Result<ReplicationMetrics> ret = this.collectReplicaMetricsFromKafka(
|
||||||
clusterId,
|
clusterId,
|
||||||
metrics.getTopic(),
|
metrics.getTopic(),
|
||||||
metrics.getBrokerId(),
|
|
||||||
metrics.getPartitionId(),
|
metrics.getPartitionId(),
|
||||||
|
metrics.getBrokerId(),
|
||||||
metricName
|
metricName
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -146,8 +146,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
|
|||||||
Integer brokerId = metricParam.getBrokerId();
|
Integer brokerId = metricParam.getBrokerId();
|
||||||
Integer partitionId = metricParam.getPartitionId();
|
Integer partitionId = metricParam.getPartitionId();
|
||||||
|
|
||||||
Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
|
Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, partitionId, brokerId, REPLICATION_METRIC_LOG_END_OFFSET);
|
||||||
Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
|
Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, partitionId, brokerId, REPLICATION_METRIC_LOG_START_OFFSET);
|
||||||
|
|
||||||
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterId, topic, brokerId, partitionId);
|
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterId, topic, brokerId, partitionId);
|
||||||
if(null != endRet && endRet.successful() && null != startRet && startRet.successful()){
|
if(null != endRet && endRet.successful() && null != startRet && startRet.successful()){
|
||||||
@@ -155,6 +155,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
|
|||||||
Float startOffset = startRet.getData().getMetrics().get(REPLICATION_METRIC_LOG_START_OFFSET);
|
Float startOffset = startRet.getData().getMetrics().get(REPLICATION_METRIC_LOG_START_OFFSET);
|
||||||
|
|
||||||
replicationMetrics.putMetric(metric, endOffset - startOffset);
|
replicationMetrics.putMetric(metric, endOffset - startOffset);
|
||||||
|
replicationMetrics.putMetric(REPLICATION_METRIC_LOG_END_OFFSET, endOffset);
|
||||||
|
replicationMetrics.putMetric(REPLICATION_METRIC_LOG_START_OFFSET, startOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Result.buildSuc(replicationMetrics);
|
return Result.buildSuc(replicationMetrics);
|
||||||
|
|||||||
Reference in New Issue
Block a user