add v2.2.0 feature & fix

This commit is contained in:
zengqiao
2021-01-23 13:19:29 +08:00
parent fc109fd1b1
commit 850d43df63
34 changed files with 429 additions and 156 deletions

View File

@@ -69,6 +69,19 @@ public class LogicalClusterMetadataManager {
return LOGICAL_CLUSTER_ID_BROKER_ID_MAP.getOrDefault(logicClusterId, new HashSet<>());
}
public Long getTopicLogicalClusterId(Long physicalClusterId, String topicName) {
if (!LOADED.get()) {
flush();
}
Map<String, Long> logicalClusterIdMap = TOPIC_LOGICAL_MAP.get(physicalClusterId);
if (ValidateUtils.isNull(logicalClusterIdMap)) {
return null;
}
return logicalClusterIdMap.get(topicName);
}
public LogicalClusterDO getTopicLogicalCluster(Long physicalClusterId, String topicName) {
if (!LOADED.get()) {
flush();

View File

@@ -4,9 +4,11 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
@@ -118,8 +120,15 @@ public class PhysicalClusterMetadataManager {
return;
}
JmxConfig jmxConfig = null;
try {
jmxConfig = JsonUtils.stringToObj(clusterDO.getJmxProperties(), JmxConfig.class);
} catch (Exception e) {
LOGGER.error("class=PhysicalClusterMetadataManager||method=addNew||clusterDO={}||msg=parse jmx properties failed", JsonUtils.toJSONString(clusterDO));
}
//增加Broker监控
BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, configUtils.getJmxMaxConn());
BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, jmxConfig);
brokerListener.init();
zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener);
@@ -280,7 +289,7 @@ public class PhysicalClusterMetadataManager {
//---------------------------Broker元信息相关--------------
public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata, Integer jmxMaxConn) {
public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata, JmxConfig jmxConfig) {
Map<Integer, BrokerMetadata> metadataMap = BROKER_METADATA_MAP.get(clusterId);
if (metadataMap == null) {
return;
@@ -288,7 +297,7 @@ public class PhysicalClusterMetadataManager {
metadataMap.put(brokerId, brokerMetadata);
Map<Integer, JmxConnectorWrap> jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxMaxConn));
jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig));
JMX_CONNECTOR_MAP.put(clusterId, jmxMap);
Map<Integer, KafkaVersion> versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());

View File

@@ -203,6 +203,7 @@ public class ClusterServiceImpl implements ClusterService {
zk.close();
}
} catch (Throwable t) {
return false;
}
}
return true;

View File

@@ -113,6 +113,7 @@ public class LogicalClusterServiceImpl implements LogicalClusterService {
LogicalCluster logicalCluster = new LogicalCluster();
logicalCluster.setLogicalClusterId(logicalClusterDO.getId());
logicalCluster.setLogicalClusterName(logicalClusterDO.getName());
logicalCluster.setLogicalClusterIdentification(logicalClusterDO.getIdentification());
logicalCluster.setClusterVersion(
physicalClusterMetadataManager.getKafkaVersion(
logicalClusterDO.getClusterId(),

View File

@@ -13,9 +13,6 @@ public class ConfigUtils {
@Value(value = "${custom.idc}")
private String idc;
@Value("${custom.jmx.max-conn}")
private Integer jmxMaxConn;
@Value(value = "${spring.profiles.active}")
private String kafkaManagerEnv;
@@ -30,14 +27,6 @@ public class ConfigUtils {
this.idc = idc;
}
public Integer getJmxMaxConn() {
return jmxMaxConn;
}
public void setJmxMaxConn(Integer jmxMaxConn) {
this.jmxMaxConn = jmxMaxConn;
}
public String getKafkaManagerEnv() {
return kafkaManagerEnv;
}

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.kafka.manager.service.zookeeper;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.kafka.manager.common.zookeeper.StateChangeListener;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
@@ -22,12 +23,12 @@ public class BrokerStateListener implements StateChangeListener {
private ZkConfigImpl zkConfig;
private Integer jmxMaxConn;
private JmxConfig jmxConfig;
public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig, Integer jmxMaxConn) {
public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig, JmxConfig jmxConfig) {
this.clusterId = clusterId;
this.zkConfig = zkConfig;
this.jmxMaxConn = jmxMaxConn;
this.jmxConfig = jmxConfig;
}
@Override
@@ -84,7 +85,7 @@ public class BrokerStateListener implements StateChangeListener {
}
brokerMetadata.setClusterId(clusterId);
brokerMetadata.setBrokerId(brokerId);
PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxMaxConn);
PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig);
} catch (Exception e) {
LOGGER.error("add broker failed, clusterId:{} brokerMetadata:{}.", clusterId, brokerMetadata, e);
}