mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
Merge pull request #423 from didi/dev
1. bump swagger version; 2. fix NPE when flush logical cluster and physical cluster not in cache or not exist; 3. JmxConnectorWrap's log add cluster and broker info;
This commit is contained in:
@@ -14,7 +14,6 @@ import javax.naming.Context;
|
|||||||
import javax.rmi.ssl.SslRMIClientSocketFactory;
|
import javax.rmi.ssl.SslRMIClientSocketFactory;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@@ -26,19 +25,25 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||||||
* @date 2015/11/9.
|
* @date 2015/11/9.
|
||||||
*/
|
*/
|
||||||
public class JmxConnectorWrap {
|
public class JmxConnectorWrap {
|
||||||
private final static Logger LOGGER = LoggerFactory.getLogger(JmxConnectorWrap.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(JmxConnectorWrap.class);
|
||||||
|
|
||||||
private String host;
|
private final Long physicalClusterId;
|
||||||
|
|
||||||
private int port;
|
private final Integer brokerId;
|
||||||
|
|
||||||
|
private final String host;
|
||||||
|
|
||||||
|
private final int port;
|
||||||
|
|
||||||
private JMXConnector jmxConnector;
|
private JMXConnector jmxConnector;
|
||||||
|
|
||||||
private AtomicInteger atomicInteger;
|
private final AtomicInteger atomicInteger;
|
||||||
|
|
||||||
private JmxConfig jmxConfig;
|
private JmxConfig jmxConfig;
|
||||||
|
|
||||||
public JmxConnectorWrap(String host, int port, JmxConfig jmxConfig) {
|
public JmxConnectorWrap(Long physicalClusterId, Integer brokerId, String host, int port, JmxConfig jmxConfig) {
|
||||||
|
this.physicalClusterId = physicalClusterId;
|
||||||
|
this.brokerId = brokerId;
|
||||||
this.host = host;
|
this.host = host;
|
||||||
this.port = port;
|
this.port = port;
|
||||||
this.jmxConfig = jmxConfig;
|
this.jmxConfig = jmxConfig;
|
||||||
@@ -68,7 +73,7 @@ public class JmxConnectorWrap {
|
|||||||
try {
|
try {
|
||||||
jmxConnector.close();
|
jmxConnector.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOGGER.warn("close JmxConnector exception, host:{} port:{}.", host, port, e);
|
LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,12 +96,12 @@ public class JmxConnectorWrap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
|
jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
|
||||||
LOGGER.info("JMX connect success, host:{} port:{}.", host, port);
|
LOGGER.info("JMX connect success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port);
|
||||||
return true;
|
return true;
|
||||||
} catch (MalformedURLException e) {
|
} catch (MalformedURLException e) {
|
||||||
LOGGER.error("JMX url exception, host:{} port:{} jmxUrl:{}", host, port, jmxUrl, e);
|
LOGGER.error("JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}", physicalClusterId, brokerId, host, port, jmxUrl, e);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.error("JMX connect exception, host:{} port:{}.", host, port, e);
|
LOGGER.error("JMX connect exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -208,7 +208,8 @@ public class LogicalClusterMetadataManager {
|
|||||||
// 计算逻辑集群到Topic名称的映射
|
// 计算逻辑集群到Topic名称的映射
|
||||||
Set<String> topicNameSet = PhysicalClusterMetadataManager.getBrokerTopicNum(
|
Set<String> topicNameSet = PhysicalClusterMetadataManager.getBrokerTopicNum(
|
||||||
logicalClusterDO.getClusterId(),
|
logicalClusterDO.getClusterId(),
|
||||||
brokerIdSet);
|
brokerIdSet
|
||||||
|
);
|
||||||
LOGICAL_CLUSTER_ID_TOPIC_NAME_MAP.put(logicalClusterDO.getId(), topicNameSet);
|
LOGICAL_CLUSTER_ID_TOPIC_NAME_MAP.put(logicalClusterDO.getId(), topicNameSet);
|
||||||
|
|
||||||
// 计算Topic名称到逻辑集群的映射
|
// 计算Topic名称到逻辑集群的映射
|
||||||
|
|||||||
@@ -314,7 +314,7 @@ public class PhysicalClusterMetadataManager {
|
|||||||
metadataMap.put(brokerId, brokerMetadata);
|
metadataMap.put(brokerId, brokerMetadata);
|
||||||
|
|
||||||
Map<Integer, JmxConnectorWrap> jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
|
Map<Integer, JmxConnectorWrap> jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
|
||||||
jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig));
|
jmxMap.put(brokerId, new JmxConnectorWrap(clusterId, brokerId, brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig));
|
||||||
JMX_CONNECTOR_MAP.put(clusterId, jmxMap);
|
JMX_CONNECTOR_MAP.put(clusterId, jmxMap);
|
||||||
|
|
||||||
Map<Integer, KafkaVersion> versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
|
Map<Integer, KafkaVersion> versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
|
||||||
@@ -539,9 +539,12 @@ public class PhysicalClusterMetadataManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static Set<String> getBrokerTopicNum(Long clusterId, Set<Integer> brokerIdSet) {
|
public static Set<String> getBrokerTopicNum(Long clusterId, Set<Integer> brokerIdSet) {
|
||||||
Set<String> topicNameSet = new HashSet<>();
|
|
||||||
|
|
||||||
Map<String, TopicMetadata> metadataMap = TOPIC_METADATA_MAP.get(clusterId);
|
Map<String, TopicMetadata> metadataMap = TOPIC_METADATA_MAP.get(clusterId);
|
||||||
|
if (metadataMap == null) {
|
||||||
|
return new HashSet<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<String> topicNameSet = new HashSet<>();
|
||||||
for (String topicName: metadataMap.keySet()) {
|
for (String topicName: metadataMap.keySet()) {
|
||||||
try {
|
try {
|
||||||
TopicMetadata tm = metadataMap.get(topicName);
|
TopicMetadata tm = metadataMap.get(topicName);
|
||||||
|
|||||||
@@ -39,10 +39,9 @@ public class SwaggerConfig implements WebMvcConfigurer {
|
|||||||
|
|
||||||
private ApiInfo apiInfo() {
|
private ApiInfo apiInfo() {
|
||||||
return new ApiInfoBuilder()
|
return new ApiInfoBuilder()
|
||||||
.title("Logi-KafkaManager 接口文档")
|
.title("LogiKM接口文档")
|
||||||
.description("欢迎使用滴滴Logi-KafkaManager")
|
.description("欢迎使用滴滴LogiKM")
|
||||||
.contact("huangyiminghappy@163.com")
|
.version("2.5.0")
|
||||||
.version("2.2.0")
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
4
pom.xml
4
pom.xml
@@ -17,8 +17,8 @@
|
|||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<kafka-manager.revision>2.5</kafka-manager.revision>
|
<kafka-manager.revision>2.5</kafka-manager.revision>
|
||||||
<swagger2.version>2.7.0</swagger2.version>
|
<swagger2.version>2.9.2</swagger2.version>
|
||||||
<swagger.version>1.5.13</swagger.version>
|
<swagger.version>1.5.21</swagger.version>
|
||||||
|
|
||||||
<maven.test.skip>true</maven.test.skip>
|
<maven.test.skip>true</maven.test.skip>
|
||||||
<downloadSources>true</downloadSources>
|
<downloadSources>true</downloadSources>
|
||||||
|
|||||||
Reference in New Issue
Block a user