From 4114777a4ea8adb6dd51049efe1cfe9295b9c604 Mon Sep 17 00:00:00 2001
From: zengqiao
Date: Wed, 31 Aug 2022 17:11:12 +0800
Subject: [PATCH 1/5] Add preferred-leader election capability
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../param/partition/BatchPartitionParam.java  |  19 +++
 .../service/partition/OpPartitionService.java |  14 +++
 .../impl/OpPartitionServiceImpl.java          | 119 ++++++++++++++++++
 3 files changed, 152 insertions(+)
 create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java
 create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java
 create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java

diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java
new file mode 100644
index 00000000..d316112f
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java
@@ -0,0 +1,19 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
+
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+public class BatchPartitionParam extends ClusterPhyParam {
+    private List<TopicPartition> tpList;
+
+    public BatchPartitionParam(Long clusterPhyId, List<TopicPartition> tpList) {
+        super(clusterPhyId);
+        this.tpList = tpList;
+    }
+}
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java
new file mode 100644
index 00000000..6a3611f8
--- /dev/null
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java
@@ -0,0 +1,14 @@
+package com.xiaojukeji.know.streaming.km.core.service.partition;
+
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+public interface OpPartitionService {
+
+    /**
+     * 优先副本选举
+     */
+    Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList);
+}
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java
new file mode 100644
index 00000000..0f1186ef
--- /dev/null
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java
@@ -0,0 +1,119 @@
+package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
+
+import com.didiglobal.logi.log.ILog;
+import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
+import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
+import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
+import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
+import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
+import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
+import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
+import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ElectLeadersOptions;
+import org.apache.kafka.clients.admin.ElectLeadersResult;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import scala.jdk.javaapi.CollectionConverters;
+
+import javax.annotation.PostConstruct;
+import java.util.HashSet;
+import java.util.List;
+
+import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST;
+import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
+import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER;
+
+
+/**
+ * @author didi
+ */
+@Service
+public class OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService {
+    private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class);
+
+    @Autowired
+    private KafkaAdminClient kafkaAdminClient;
+
+    @Autowired
+    private KafkaAdminZKClient kafkaAdminZKClient;
+
+    public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection";
+
+    @Override
+    protected VersionItemTypeEnum getVersionItemType() {
+        return SERVICE_OP_PARTITION_LEADER;
+    }
+
+    @PostConstruct
+    private void init() {
+        registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient);
+        registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient);
+    }
+
+    @Override
+    public Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList) {
+        try {
+            return (Result<Void>) doVCHandler(
+                    clusterPhyId,
+                    PREFERRED_REPLICA_ELECTION,
+                    new BatchPartitionParam(clusterPhyId, tpList)
+            );
+        } catch (VCHandlerNotExistException e) {
+            return Result.buildFailure(VC_HANDLE_NOT_EXIST);
+        }
+    }
+
+    /**************************************************** private method ****************************************************/
+
+    private Result<Void> preferredReplicaElectionByZKClient(VersionItemParam itemParam) {
+        BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
+
+        try {
+            KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId());
+
+            kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet());
+
+            return Result.buildSuc();
+        } catch (Exception e) {
+            LOGGER.error(
+                    "class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception",
+                    partitionParam.getClusterPhyId(), e
+            );
+
+            return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
} + } + + private Result preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) { + BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam; + + try { + AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId()); + + ElectLeadersResult electLeadersResult = adminClient.electLeaders( + ElectionType.PREFERRED, + new HashSet<>(partitionParam.getTpList()), + new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + electLeadersResult.all().get(); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception", + partitionParam.getClusterPhyId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } +} From 271ab432d9a9a3608feb99ac34a03814ae2cb01e Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 31 Aug 2022 17:12:34 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=9B=BE=E7=89=87=E9=93=BE=E6=8E=A5?= =?UTF-8?q?=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/dev_guide/解决连接JMX失败.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dev_guide/解决连接JMX失败.md b/docs/dev_guide/解决连接JMX失败.md index a82069ac..546400d6 100644 --- a/docs/dev_guide/解决连接JMX失败.md +++ b/docs/dev_guide/解决连接JMX失败.md @@ -19,7 +19,7 @@ 未开启时,直接到`2、解决方法`查看如何开启即可。 -![check_jmx_opened](./assets/connect_jmx_failed/check_jmx_opened.jpg) +![check_jmx_opened](http://img-ys011.didistatic.com/static/dc2img/do1_dRX6UHE2IUSHqsN95DGb) **类型二:配置错误** From 7be462599f461e0888f2ab3b6bded9cdcd1c36e5 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 31 Aug 2022 17:13:18 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E6=96=87=E6=A1=88=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/common/enums/health/HealthCheckNameEnum.java | 10 +++++----- km-dist/init/sql/dml-ks-km.sql | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java index d64a42ae..724f5ed3 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java @@ -26,7 +26,7 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.CLUSTER, "Controller", Constant.HC_CONFIG_NAME_PREFIX + "CLUSTER_NO_CONTROLLER", - "集群Controller数错误", + "集群Controller数正常", HealthCompareValueConfig.class ), @@ -34,7 +34,7 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.BROKER, "RequestQueueSize", Constant.HC_CONFIG_NAME_PREFIX + "BROKER_REQUEST_QUEUE_FULL", - "Broker-RequestQueueSize被打满", + "Broker-RequestQueueSize指标", HealthCompareValueConfig.class ), @@ -42,7 +42,7 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.BROKER, "NetworkProcessorAvgIdlePercent", Constant.HC_CONFIG_NAME_PREFIX + "BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW", - "Broker-NetworkProcessorAvgIdlePercent的Idle过低", + "Broker-NetworkProcessorAvgIdlePercent指标", HealthCompareValueConfig.class ), @@ -50,7 +50,7 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.GROUP, "Group 
Re-Balance", Constant.HC_CONFIG_NAME_PREFIX + "GROUP_RE_BALANCE_TOO_FREQUENTLY", - "Group re-balance太频繁", + "Group re-balance频率", HealthDetectedInLatestMinutesConfig.class ), @@ -66,7 +66,7 @@ public enum HealthCheckNameEnum { HealthCheckDimensionEnum.TOPIC, "UnderReplicaTooLong", Constant.HC_CONFIG_NAME_PREFIX + "TOPIC_UNDER_REPLICA_TOO_LONG", - "Topic 长期处于未同步状态", + "Topic 未同步持续时间", HealthDetectedInLatestMinutesConfig.class ), diff --git a/km-dist/init/sql/dml-ks-km.sql b/km-dist/init/sql/dml-ks-km.sql index f986533d..2d354a87 100644 --- a/km-dist/init/sql/dml-ks-km.sql +++ b/km-dist/init/sql/dml-ks-km.sql @@ -1,7 +1,7 @@ -- 检查检查配置 -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_CLUSTER_NO_CONTROLLER','{ \"value\": 1, \"weight\": 30 } ','集群Controller数错误','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_REQUEST_QUEUE_FULL','{ \"value\": 10, \"weight\": 20 } ','Broker请求队列被打满','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW','{ \"value\": 0.8, \"weight\": 20 } ','Broker网络处理线程Idle过低','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_GROUP_RE_BALANCE_TOO_FREQUENTLY','{\n \"latestMinutes\": 10,\n \"detectedTimes\": 8,\n \"weight\": 10\n}\n','Group的re-balance太频繁','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_NO_LEADER','{ \"value\": 1, \"weight\": 10 } ','Topic无Leader数','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_UNDER_REPLICA_TOO_LONG','{ \"latestMinutes\": 10, \"detectedTimes\": 8, \"weight\": 10 } ','Topic长期处于未同步状态','know-stream'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_CLUSTER_NO_CONTROLLER','{ \"value\": 1, \"weight\": 30 } ','集群Controller数正常','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_REQUEST_QUEUE_FULL','{ \"value\": 10, \"weight\": 20 } ','Broker-RequestQueueSize指标','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW','{ \"value\": 0.8, \"weight\": 20 } ','Broker-NetworkProcessorAvgIdlePercent指标','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_GROUP_RE_BALANCE_TOO_FREQUENTLY','{\n \"latestMinutes\": 10,\n \"detectedTimes\": 8,\n \"weight\": 10\n}\n','Group的re-balance频率','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_NO_LEADER','{ \"value\": 1, \"weight\": 10 } ','Topic 无Leader数','know-stream'); +INSERT INTO `ks_km_platform_cluster_config` 
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_UNDER_REPLICA_TOO_LONG','{ \"latestMinutes\": 10, \"detectedTimes\": 8, \"weight\": 10 } ','Topic 未同步持续时间','know-streaming');

From f005c6bc447b2673b56b434a0912ce107e295fba Mon Sep 17 00:00:00 2001
From: zengqiao
Date: Wed, 31 Aug 2022 17:14:56 +0800
Subject: [PATCH 4/5] Add sorting to the version list
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../km/core/service/cluster/ClusterPhyService.java |  2 +-
 .../cluster/impl/ClusterPhyServiceImpl.java        | 14 +++++++++-----
 .../api/v3/cluster/MultiClusterPhyController.java  |  6 +++---
 3 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java
index b55594b1..56b6640b 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java
@@ -73,5 +73,5 @@ public interface ClusterPhyService {
      * 获取系统已存在的kafka版本列表
      * @return
      */
-    Set<String> getClusterVersionSet();
+    List<String> getClusterVersionList();
 }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java
index d7c355ef..2ba13738 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java
@@ -24,8 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -205,9 +206,12 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
     }
 
     @Override
-    public Set<String> getClusterVersionSet() {
-        List<ClusterPhy> clusterPhyList = listAllClusters();
-        Set<String> versionSet = clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet());
-        return versionSet;
+    public List<String> getClusterVersionList() {
+        List<ClusterPhy> clusterPhyList = this.listAllClusters();
+
+        List<String> versionList = new ArrayList<>(clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet()));
+        Collections.sort(versionList);
+
+        return versionList;
     }
 }
diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java
index d443bcac..34b907a8 100644
--- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java
+++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java
@@ -16,7 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
-import java.util.Set;
+import java.util.List;
 
 
 /**
@@ -49,7 +49,7 @@ public class MultiClusterPhyController {
 
     @ApiOperation(value = "多物理集群-已存在kafka版本", notes = "")
     @GetMapping(value = "physical-clusters/exist-version")
-    public Result<Set<String>> getClusterPhysVersion() {
-        return Result.buildSuc(clusterPhyService.getClusterVersionSet());
+    public Result<List<String>> getClusterPhysVersion() {
+        return Result.buildSuc(clusterPhyService.getClusterVersionList());
     }
 }

From 6e058240b334f66aa5b5af995fc5fd76e3de3f84 Mon Sep 17 00:00:00 2001
From: zengqiao
Date: Wed, 31 Aug 2022 17:15:49 +0800
Subject: [PATCH 5/5] Adjust metric collection cache time
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../enums/version/VersionItemTypeEnum.java    |  2 +
 .../cache/CollectedMetricsLocalCache.java     | 53 ++++++++++---------
 .../broker/impl/BrokerMetricServiceImpl.java  |  7 +--
 .../impl/ClusterMetricServiceImpl.java        |  7 +--
 .../impl/PartitionMetricServiceImpl.java      | 11 ++--
 .../impl/ReplicaMetricServiceImpl.java        | 17 +++---
 .../topic/impl/TopicMetricServiceImpl.java    | 11 ++--
 .../metrics/ClusterMetricVersionItems.java    |  1 +
 8 files changed, 53 insertions(+), 56 deletions(-)

diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java
index 270999b4..15f13175 100644
--- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java
@@ -31,9 +31,11 @@ public enum VersionItemTypeEnum {
 
     SERVICE_OP_PARTITION(320, "service_partition_operation"),
 
+    SERVICE_OP_PARTITION_LEADER(321, "service_partition-leader_operation"),
 
     SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),
 
+
     /**
      * 前端操作
      */
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
index c0469fb6..bc5b1c34 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
@@ -10,13 +10,13 @@ import java.util.concurrent.TimeUnit;
 
 public class CollectedMetricsLocalCache {
     private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
-            .expireAfterWrite(60, TimeUnit.SECONDS)
-            .maximumSize(2000)
+            .expireAfterWrite(90, TimeUnit.SECONDS)
+            .maximumSize(10000)
             .build();
 
     private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
             .expireAfterWrite(90, TimeUnit.SECONDS)
-            .maximumSize(5000)
+            .maximumSize(10000)
             .build();
 
     private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
@@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache {
             .maximumSize(20000)
             .build();
 
-    public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
-        return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
+    public static Float getBrokerMetrics(String brokerMetricKey) {
+        return brokerMetricsCache.getIfPresent(brokerMetricKey);
     }
 
-    public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
+    public static void putBrokerMetrics(String brokerMetricKey, Float value) {
         if (value == null) {
             return;
         }
-        brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
+
+        brokerMetricsCache.put(brokerMetricKey, value);
     }
 
-    public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
-        return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+    public static List<TopicMetrics> getTopicMetrics(String topicMetricKey) {
+        return topicMetricsCache.getIfPresent(topicMetricKey);
     }
 
-    public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
+    public static void putTopicMetrics(String topicMetricKey, List<TopicMetrics> metricsList) {
         if (metricsList == null) {
             return;
         }
-        topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+
+        topicMetricsCache.put(topicMetricKey, metricsList);
     }
 
-    public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
-        return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+    public static List<PartitionMetrics> getPartitionMetricsList(String partitionMetricKey) {
+        return partitionMetricsCache.getIfPresent(partitionMetricKey);
    }
 
-    public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
+    public static void putPartitionMetricsList(String partitionMetricsKey, List<PartitionMetrics> metricsList) {
         if (metricsList == null) {
             return;
         }
-        partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+        partitionMetricsCache.put(partitionMetricsKey, metricsList);
     }
 
-    public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
-        return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
+    public static Float getReplicaMetrics(String replicaMetricsKey) {
+        return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
     }
 
-    public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
+    public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
         if (value == null) {
             return;
         }
-        replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
+        replicaMetricsValueCache.put(replicaMetricsKey, value);
     }
-
-    /**************************************************** private method ****************************************************/
-
-
-    private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
+    public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
         return clusterPhyId + "@" + brokerId + "@" + metricName;
     }
 
-    private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
+    public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
         return clusterPhyId + "@" + topicName + "@" + metricName;
     }
 
-    private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
+    public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
         return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
     }
+
+    /**************************************************** private method ****************************************************/
+
 }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
index 6c7dec0e..93c343ff 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
@@ -110,9 +110,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
     }
 
     @Override
-    public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
+    public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) {
+        String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric);
 
-        Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
+        Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey);
         if(null != keyValue) {
             BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
             brokerMetrics.putMetric(metric, keyValue);
@@ -124,7 +125,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
 
         Map<String, Float> metricsMap = ret.getData().getMetrics();
         for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
-            CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
+            CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue());
         }
 
         return ret;
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
index fee8fb0e..075c53c2 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
@@ -126,7 +126,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     private TopicMetricService topicMetricService;
 
     @Autowired
-    private TopicService topicService;
+    private TopicService topicService;
 
     @Autowired
     private PartitionService partitionService;
@@ -728,13 +728,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
         Long clusterId = param.getClusterId();
 
         //1、获取jmx的属性信息
-        VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric);
-        if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);}
-
         List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
 
         float metricVale = 0f;
-        for(Broker broker : brokers){
+        for(Broker broker : brokers) {
             Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
             if(null == ret || ret.failed() || null == ret.getData()){continue;}
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java
index 9e354634..9104b398 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java
@@ -75,7 +75,9 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
 
     @Override
     public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) {
-        List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName);
+        String partitionMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
+
+        List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(partitionMetricsKey);
         if(null != metricsList) {
             return Result.buildSuc(metricsList);
         }
@@ -88,12 +90,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
 
         // 更新cache
         PartitionMetrics metrics = metricsResult.getData().get(0);
         metrics.getMetrics().entrySet().forEach(
-                metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(
-                        clusterPhyId,
-                        metrics.getTopic(),
-                        metricEntry.getKey(),
-                        metricsResult.getData()
-                )
+                metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(partitionMetricsKey, metricsResult.getData())
         );
 
         return metricsResult;
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java
index 5240e8b9..460e6520 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java
@@ -77,9 +77,14 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
     }
 
     @Override
-    public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic,
-                                                                              Integer brokerId, Integer partitionId, String metric){
-        Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric);
+    public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
+                                                                              String topic,
+                                                                              Integer brokerId,
+                                                                              Integer partitionId,
+                                                                              String metric) {
+        String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
+
+        Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
         if(null != keyValue){
             ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
             replicationMetrics.putMetric(metric, keyValue);
@@ -92,11 +97,7 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
 
         // 更新cache
         ret.getData().getMetrics().entrySet().stream().forEach(
                 metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
-                        clusterPhyId,
-                        brokerId,
-                        topic,
-                        partitionId,
-                        metricNameAndValueEntry.getKey(),
+                        replicaMetricsKey,
                         metricNameAndValueEntry.getValue()
                 )
         );
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java
index d7cca017..478c142b 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java
@@ -120,7 +120,9 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
 
     @Override
     public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) {
-        List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName);
+        String topicMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
+
+        List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(topicMetricsKey);
         if(null != metricsList) {
             return Result.buildSuc(metricsList);
         }
@@ -133,12 +135,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
 
         // 更新cache
         TopicMetrics metrics = metricsResult.getData().get(0);
         metrics.getMetrics().entrySet().forEach(
-                metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(
-                        clusterPhyId,
-                        metrics.getTopic(),
-                        metricEntry.getKey(),
-                        metricsResult.getData()
-                )
+                metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(topicMetricsKey, metricsResult.getData())
         );
 
         return metricsResult;
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java
index d4c58d69..d3357ab4 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java
@@ -35,6 +35,7 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
     public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster";
     public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster";
     public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster";
+    public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
     public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
     public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";
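
For reference, the AdminClient-based path that PATCH 1/5 registers for clusters at version 2.8.0 and above can also be exercised as a small standalone program. The sketch below is not part of the patch series: the bootstrap address (localhost:9092), the topic name (demo-topic), the partition number, and the 30-second timeout are illustrative placeholders for the values that KnowStreaming resolves through KafkaAdminClient and KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS. Clusters between 0.10.0.0 and 2.8.0 go through the ZooKeeper path instead (KafkaZkClient.createPreferredReplicaElection), as registered in OpPartitionServiceImpl.init().

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class PreferredLeaderElectionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumption: a broker reachable at localhost:9092; replace with real bootstrap servers.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Assumption: partition 0 of a topic named "demo-topic" exists on the cluster.
            Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("demo-topic", 0));

            // Ask the controller to move leadership back to the preferred replica,
            // the same call used by preferredReplicaElectionByKafkaClient in PATCH 1/5.
            ElectLeadersResult result = adminClient.electLeaders(
                    ElectionType.PREFERRED,
                    partitions,
                    new ElectLeadersOptions().timeoutMs(30000));

            // all() completes normally only if the election succeeded for every requested partition.
            result.all().get();
        }
    }
}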