From dd6004b9d4eda9823305fdcf26159193b2d79b59 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 11 Jan 2023 11:21:34 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8D=E9=87=87=E9=9B=86?= =?UTF-8?q?=E5=89=AF=E6=9C=AC=E6=8C=87=E6=A0=87=E6=97=B6=EF=BC=8C=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E4=BC=A0=E9=80=92=E9=94=99=E8=AF=AF=E9=97=AE=E9=A2=98?= =?UTF-8?q?(#867)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/service/broker/impl/BrokerMetricServiceImpl.java | 2 +- .../service/replica/impl/ReplicaMetricServiceImpl.java | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java index a1320b90..20d89362 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java @@ -373,8 +373,8 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker Result metricsResult = replicaMetricService.collectReplicaMetricsFromKafka( clusterId, p.getTopicName(), - brokerId, p.getPartitionId(), + brokerId, ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE ); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java index 92986d4b..6b0c28d2 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java @@ -78,8 +78,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli Result ret = this.collectReplicaMetricsFromKafka( clusterId, metrics.getTopic(), - metrics.getBrokerId(), metrics.getPartitionId(), + metrics.getBrokerId(), metricName ); @@ -146,8 +146,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli Integer brokerId = metricParam.getBrokerId(); Integer partitionId = metricParam.getPartitionId(); - Result endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET); - Result startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET); + Result endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, partitionId, brokerId, REPLICATION_METRIC_LOG_END_OFFSET); + Result startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, partitionId, brokerId, REPLICATION_METRIC_LOG_START_OFFSET); ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterId, topic, brokerId, partitionId); if(null != endRet && endRet.successful() && null != startRet && startRet.successful()){ @@ -155,6 +155,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli Float startOffset = startRet.getData().getMetrics().get(REPLICATION_METRIC_LOG_START_OFFSET); replicationMetrics.putMetric(metric, endOffset - startOffset); + replicationMetrics.putMetric(REPLICATION_METRIC_LOG_END_OFFSET, endOffset); + replicationMetrics.putMetric(REPLICATION_METRIC_LOG_START_OFFSET, startOffset); } return Result.buildSuc(replicationMetrics);