From d30cb8a0f086510b143bfccf2cb8a3e2b95c0646 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 11 Jan 2022 11:24:43 +0800 Subject: [PATCH 1/3] bump swagger version --- .../xiaojukeji/kafka/manager/web/config/SwaggerConfig.java | 7 +++---- pom.xml | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java index 91d0080c..f4ae13a4 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java @@ -39,10 +39,9 @@ public class SwaggerConfig implements WebMvcConfigurer { private ApiInfo apiInfo() { return new ApiInfoBuilder() - .title("Logi-KafkaManager 接口文档") - .description("欢迎使用滴滴Logi-KafkaManager") - .contact("huangyiminghappy@163.com") - .version("2.2.0") + .title("LogiKM接口文档") + .description("欢迎使用滴滴LogiKM") + .version("2.5.0") .build(); } diff --git a/pom.xml b/pom.xml index 26a31cb7..55d4ab46 100644 --- a/pom.xml +++ b/pom.xml @@ -17,8 +17,8 @@ 2.5 - 2.7.0 - 1.5.13 + 2.9.2 + 1.5.21 true true From 44ea896de87cd426ad9b77eb4da0bb66ddfd67c7 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 11 Jan 2022 11:37:23 +0800 Subject: [PATCH 2/3] fix NPE when flush logical cluster and physical cluster not in cache or not exist --- .../service/cache/LogicalClusterMetadataManager.java | 3 ++- .../service/cache/PhysicalClusterMetadataManager.java | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java index 744101ef..d58efc9a 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java @@ -208,7 +208,8 @@ public class LogicalClusterMetadataManager { // 计算逻辑集群到Topic名称的映射 Set topicNameSet = PhysicalClusterMetadataManager.getBrokerTopicNum( logicalClusterDO.getClusterId(), - brokerIdSet); + brokerIdSet + ); LOGICAL_CLUSTER_ID_TOPIC_NAME_MAP.put(logicalClusterDO.getId(), topicNameSet); // 计算Topic名称到逻辑集群的映射 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index c5f09820..402cb0e4 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -539,9 +539,12 @@ public class PhysicalClusterMetadataManager { } public static Set getBrokerTopicNum(Long clusterId, Set brokerIdSet) { - Set topicNameSet = new HashSet<>(); - Map metadataMap = TOPIC_METADATA_MAP.get(clusterId); + if (metadataMap == null) { + return new HashSet<>(); + } + + Set topicNameSet = new HashSet<>(); for (String topicName: metadataMap.keySet()) { try { TopicMetadata tm = metadataMap.get(topicName); From 80785ce0725b33eed908cd8d46a33b8ecdba088c Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 11 Jan 2022 11:45:28 +0800 Subject: [PATCH 3/3] JmxConnectorWrap's log add cluster and broker info --- .../common/utils/jmx/JmxConnectorWrap.java | 25 +++++++++++-------- .../cache/PhysicalClusterMetadataManager.java | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java index 56b3a1d5..db1e2341 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java @@ -14,7 +14,6 @@ import javax.naming.Context; import javax.rmi.ssl.SslRMIClientSocketFactory; import java.io.IOException; import java.net.MalformedURLException; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -26,19 +25,25 @@ import java.util.concurrent.atomic.AtomicInteger; * @date 2015/11/9. */ public class JmxConnectorWrap { - private final static Logger LOGGER = LoggerFactory.getLogger(JmxConnectorWrap.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JmxConnectorWrap.class); - private String host; + private final Long physicalClusterId; - private int port; + private final Integer brokerId; + + private final String host; + + private final int port; private JMXConnector jmxConnector; - private AtomicInteger atomicInteger; + private final AtomicInteger atomicInteger; private JmxConfig jmxConfig; - public JmxConnectorWrap(String host, int port, JmxConfig jmxConfig) { + public JmxConnectorWrap(Long physicalClusterId, Integer brokerId, String host, int port, JmxConfig jmxConfig) { + this.physicalClusterId = physicalClusterId; + this.brokerId = brokerId; this.host = host; this.port = port; this.jmxConfig = jmxConfig; @@ -68,7 +73,7 @@ public class JmxConnectorWrap { try { jmxConnector.close(); } catch (IOException e) { - LOGGER.warn("close JmxConnector exception, host:{} port:{}.", host, port, e); + LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); } } @@ -91,12 +96,12 @@ public class JmxConnectorWrap { } jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment); - LOGGER.info("JMX connect success, host:{} port:{}.", host, port); + LOGGER.info("JMX connect success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port); return true; } catch (MalformedURLException e) { - LOGGER.error("JMX url exception, host:{} port:{} jmxUrl:{}", host, port, jmxUrl, e); + LOGGER.error("JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}", physicalClusterId, brokerId, host, port, jmxUrl, e); } catch (Exception e) { - LOGGER.error("JMX connect exception, host:{} port:{}.", host, port, e); + LOGGER.error("JMX connect exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); } return false; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index 402cb0e4..a04a4d87 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -314,7 +314,7 @@ public class PhysicalClusterMetadataManager { metadataMap.put(brokerId, brokerMetadata); Map jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>()); - jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig)); + jmxMap.put(brokerId, new JmxConnectorWrap(clusterId, brokerId, brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig)); JMX_CONNECTOR_MAP.put(clusterId, jmxMap); Map versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());