diff --git a/README.md b/README.md
index a179bf2e..c5830ffe 100644
--- a/README.md
+++ b/README.md
@@ -122,7 +122,7 @@
 👍 我们正在组建国内最大,最权威的 **[Kafka中文社区](https://z.didi.cn/5gSF9)**
 
-在这里你可以结交各大互联网的 Kafka大佬 以及 3000+ Kafka爱好者,一起实现知识共享,实时掌控最新行业资讯,期待 👏   您的加入中~ https://z.didi.cn/5gSF9
+在这里你可以结交各大互联网的 Kafka大佬 以及 4000+ Kafka爱好者,一起实现知识共享,实时掌控最新行业资讯,期待 👏   您的加入中~ https://z.didi.cn/5gSF9
 
 有问必答~! 互动有礼~!
 
@@ -132,4 +132,8 @@ PS: 提问请尽量把问题一次性描述清楚,并告知环境信息情况
 
 **`2、微信群`**
 
-微信加群:添加`mike_zhangliang`、`danke-x`的微信号备注Logi加群。
\ No newline at end of file
+微信加群:添加`mike_zhangliang`、`PenceXie`的微信号备注KnowStreaming加群。
+
+## Star History
+
+[![Star History Chart](https://api.star-history.com/svg?repos=didi/KnowStreaming&type=Date)](https://star-history.com/#didi/KnowStreaming&Date)
diff --git a/docs/assets/KnowStreamingLogo.png b/docs/assets/KnowStreamingLogo.png
deleted file mode 100644
index f38dd42a..00000000
Binary files a/docs/assets/KnowStreamingLogo.png and /dev/null differ
diff --git a/docs/assets/readme/KnowStreamingPageDemo.jpg b/docs/assets/readme/KnowStreamingPageDemo.jpg
deleted file mode 100644
index a8d97df1..00000000
Binary files a/docs/assets/readme/KnowStreamingPageDemo.jpg and /dev/null differ
diff --git a/docs/assets/readme/WeChat.png b/docs/assets/readme/WeChat.png
deleted file mode 100644
index 262d0aae..00000000
Binary files a/docs/assets/readme/WeChat.png and /dev/null differ
diff --git a/docs/assets/readme/ZSXQ.jpeg b/docs/assets/readme/ZSXQ.jpeg
deleted file mode 100644
index 121bf9b1..00000000
Binary files a/docs/assets/readme/ZSXQ.jpeg and /dev/null differ
diff --git a/docs/dev_guide/assets/multi_version_compatible/registerHandler.png b/docs/dev_guide/assets/multi_version_compatible/registerHandler.png
deleted file mode 100644
index f7b040dc..00000000
Binary files a/docs/dev_guide/assets/multi_version_compatible/registerHandler.png and /dev/null differ
diff --git a/docs/dev_guide/assets/startup_using_source_code/IDEA配置.jpg b/docs/dev_guide/assets/startup_using_source_code/IDEA配置.jpg
deleted file mode 100644
index 237aaa42..00000000
Binary files a/docs/dev_guide/assets/startup_using_source_code/IDEA配置.jpg and /dev/null differ
diff --git a/docs/dev_guide/多版本兼容方案.md b/docs/dev_guide/多版本兼容方案.md
index 389d0650..f41c01d4 100644
--- a/docs/dev_guide/多版本兼容方案.md
+++ b/docs/dev_guide/多版本兼容方案.md
@@ -36,7 +36,7 @@ KS-KM 根据其需要纳管的 kafka 版本,按照上述三个维度构建了
 
   KS-KM 的每个版本针对需要纳管的 kafka 版本列表,事先分析各个版本的差异性和产品需求,同时 KS-KM 构建了一套专门处理兼容性的服务,来进行兼容性的注册、字典构建、处理器分发等操作,其中版本兼容性处理器是来具体处理不同 kafka 版本差异性的地方。
 
-​ ![registerHandler](./assets/multi_version_compatible/registerHandler.png)
+​ ![registerHandler](http://img-ys011.didistatic.com/static/dc2img/do1_WxVTzndYE59ah5DFrMfn)
 
   如上图所示,KS-KM 的 topic 服务在面对不同 kafka 版本时,其 topic 的创建、删除、扩容由于 kafka 版本自身的差异,导致 KnowStreaming 的处理也不一样,所以需要根据不同的 kafka 版本来实现不同的兼容性处理器,同时向 KnowStreaming 的兼容服务进行兼容性的注册,构建兼容性字典,后续在 KnowStreaming 的运行过程中,针对不同的 kafka 版本即可分发到不同的处理器中执行。
diff --git a/docs/dev_guide/本地源码启动手册.md b/docs/dev_guide/本地源码启动手册.md
index ed21c3b8..35936a9a 100644
--- a/docs/dev_guide/本地源码启动手册.md
+++ b/docs/dev_guide/本地源码启动手册.md
@@ -29,7 +29,7 @@
 - 初始化 MySQL 表及数据
 - 初始化 Elasticsearch 索引
 
-具体见:[快速开始](./1-quick-start.md) 中的最后一步,部署 KnowStreaming 服务中的初始化相关工作。
+具体见:[单机部署手册](../install_guide/单机部署手册.md) 中的最后一步,部署 KnowStreaming 服务中的初始化相关工作。
 
 ### 6.1.4、本地启动
@@ -73,7 +73,7 @@ km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/KnowStreaming.java
 
 IDEA 更多具体的配置如下图所示:

-
+

 **第四步:启动项目**
diff --git a/docs/dev_guide/解决连接JMX失败.md b/docs/dev_guide/解决连接JMX失败.md
index f66a5ab0..546400d6 100644
--- a/docs/dev_guide/解决连接JMX失败.md
+++ b/docs/dev_guide/解决连接JMX失败.md
@@ -1,5 +1,5 @@
 
-![Logo](../assets/KnowStreamingLogo.png)
+![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
 
 ## JMX-连接失败问题解决
 
@@ -19,7 +19,7 @@
 
 未开启时,直接到`2、解决方法`查看如何开启即可。
 
-![check_jmx_opened](./assets/connect_jmx_failed/check_jmx_opened.jpg)
+![check_jmx_opened](http://img-ys011.didistatic.com/static/dc2img/do1_dRX6UHE2IUSHqsN95DGb)
 
 **类型二:配置错误**
diff --git a/docs/install_guide/源码编译打包手册.md b/docs/install_guide/源码编译打包手册.md
index b0b20101..708396b6 100644
--- a/docs/install_guide/源码编译打包手册.md
+++ b/docs/install_guide/源码编译打包手册.md
@@ -1,5 +1,5 @@
 
-![Logo](../assets/KnowStreamingLogo.png)
+![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
 
 # `Know Streaming` 源码编译打包手册
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
index 81ed009f..a0418bb2 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java
@@ -129,7 +129,12 @@ public class TopicStateManagerImpl implements TopicStateManager {
             return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
         }
 
-        // 获取分区offset
+        // 获取分区beginOffset
+        Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null);
+        if (beginOffsetsMapResult.failed()) {
+            return Result.buildFromIgnoreData(beginOffsetsMapResult);
+        }
+        // 获取分区endOffset
         Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
         if (endOffsetsMapResult.failed()) {
             return Result.buildFromIgnoreData(endOffsetsMapResult);
@@ -142,13 +147,25 @@ public class TopicStateManagerImpl implements TopicStateManager {
 
             // 创建kafka-consumer
             kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
-            kafkaConsumer.assign(endOffsetsMapResult.getData().keySet());
-            for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
-                kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
+            List<TopicPartition> partitionList = new ArrayList<>();
+            long maxMessage = 0;
+            for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
+                long begin = beginOffsetsMapResult.getData().get(entry.getKey());
+                long end = entry.getValue();
+                if (begin == end){
+                    continue;
+                }
+                maxMessage += end - begin;
+                partitionList.add(entry.getKey());
+            }
+            maxMessage = Math.min(maxMessage, dto.getMaxRecords());
+            kafkaConsumer.assign(partitionList);
+            for (TopicPartition partition : partitionList) {
+                kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
             }
 
             // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
-            while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) {
+            while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
                 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
                 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                     if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java
new file mode 100644
index 00000000..d316112f
--- /dev/null
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java
@@ -0,0 +1,19 @@
+package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
+
+import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+@Data
+@NoArgsConstructor
+public class BatchPartitionParam extends ClusterPhyParam {
+    private List<TopicPartition> tpList;
+
+    public BatchPartitionParam(Long clusterPhyId, List<TopicPartition> tpList) {
+        super(clusterPhyId);
+        this.tpList = tpList;
+    }
+}
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java
index d7c1c960..fae5db21 100644
--- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java
@@ -63,4 +63,5 @@ public class Constant {
     public static final String COLLECT_METRICS_COST_TIME_METRICS_NAME = "CollectMetricsCostTimeUnitSec";
 
     public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;
+    public static final Integer DEFAULT_RETRY_TIME = 3;
 }
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java
index d64a42ae..724f5ed3 100644
--- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/health/HealthCheckNameEnum.java
@@ -26,7 +26,7 @@ public enum HealthCheckNameEnum {
             HealthCheckDimensionEnum.CLUSTER,
             "Controller",
             Constant.HC_CONFIG_NAME_PREFIX + "CLUSTER_NO_CONTROLLER",
-            "集群Controller数错误",
+            "集群Controller数正常",
             HealthCompareValueConfig.class
     ),
 
@@ -34,7 +34,7 @@ public enum HealthCheckNameEnum {
             HealthCheckDimensionEnum.BROKER,
             "RequestQueueSize",
             Constant.HC_CONFIG_NAME_PREFIX + "BROKER_REQUEST_QUEUE_FULL",
-            "Broker-RequestQueueSize被打满",
+            "Broker-RequestQueueSize指标",
             HealthCompareValueConfig.class
     ),
 
@@ -42,7 +42,7 @@ public enum HealthCheckNameEnum {
             HealthCheckDimensionEnum.BROKER,
             "NetworkProcessorAvgIdlePercent",
             Constant.HC_CONFIG_NAME_PREFIX + "BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW",
-            "Broker-NetworkProcessorAvgIdlePercent的Idle过低",
+            "Broker-NetworkProcessorAvgIdlePercent指标",
             HealthCompareValueConfig.class
     ),
 
@@ -50,7 +50,7 @@ public enum HealthCheckNameEnum {
             HealthCheckDimensionEnum.GROUP,
             "Group Re-Balance",
             Constant.HC_CONFIG_NAME_PREFIX + "GROUP_RE_BALANCE_TOO_FREQUENTLY",
-            "Group re-balance太频繁",
+            "Group re-balance频率",
             HealthDetectedInLatestMinutesConfig.class
     ),
 
@@ -66,7 +66,7 @@ public enum HealthCheckNameEnum {
             HealthCheckDimensionEnum.TOPIC,
             "UnderReplicaTooLong",
             Constant.HC_CONFIG_NAME_PREFIX + "TOPIC_UNDER_REPLICA_TOO_LONG",
-            "Topic 长期处于未同步状态",
+            "Topic 未同步持续时间",
             HealthDetectedInLatestMinutesConfig.class
     ),
 
diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java
index 270999b4..15f13175 100644
--- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java
+++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java
@@ -31,9 +31,11 @@ public enum VersionItemTypeEnum {
 
     SERVICE_OP_PARTITION(320, "service_partition_operation"),
+    SERVICE_OP_PARTITION_LEADER(321, "service_partition-leader_operation"),
 
     SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),
 
+
     /**
      * 前端操作
      */
diff --git a/km-console/packages/config-manager-fe/package.json b/km-console/packages/config-manager-fe/package.json
index 00d8c3b8..78ae5901 100644
--- a/km-console/packages/config-manager-fe/package.json
+++ b/km-console/packages/config-manager-fe/package.json
@@ -36,7 +36,8 @@
     "react-intl": "^3.2.1",
     "react-router-cache-route": "^1.11.1",
     "single-spa": "^5.8.0",
-    "single-spa-react": "^2.14.0"
+    "single-spa-react": "^2.14.0",
+    "knowdesign": "1.3.7"
   },
   "devDependencies": {
     "@ant-design/icons": "^4.6.2",
@@ -54,7 +55,6 @@
     "@babel/preset-env": "^7.4.2",
     "@babel/preset-react": "^7.0.0",
     "@babel/preset-typescript": "^7.14.5",
-    "knowdesign": "^1.3.6",
     "@pmmmwh/react-refresh-webpack-plugin": "^0.5.1",
     "@types/lodash": "^4.14.138",
     "@types/react-dom": "^17.0.5",
diff --git a/km-console/packages/layout-clusters-fe/package.json b/km-console/packages/layout-clusters-fe/package.json
index 8afcc8ff..8ee1130f 100644
--- a/km-console/packages/layout-clusters-fe/package.json
+++ b/km-console/packages/layout-clusters-fe/package.json
@@ -58,7 +58,8 @@
     "react-joyride": "^2.5.0",
     "single-spa": "5.9.3",
     "single-spa-react": "2.14.0",
-    "webpack-bundle-analyzer": "^4.5.0"
+    "webpack-bundle-analyzer": "^4.5.0",
+    "knowdesign": "1.3.7"
   },
   "devDependencies": {
     "@babel/core": "^7.5.5",
@@ -74,7 +75,6 @@
     "@babel/preset-env": "^7.4.2",
     "@babel/preset-react": "^7.0.0",
     "@babel/preset-typescript": "^7.14.5",
-    "knowdesign": "^1.3.6",
     "@pmmmwh/react-refresh-webpack-plugin": "^0.5.1",
     "@types/crypto-js": "^4.1.0",
     "@types/lodash": "^4.14.171",
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
index c0469fb6..bc5b1c34 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/CollectedMetricsLocalCache.java
@@ -10,13 +10,13 @@ import java.util.concurrent.TimeUnit;
 public class CollectedMetricsLocalCache {
     private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
-            .expireAfterWrite(60, TimeUnit.SECONDS)
-            .maximumSize(2000)
+            .expireAfterWrite(90, TimeUnit.SECONDS)
+            .maximumSize(10000)
             .build();
 
     private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
             .expireAfterWrite(90, TimeUnit.SECONDS)
-            .maximumSize(5000)
+            .maximumSize(10000)
             .build();
 
     private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
@@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache {
             .maximumSize(20000)
             .build();
 
-    public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
-        return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
+    public static Float getBrokerMetrics(String brokerMetricKey) {
+        return brokerMetricsCache.getIfPresent(brokerMetricKey);
     }
 
-    public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
+    public static void putBrokerMetrics(String brokerMetricKey, Float value) {
         if (value == null) {
             return;
         }
-        brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
+
+        brokerMetricsCache.put(brokerMetricKey, value);
     }
 
-    public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
-        return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+    public static List<TopicMetrics> getTopicMetrics(String topicMetricKey) {
+        return topicMetricsCache.getIfPresent(topicMetricKey);
     }
 
-    public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
+    public static void putTopicMetrics(String topicMetricKey, List<TopicMetrics> metricsList) {
         if (metricsList == null) {
             return;
         }
-        topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+
+        topicMetricsCache.put(topicMetricKey, metricsList);
     }
 
-    public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
-        return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+    public static List<PartitionMetrics> getPartitionMetricsList(String partitionMetricKey) {
+        return partitionMetricsCache.getIfPresent(partitionMetricKey);
     }
 
-    public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
+    public static void putPartitionMetricsList(String partitionMetricsKey, List<PartitionMetrics> metricsList) {
         if (metricsList == null) {
             return;
         }
-        partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+        partitionMetricsCache.put(partitionMetricsKey, metricsList);
     }
 
-    public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
-        return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
+    public static Float getReplicaMetrics(String replicaMetricsKey) {
+        return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
     }
 
-    public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
+    public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
         if (value == null) {
             return;
         }
-        replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
+        replicaMetricsValueCache.put(replicaMetricsKey, value);
     }
-
-    /**************************************************** private method ****************************************************/
-
-
-    private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
+    public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
         return clusterPhyId + "@" + brokerId + "@" + metricName;
     }
 
-    private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
+    public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
         return clusterPhyId + "@" + topicName + "@" + metricName;
     }
 
-    private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
+    public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
         return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
     }
+
+    /**************************************************** private method ****************************************************/
+
 }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java
index 5c52af61..836c7d56 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.acl.impl;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -10,10 +11,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
 import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
 import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
 import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
+import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
 import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
 import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
+import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
 import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
 import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
@@ -58,6 +61,9 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
     @Autowired
     private KafkaAdminZKClient kafkaAdminZKClient;
 
+    @Autowired
+    private ClusterPhyService clusterPhyService;
+
     @Override
     protected VersionItemTypeEnum getVersionItemType() {
         return VersionItemTypeEnum.SERVICE_OP_ACL;
@@ -175,6 +181,18 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
     private Result<List<AclBinding>> getAclByKafkaClient(VersionItemParam itemParam) {
         ClusterPhyParam param = (ClusterPhyParam) itemParam;
         try {
+            // 获取集群
+            ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId());
+            if (clusterPhy == null) {
+                return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId()));
+            }
+
+            // 判断是否开启认证
+            if (!ClusterAuthTypeEnum.enableAuth(clusterPhy.getAuthType())) {
+                log.warn("method=getAclByKafkaClient||clusterPhyId={}||msg=not open auth and ignore get acls", clusterPhy.getId());
+                return Result.buildSuc(new ArrayList<>());
+            }
+
             AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
 
             DescribeAclsResult describeAclsResult =
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java
index d1f181b4..62f03e65 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java
@@ -44,6 +44,7 @@
     /**
      * 获取具体Broker
     */
     Broker getBroker(Long clusterPhyId, Integer brokerId);
+    Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId);
 
     /**
      * 获取BrokerLog-Dir信息
     */
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
index d47aa2ea..93c343ff 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java
@@ -110,9 +110,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
     }
 
     @Override
-    public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
+    public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) {
+        String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric);
 
-        Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
+        Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey);
         if(null != keyValue) {
             BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
             brokerMetrics.putMetric(metric, keyValue);
@@ -124,7 +125,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
 
         Map<String, Float> metricsMap = ret.getData().getMetrics();
         for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
-            CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
+            CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue());
         }
 
         return ret;
@@ -178,11 +179,16 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
 
     @Override
     public Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, MetricDTO dto) {
-        Map<String, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(clusterPhyId, brokerId,
-                dto.getMetricsNames(), dto.getAggType(), dto.getStartTime(), dto.getEndTime());
+        Map<String, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(
+                clusterPhyId,
+                brokerId,
+                dto.getMetricsNames(),
+                dto.getAggType(),
+                dto.getStartTime(),
+                dto.getEndTime()
+        );
 
-        List<MetricPointVO> metricPoints = new ArrayList<>(metricPointMap.values());
-        return Result.buildSuc(metricPoints);
+        return Result.buildSuc(new ArrayList<>(metricPointMap.values()));
     }
 
     @Override
@@ -199,8 +205,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
                 brokerMetrics.add(ConvertUtil.obj2Obj(brokerMetricPO, BrokerMetrics.class));
             } catch (Exception e) {
-                LOGGER.error("method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
-                        clusterPhyId, brokerId, e);
+                LOGGER.error(
+                        "method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
+                        clusterPhyId, brokerId, e
+                );
             }
         }
 
@@ -219,6 +227,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
     }
 
     /**************************************************** private method ****************************************************/
+
     private List<Integer> listTopNBrokerIds(Long clusterId, Integer topN){
         List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
         if(CollectionUtils.isEmpty(brokers)){return new ArrayList<>();}
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java
index 3ab9f3fa..e9f8c933 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java
@@ -206,6 +206,22 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
         return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
     }
 
+    @Override
+    public Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId) {
+        List<Broker> brokerList = this.listAliveBrokersFromCacheFirst(clusterPhyId);
+        if (brokerList == null) {
+            return null;
+        }
+
+        for (Broker broker: brokerList) {
+            if (brokerId.equals(broker.getBrokerId())) {
+                return broker;
+            }
+        }
+
+        return null;
+    }
+
     @Override
     public Result<Map<String, LogDirDescription>> getBrokerLogDirDescFromKafka(Long clusterPhyId, Integer brokerId) {
         try {
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java
index 0c4eefe7..40265a8f 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java
@@ -5,14 +5,19 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
 import com.xiaojukeji.know.streaming.km.common.bean.po.changerecord.KafkaChangeRecordPO;
 import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
 import com.xiaojukeji.know.streaming.km.persistence.mysql.changerecord.KafkaChangeRecordDAO;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.dao.DuplicateKeyException;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
 @Service
 public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
     private static final ILog log = LogFactory.getLog(KafkaChangeRecordServiceImpl.class);
@@ -20,11 +25,24 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
     @Autowired
     private KafkaChangeRecordDAO kafkaChangeRecordDAO;
 
+    private static final Cache<String, String> recordCache = Caffeine.newBuilder()
+            .expireAfterWrite(12, TimeUnit.HOURS)
+            .maximumSize(1000)
+            .build();
+
     @Override
     public int insertAndIgnoreDuplicate(KafkaChangeRecordPO recordPO) {
         try {
+            String cacheData = recordCache.getIfPresent(recordPO.getUniqueField());
+            if (cacheData != null || this.checkExistInDB(recordPO.getUniqueField())) {
+                // 已存在时,则直接返回
+                return 0;
+            }
+
+            recordCache.put(recordPO.getUniqueField(), recordPO.getUniqueField());
+
             return kafkaChangeRecordDAO.insert(recordPO);
-        } catch (DuplicateKeyException dke) {
+        } catch (Exception e) {
             return 0;
         }
     }
@@ -40,4 +58,12 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
 
     /**************************************************** private method ****************************************************/
 
+    private boolean checkExistInDB(String uniqueField) {
+        LambdaQueryWrapper<KafkaChangeRecordPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
+        lambdaQueryWrapper.eq(KafkaChangeRecordPO::getUniqueField, uniqueField);
+
+        List<KafkaChangeRecordPO> poList = kafkaChangeRecordDAO.selectList(lambdaQueryWrapper);
+
+        return poList != null && !poList.isEmpty();
+    }
 }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java
index b55594b1..56b6640b 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyService.java
@@ -73,5 +73,5 @@
      * 获取系统已存在的kafka版本列表
      * @return
      */
-    Set<String> getClusterVersionSet();
+    List<String> getClusterVersionList();
 }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
index fee8fb0e..075c53c2 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterMetricServiceImpl.java
@@ -126,7 +126,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
     private TopicMetricService topicMetricService;
 
     @Autowired
-    private TopicService  topicService;
+    private TopicService topicService;
 
     @Autowired
     private PartitionService partitionService;
@@ -728,13 +728,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
         Long clusterId = param.getClusterId();
 
         //1、获取jmx的属性信息
-        VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric);
-        if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);}
-
         List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
 
         float metricVale = 0f;
-        for(Broker broker : brokers){
+        for(Broker broker : brokers) {
             Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
             if(null == ret || ret.failed() || null == ret.getData()){continue;}
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java
index 562645c0..2ba13738 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java
@@ -24,8 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -111,7 +112,7 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
             throw new DuplicateException(String.format("clusterName:%s duplicated", clusterPhyPO.getName()));
         } catch (Exception e) {
-            log.error("cmethod=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
+            log.error("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
 
             throw new AdminOperateException("add cluster failed", e, ResultStatus.MYSQL_OPERATE_FAILED);
         }
@@ -205,9 +206,12 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
     }
 
     @Override
-    public Set<String> getClusterVersionSet() {
-        List<ClusterPhy> clusterPhyList = listAllClusters();
-        Set<String> versionSet = clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet());
-        return versionSet;
+    public List<String> getClusterVersionList() {
+        List<ClusterPhy> clusterPhyList = this.listAllClusters();
+
+        List<String> versionList = new ArrayList<>(clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet()));
+        Collections.sort(versionList);
+
+        return versionList;
     }
 }
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
index 42311eef..1fb3f488 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
@@ -56,7 +56,7 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
     @Override
     public int insertAndIgnoreDuplicateException(KafkaController kafkaController) {
         try {
-            Broker broker = brokerService.getBroker(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
+            Broker
broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId()); KafkaControllerPO kafkaControllerPO = new KafkaControllerPO(); kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId()); @@ -136,34 +136,56 @@ public class KafkaControllerServiceImpl implements KafkaControllerService { /**************************************************** private method ****************************************************/ private Result getControllerFromAdminClient(ClusterPhy clusterPhy) { + AdminClient adminClient = null; try { - AdminClient adminClient = null; - try { - adminClient = kafkaAdminClient.getClient(clusterPhy.getId()); - } catch (Exception e) { - log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); - - // 集群已经加载进来,但是创建admin-client失败,则设置无controller - return Result.buildSuc(); - } - - DescribeClusterResult describeClusterResult = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)); - - Node controllerNode = describeClusterResult.controller().get(); - if (controllerNode == null) { - return Result.buildSuc(); - } - - return Result.buildSuc(new KafkaController( - clusterPhy.getId(), - controllerNode.id(), - System.currentTimeMillis() - )); + adminClient = kafkaAdminClient.getClient(clusterPhy.getId()); } catch (Exception e) { log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); + // 集群已经加载进来,但是创建admin-client失败,则设置无controller + return Result.buildSuc(); } + + // 先从DB获取该集群controller + KafkaController dbKafkaController = null; + + for (int i = 1; i <= Constant.DEFAULT_RETRY_TIME; ++i) { + try { + if (i == 1) { + // 获取DB中的controller信息 + dbKafkaController = 
this.getKafkaControllerFromDB(clusterPhy.getId()); + } + + DescribeClusterResult describeClusterResult = adminClient.describeCluster( + new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + Node controllerNode = describeClusterResult.controller().get(); + if (controllerNode == null) { + return Result.buildSuc(); + } + + if (dbKafkaController != null && controllerNode.id() == dbKafkaController.getBrokerId()) { + // ID没有变化,直接返回原先的 + return Result.buildSuc(dbKafkaController); + } + + // 发生了变化 + return Result.buildSuc(new KafkaController( + clusterPhy.getId(), + controllerNode.id(), + System.currentTimeMillis() + )); + } catch (Exception e) { + log.error( + "class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||tryTime={}||errMsg=exception", + clusterPhy.getId(), i, e + ); + } + } + + // 三次出错,则直接返回无controller + return Result.buildSuc(); } private Result getControllerFromZKClient(ClusterPhy clusterPhy) { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java index 7e5fa91f..e939f00d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java @@ -7,6 +7,7 @@ import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.didiglobal.logi.security.util.PWEncryptUtil; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; 
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.kafkauser.KafkaUserParam; @@ -17,11 +18,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaUserPO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; @@ -32,7 +35,6 @@ import kafka.admin.ConfigCommand; import kafka.server.ConfigType; import kafka.zk.*; import org.apache.kafka.clients.admin.*; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.security.scram.ScramCredential; import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils; import org.apache.kafka.common.security.scram.internals.ScramFormatter; @@ -71,6 +73,9 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K @Autowired private OpLogWrapService opLogWrapService; + @Autowired + private ClusterPhyService clusterPhyService; + @Override protected VersionItemTypeEnum getVersionItemType() { return VersionItemTypeEnum.SERVICE_OP_KAFKA_USER; @@ -571,6 +576,18 @@ public class 
KafkaUserServiceImpl extends BaseVersionControlService implements K private Result> getKafkaUserByKafkaClient(VersionItemParam itemParam) { KafkaUserParam param = (KafkaUserParam) itemParam; try { + // 获取集群 + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId()); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId())); + } + + // 判断认证模式,如果是非scram模式,直接返回 + if (!ClusterAuthTypeEnum.isScram(clusterPhy.getAuthType())) { + log.warn("method=getKafkaUserByKafkaClient||clusterPhyId={}||msg=not scram auth type and ignore get users", clusterPhy.getId()); + return Result.buildSuc(new ArrayList<>()); + } + AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); // 查询集群kafka-user diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java new file mode 100644 index 00000000..6a3611f8 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java @@ -0,0 +1,14 @@ +package com.xiaojukeji.know.streaming.km.core.service.partition; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +public interface OpPartitionService { + + /** + * 优先副本选举 + */ + Result preferredReplicaElection(Long clusterPhyId, List tpList); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java new file mode 100644 index 00000000..0f1186ef --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java 
@@ -0,0 +1,119 @@ +package com.xiaojukeji.know.streaming.km.core.service.partition.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; +import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ElectLeadersOptions; +import org.apache.kafka.clients.admin.ElectLeadersResult; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import scala.jdk.javaapi.CollectionConverters; + +import javax.annotation.PostConstruct; +import java.util.HashSet; +import java.util.List; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER; + + +/** + * @author didi + */ +@Service +public class 
OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService { + private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class); + + @Autowired + private KafkaAdminClient kafkaAdminClient; + + @Autowired + private KafkaAdminZKClient kafkaAdminZKClient; + + public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection"; + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return SERVICE_OP_PARTITION_LEADER; + } + + @PostConstruct + private void init() { + registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient); + registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient); + } + + @Override + public Result preferredReplicaElection(Long clusterPhyId, List tpList) { + try { + return (Result) doVCHandler( + clusterPhyId, + PREFERRED_REPLICA_ELECTION, + new BatchPartitionParam(clusterPhyId, tpList) + ); + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + /**************************************************** private method ****************************************************/ + + private Result preferredReplicaElectionByZKClient(VersionItemParam itemParam) { + BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam; + + try { + KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId()); + + kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet()); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception", + partitionParam.getClusterPhyId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } 
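The `init()` method above registers two handlers for the same operation, keyed by Kafka version range: a ZK-based election for `[V_0_10_0_0, V_2_8_0)` and an AdminClient-based one for `[V_2_8_0, V_MAX)`. A minimal sketch of that version-range dispatch pattern — all names and the integer version encoding here are hypothetical stand-ins, not the KnowStreaming `registerVCHandler`/`doVCHandler` API:

```java
import java.util.*;

public class VersionDispatch {
    // A handler registered for a half-open version range [min, max)
    static final class Entry {
        final long min, max; // encoded versions, e.g. 2.8.0 -> 2_08_00
        final java.util.function.Function<String, String> handler;
        Entry(long min, long max, java.util.function.Function<String, String> handler) {
            this.min = min; this.max = max; this.handler = handler;
        }
    }

    private final Map<String, List<Entry>> registry = new HashMap<>();

    // Mirrors registerVCHandler: one action name, several version ranges
    public void register(String action, long minVer, long maxVer,
                         java.util.function.Function<String, String> handler) {
        registry.computeIfAbsent(action, k -> new ArrayList<>())
                .add(new Entry(minVer, maxVer, handler));
    }

    // Mirrors doVCHandler: pick the handler whose range covers the cluster's version
    public String dispatch(String action, long clusterVersion, String param) {
        for (Entry e : registry.getOrDefault(action, Collections.emptyList())) {
            if (clusterVersion >= e.min && clusterVersion < e.max) {
                return e.handler.apply(param);
            }
        }
        // Corresponds to VCHandlerNotExistException in the service above
        throw new IllegalStateException("no handler for " + action + " at " + clusterVersion);
    }

    public static void main(String[] args) {
        VersionDispatch d = new VersionDispatch();
        d.register("PreferredReplicaElection", 1_00_00, 2_08_00, p -> "zk:" + p);
        d.register("PreferredReplicaElection", 2_08_00, Long.MAX_VALUE, p -> "admin:" + p);
        System.out.println(d.dispatch("PreferredReplicaElection", 1_00_00, "t-0")); // zk:t-0
        System.out.println(d.dispatch("PreferredReplicaElection", 3_00_00, "t-0")); // admin:t-0
    }
}
```

Half-open ranges let the two registrations share the `V_2_8_0` boundary without overlap, which is why a 2.8.0 cluster goes down the AdminClient path.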
+ + private Result preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) { + BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam; + + try { + AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId()); + + ElectLeadersResult electLeadersResult = adminClient.electLeaders( + ElectionType.PREFERRED, + new HashSet<>(partitionParam.getTpList()), + new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + electLeadersResult.all().get(); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception", + partitionParam.getClusterPhyId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java index 9e354634..9104b398 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionMetricServiceImpl.java @@ -75,7 +75,9 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par @Override public Result> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) { - List metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName); + String partitionMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName); + + List metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(partitionMetricsKey); if(null != metricsList) { return Result.buildSuc(metricsList); } @@ 
-88,12 +90,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par // 更新cache PartitionMetrics metrics = metricsResult.getData().get(0); metrics.getMetrics().entrySet().forEach( - metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList( - clusterPhyId, - metrics.getTopic(), - metricEntry.getKey(), - metricsResult.getData() - ) + metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(partitionMetricsKey, metricsResult.getData()) ); return metricsResult; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java index 5240e8b9..460e6520 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/replica/impl/ReplicaMetricServiceImpl.java @@ -77,9 +77,14 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli } @Override - public Result collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic, - Integer brokerId, Integer partitionId, String metric){ - Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric); + public Result collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, + String topic, + Integer brokerId, + Integer partitionId, + String metric) { + String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric); + + Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey); if(null != keyValue){ ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId); replicationMetrics.putMetric(metric, keyValue); @@ -92,11 +97,7 @@ public class ReplicaMetricServiceImpl 
extends BaseMetricService implements Repli // 更新cache ret.getData().getMetrics().entrySet().stream().forEach( metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics( - clusterPhyId, - brokerId, - topic, - partitionId, - metricNameAndValueEntry.getKey(), + replicaMetricsKey, metricNameAndValueEntry.getValue() ) ); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java index d7cca017..478c142b 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java @@ -120,7 +120,9 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe @Override public Result> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) { - List metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName); + String topicMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName); + + List metricsList = CollectedMetricsLocalCache.getTopicMetrics(topicMetricsKey); if(null != metricsList) { return Result.buildSuc(metricsList); } @@ -133,12 +135,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe // 更新cache TopicMetrics metrics = metricsResult.getData().get(0); metrics.getMetrics().entrySet().forEach( - metricEntry -> CollectedMetricsLocalCache.putTopicMetrics( - clusterPhyId, - metrics.getTopic(), - metricEntry.getKey(), - metricsResult.getData() - ) + metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(topicMetricsKey, metricsResult.getData()) ); return metricsResult; diff --git 
a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java index d4c58d69..d3357ab4 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ClusterMetricVersionItems.java @@ -35,6 +35,7 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric { public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster"; public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster"; public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster"; + public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize"; public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize"; public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize"; diff --git a/km-dist/init/sql/ddl-ks-km.sql b/km-dist/init/sql/ddl-ks-km.sql index 90f588a6..96de4ed1 100644 --- a/km-dist/init/sql/ddl-ks-km.sql +++ b/km-dist/init/sql/ddl-ks-km.sql @@ -51,7 +51,6 @@ CREATE TABLE `ks_km_cluster_balance_job` ( `total_reassign_size` double NOT NULL DEFAULT '0' COMMENT '总迁移大小', `total_reassign_replica_num` int(16) NOT NULL DEFAULT '0' COMMENT '总迁移副本数', `move_in_topic_list` varchar(4096) NOT NULL DEFAULT '' COMMENT '移入topic', - `move_broker_list` varchar(1024) NOT NULL DEFAULT '' COMMENT '移除节点', `broker_balance_detail` text COMMENT '节点均衡详情', `status` int(16) NOT NULL DEFAULT '0' COMMENT '任务状态 1:进行中,2:准备,3,成功,4:失败,5:取消', `creator` varchar(64) NOT NULL DEFAULT '' COMMENT '操作人', diff --git a/km-dist/init/sql/ddl-logi-security.sql b/km-dist/init/sql/ddl-logi-security.sql index 3ac20657..69fcdc66 100644 --- 
a/km-dist/init/sql/ddl-logi-security.sql +++ b/km-dist/init/sql/ddl-logi-security.sql @@ -39,7 +39,7 @@ CREATE TABLE `logi_security_oplog` operate_type varchar(16) not null comment '操作类型', target_type varchar(16) not null comment '对象分类', target varchar(20) not null comment '操作对象', - operation_methods varchar(20) not null comment '操作方式', + operation_methods varchar(20) not null default '' comment '操作方式', detail text null comment '日志详情', create_time timestamp default CURRENT_TIMESTAMP null, update_time timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP comment '更新时间', diff --git a/km-dist/init/sql/dml-ks-km.sql b/km-dist/init/sql/dml-ks-km.sql index f986533d..2d354a87 100644 --- a/km-dist/init/sql/dml-ks-km.sql +++ b/km-dist/init/sql/dml-ks-km.sql @@ -1,7 +1,7 @@ -- 检查检查配置 -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_CLUSTER_NO_CONTROLLER','{ \"value\": 1, \"weight\": 30 } ','集群Controller数错误','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_REQUEST_QUEUE_FULL','{ \"value\": 10, \"weight\": 20 } ','Broker请求队列被打满','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW','{ \"value\": 0.8, \"weight\": 20 } ','Broker网络处理线程Idle过低','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_GROUP_RE_BALANCE_TOO_FREQUENTLY','{\n \"latestMinutes\": 10,\n \"detectedTimes\": 8,\n \"weight\": 10\n}\n','Group的re-balance太频繁','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_NO_LEADER','{ 
\"value\": 1, \"weight\": 10 } ','Topic无Leader数','know-stream'); -INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_UNDER_REPLICA_TOO_LONG','{ \"latestMinutes\": 10, \"detectedTimes\": 8, \"weight\": 10 } ','Topic长期处于未同步状态','know-stream'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_CLUSTER_NO_CONTROLLER','{ \"value\": 1, \"weight\": 30 } ','集群Controller数正常','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_REQUEST_QUEUE_FULL','{ \"value\": 10, \"weight\": 20 } ','Broker-RequestQueueSize指标','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_BROKER_NETWORK_PROCESSOR_AVG_IDLE_TOO_LOW','{ \"value\": 0.8, \"weight\": 20 } ','Broker-NetworkProcessorAvgIdlePercent指标','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_GROUP_RE_BALANCE_TOO_FREQUENTLY','{\n \"latestMinutes\": 10,\n \"detectedTimes\": 8,\n \"weight\": 10\n}\n','Group的re-balance频率','know-streaming'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_NO_LEADER','{ \"value\": 1, \"weight\": 10 } ','Topic 无Leader数','know-stream'); +INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`,`value_group`,`value_name`,`value`,`description`,`operator`) VALUES (-1,'HEALTH','HC_TOPIC_UNDER_REPLICA_TOO_LONG','{ \"latestMinutes\": 10, \"detectedTimes\": 8, \"weight\": 10 } ','Topic 未同步持续时间','know-streaming'); diff --git 
a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index d0ca75e9..c611c538 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -16,7 +16,6 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.google.common.collect.Lists; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; -import com.xiaojukeji.know.streaming.km.common.constant.ESConstant; import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -37,7 +36,6 @@ import java.util.function.Function; @Component public class ESOpClient { - private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER"); /** @@ -45,6 +43,7 @@ public class ESOpClient { */ @Value("${es.client.address}") private String esAddress; + /** * es 访问密码 */ @@ -54,22 +53,32 @@ public class ESOpClient { /** * 客户端个数 */ - private static final int ES_CLIENT_COUNT = 30; + @Value("${es.client.client-cnt:10}") + private Integer clientCnt; - private static final int MAX_RETRY_CNT = 5; - - private static final int ES_IO_THREAD_COUNT = 4; + /** + * 最大重试次数 + */ + @Value("${es.client.max-retry-cnt:5}") + private Integer maxRetryCnt; + /** + * IO线程数 + */ + @Value("${es.client.io-thread-cnt:2}") + private Integer ioThreadCnt; /** * 更新es数据的客户端连接队列 */ - private LinkedBlockingQueue esClientPool = new LinkedBlockingQueue<>( ES_CLIENT_COUNT ); + private LinkedBlockingQueue esClientPool; @PostConstruct public void init(){ - for (int i = 0; i < ES_CLIENT_COUNT; ++i) { - ESClient esClient = buildEsClient(esAddress, esPass, "", ""); + esClientPool = new LinkedBlockingQueue<>( clientCnt ); + + for (int i = 0; i < 
clientCnt; ++i) { + ESClient esClient = this.buildEsClient(esAddress, esPass, "", ""); if (esClient != null) { this.esClientPool.add(esClient); LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress); @@ -245,7 +254,7 @@ public class ESOpClient { esIndexRequest.source(source); esIndexRequest.id(id); - for (int i = 0; i < MAX_RETRY_CNT; ++i) { + for (int i = 0; i < this.maxRetryCnt; ++i) { response = esClient.index(esIndexRequest).actionGet(10, TimeUnit.SECONDS); if (response == null) { continue; @@ -307,7 +316,7 @@ public class ESOpClient { batchRequest.addNode(BatchType.INDEX, indexName, null, po.getKey(), JSON.toJSONString(po)); } - for (int i = 0; i < MAX_RETRY_CNT; ++i) { + for (int i = 0; i < this.maxRetryCnt; ++i) { response = esClient.batch(batchRequest).actionGet(2, TimeUnit.MINUTES); if (response == null) {continue;} @@ -428,8 +437,8 @@ public class ESOpClient { if(StringUtils.isNotBlank(password)){ esClient.setPassword(password); } - if(ES_IO_THREAD_COUNT > 0) { - esClient.setIoThreadCount( ES_IO_THREAD_COUNT ); + if(this.ioThreadCnt > 0) { + esClient.setIoThreadCount( this.ioThreadCnt ); } // 配置http超时 @@ -439,11 +448,13 @@ public class ESOpClient { return esClient; } catch (Exception e) { - esClient.close(); - - LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, - e); + try { + esClient.close(); + } catch (Exception innerE) { + // ignore + } + LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, e); return null; } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java index 1af1e357..b80c1ca0 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java +++ 
b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java @@ -41,7 +41,11 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { DslsConstant.GET_BROKER_LATEST_METRICS, clusterId, brokerId, startTime, endTime); BrokerMetricPO brokerMetricPO = esOpClient.performRequestAndTakeFirst( - brokerId.toString(), realIndex(startTime, endTime), dsl, BrokerMetricPO.class); + brokerId.toString(), + realIndex(startTime, endTime), + dsl, + BrokerMetricPO.class + ); return (null == brokerMetricPO) ? new BrokerMetricPO(clusterId, brokerId) : brokerMetricPO; } @@ -49,8 +53,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { /** * 获取集群 clusterPhyId 中每个 metric 的指定 broker 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值 */ - public Map getBrokerMetricsPoint(Long clusterPhyId, Integer brokerId, List metrics, - String aggType, Long startTime, Long endTime){ + public Map getBrokerMetricsPoint(Long clusterPhyId, + Integer brokerId, + List metrics, + String aggType, + Long startTime, + Long endTime) { //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); @@ -60,8 +68,13 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { String dsl = dslLoaderUtil.getFormatDslByFileName( DslsConstant.GET_BROKER_AGG_SINGLE_METRICS, clusterPhyId, brokerId, startTime, endTime, aggDsl); - return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl, - s -> handleSingleESQueryResponse(s, metrics, aggType), 3); + return esOpClient.performRequestWithRouting( + String.valueOf(brokerId), + realIndex, + dsl, + s -> handleSingleESQueryResponse(s, metrics, aggType), + 3 + ); } /** @@ -75,10 +88,19 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { Map> metricBrokerIds = getTopNBrokerIds(clusterPhyId, metrics, aggType, topN, startTime, endTime); Table> table = HashBasedTable.create(); + //2、查询指标 for(String metric : metricBrokerIds.keySet()){ - table.putAll(listBrokerMetricsByBrokerIds(clusterPhyId, 
Arrays.asList(metric), - aggType, metricBrokerIds.getOrDefault(metric, brokerIds), startTime, endTime)); + table.putAll( + this.listBrokerMetricsByBrokerIds( + clusterPhyId, + Arrays.asList(metric), + aggType, + metricBrokerIds.getOrDefault(metric, brokerIds), + startTime, + endTime + ) + ); } return table; @@ -87,9 +109,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { /** * 获取集群 clusterPhyId 中每个 metric 的指定 brokers 在指定时间[startTime、endTime]区间内所有的指标 */ - public Table> listBrokerMetricsByBrokerIds(Long clusterPhyId, List metrics, - String aggType, List brokerIds, - Long startTime, Long endTime){ + public Table> listBrokerMetricsByBrokerIds(Long clusterPhyId, + List metrics, + String aggType, + List brokerIds, + Long startTime, + Long endTime){ //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); @@ -105,22 +130,34 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { for(Long brokerId : brokerIds){ try { String dsl = dslLoaderUtil.getFormatDslByFileName( - DslsConstant.GET_BROKER_AGG_LIST_METRICS, clusterPhyId, brokerId, startTime, endTime, interval, aggDsl); + DslsConstant.GET_BROKER_AGG_LIST_METRICS, + clusterPhyId, + brokerId, + startTime, + endTime, + interval, + aggDsl + ); queryFuture.runnableTask( String.format("class=BrokerMetricESDAO||method=listBrokerMetricsByBrokerIds||ClusterPhyId=%d", clusterPhyId), 5000, () -> { - Map> metricMap = esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl, - s -> handleListESQueryResponse(s, metrics, aggType), 3); + Map> metricMap = esOpClient.performRequestWithRouting( + String.valueOf(brokerId), + realIndex, + dsl, + s -> handleListESQueryResponse(s, metrics, aggType), + 3 + ); - synchronized (table){ + synchronized (table) { for(String metric : metricMap.keySet()){ table.put(metric, brokerId, metricMap.get(metric)); } } }); - }catch (Exception e){ + } catch (Exception e){ 
LOGGER.error("method=listBrokerMetricsByBrokerIds||clusterPhyId={}||brokerId{}||errMsg=exception!", clusterPhyId, brokerId, e); } } diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java index d443bcac..34b907a8 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/MultiClusterPhyController.java @@ -16,7 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; -import java.util.Set; +import java.util.List; /** @@ -49,7 +49,7 @@ public class MultiClusterPhyController { @ApiOperation(value = "多物理集群-已存在kafka版本", notes = "") @GetMapping(value = "physical-clusters/exist-version") - public Result> getClusterPhysVersion() { - return Result.buildSuc(clusterPhyService.getClusterVersionSet()); + public Result> getClusterPhysVersion() { + return Result.buildSuc(clusterPhyService.getClusterVersionList()); } } diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index a6417157..4b0831c7 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -73,8 +73,13 @@ client-pool: borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒 -# es客户端服务地址 -es.client.address: 127.0.0.1:8060 +# ES客户端配置 +es: + client: + address: 127.0.0.1:8091,127.0.0.1:8061,127.0.0.1:8061 + client-cnt: 10 + io-thread-cnt: 2 + max-retry-cnt: 5 # 普罗米修斯指标导出相关配置 management: diff --git a/pom.xml b/pom.xml index d1dd7544..5d0052d8 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ - 3.0.0-beta + 3.0.0-beta.1 8 8
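Several of the metric-service hunks above (`PartitionMetricServiceImpl`, `ReplicaMetricServiceImpl`, `TopicMetricServiceImpl`) make the same refactor: compute one composite cache key (cluster + topic + metric) up front via `genClusterTopicMetricKey`, then reuse it for both the lookup and the write-back, instead of passing the raw components to every cache call. A minimal sketch of that pattern — `genKey`, `collectWithCache`, and the in-memory map are hypothetical stand-ins for `CollectedMetricsLocalCache`:

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class MetricsCacheSketch {
    private static final Map<String, List<Double>> CACHE = new ConcurrentHashMap<>();

    // Mirrors genClusterTopicMetricKey: one canonical key per (cluster, topic, metric) triple
    static String genKey(long clusterPhyId, String topic, String metric) {
        return clusterPhyId + "@" + topic + "@" + metric;
    }

    static List<Double> collectWithCache(long clusterPhyId, String topic, String metric) {
        String key = genKey(clusterPhyId, topic, metric); // computed once...
        List<Double> cached = CACHE.get(key);
        if (cached != null) {
            return cached;                                // cache hit
        }
        List<Double> fetched = Arrays.asList(1.0, 2.0);   // stand-in for the Kafka/JMX fetch
        CACHE.put(key, fetched);                          // ...and reused for the write-back
        return fetched;
    }

    public static void main(String[] args) {
        List<Double> first = collectWithCache(1L, "t", "BytesIn");
        List<Double> second = collectWithCache(1L, "t", "BytesIn");
        System.out.println(first == second); // true: second call is served from the cache
    }
}
```

Centralizing key construction guarantees the get and put sides can never drift apart in how they encode the triple, which is the bug class the refactor removes.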