diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml
index 6a8ff0cb..c914ffeb 100644
--- a/kafka-manager-common/pom.xml
+++ b/kafka-manager-common/pom.xml
@@ -109,5 +109,10 @@
junit
junit
+
+ org.projectlombok
+ lombok
+ compile
+
\ No newline at end of file
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java
index 4d69f914..463e9b1a 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java
@@ -17,6 +17,10 @@ public class KafkaConstant {
public static final String RETENTION_MS_KEY = "retention.ms";
+ public static final String EXTERNAL_KEY = "EXTERNAL";
+
+ public static final String INTERNAL_KEY = "INTERNAL";
+
private KafkaConstant() {
}
}
\ No newline at end of file
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java
new file mode 100644
index 00000000..a16b32b4
--- /dev/null
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java
@@ -0,0 +1,18 @@
+package com.xiaojukeji.kafka.manager.common.entity.ao.common;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class IpPortData implements Serializable {
+ private static final long serialVersionUID = -428897032994630685L;
+
+ private String ip;
+
+ private String port;
+}
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java
index 3c179b4f..e4e5063d 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java
@@ -1,6 +1,17 @@
package com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
+import com.xiaojukeji.kafka.manager.common.entity.ao.common.IpPortData;
+import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* @author zengqiao
@@ -18,22 +29,48 @@ import java.util.List;
* "version":4,
* "rack": "CY"
* }
+ *
+ * {
+ * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"},
+ * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"],
+ * "jmx_port":8099,
+ * "host":"10.179.162.202",
+ * "timestamp":"1628833925822",
+ * "port":9092,
+ * "version":4
+ * }
+ *
+ * {
+ * "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"},
+ * "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"],
+ * "jmx_port":8099,
+ * "host":null,
+ * "timestamp":"1627289710439",
+ * "port":-1,
+ * "version":4
+ * }
+ *
*/
-public class BrokerMetadata implements Cloneable {
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BrokerMetadata implements Serializable {
+ private static final long serialVersionUID = 3918113492423375809L;
+
private long clusterId;
private int brokerId;
private List endpoints;
+ // >
+ private Map endpointMap;
+
private String host;
private int port;
- /*
- * ZK上对应的字段就是这个名字, 不要进行修改
- */
- private int jmx_port;
+ @JsonProperty("jmx_port")
+ private int jmxPort;
private String version;
@@ -41,91 +78,54 @@ public class BrokerMetadata implements Cloneable {
private String rack;
- public long getClusterId() {
- return clusterId;
+ @JsonIgnore
+ public String getExternalHost() {
+ if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) {
+ return null;
+ }
+ return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp();
}
- public void setClusterId(long clusterId) {
- this.clusterId = clusterId;
+ @JsonIgnore
+ public String getInternalHost() {
+ if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) {
+ return null;
+ }
+ return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp();
}
- public int getBrokerId() {
- return brokerId;
- }
+ public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) {
+ brokerMetadata.setEndpointMap(new HashMap<>());
- public void setBrokerId(int brokerId) {
- this.brokerId = brokerId;
- }
+ if (brokerMetadata.getEndpoints().isEmpty()) {
+ return;
+ }
- public List getEndpoints() {
- return endpoints;
- }
+ // example EXTERNAL://10.179.162.202:7092
+ for (String endpoint: brokerMetadata.getEndpoints()) {
+ int idx1 = endpoint.indexOf("://");
+ int idx2 = endpoint.lastIndexOf(":");
+ if (idx1 == -1 || idx2 == -1 || idx1 == idx2) {
+ continue;
+ }
- public void setEndpoints(List endpoints) {
- this.endpoints = endpoints;
- }
+ String brokerHost = endpoint.substring(idx1 + "://".length(), idx2);
+ String brokerPort = endpoint.substring(idx2 + 1);
- public String getHost() {
- return host;
- }
+ brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort));
- public void setHost(String host) {
- this.host = host;
- }
+ if (KafkaConstant.EXTERNAL_KEY.equals(endpoint.substring(0, idx1))) {
+ // 优先使用external的地址进行展示
+ brokerMetadata.setHost(brokerHost);
+ brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort));
+ }
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public int getJmxPort() {
- return jmx_port;
- }
-
- public void setJmxPort(int jmxPort) {
- this.jmx_port = jmxPort;
- }
-
- public String getVersion() {
- return version;
- }
-
- public void setVersion(String version) {
- this.version = version;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- public String getRack() {
- return rack;
- }
-
- public void setRack(String rack) {
- this.rack = rack;
- }
-
- @Override
- public String toString() {
- return "BrokerMetadata{" +
- "clusterId=" + clusterId +
- ", brokerId=" + brokerId +
- ", endpoints=" + endpoints +
- ", host='" + host + '\'' +
- ", port=" + port +
- ", jmxPort=" + jmx_port +
- ", version='" + version + '\'' +
- ", timestamp=" + timestamp +
- ", rack='" + rack + '\'' +
- '}';
+ if (null == brokerMetadata.getHost()) {
+ brokerMetadata.setHost(brokerHost);
+ brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort));
+ }
+ }
}
}
+
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java
index a94ec9de..f5cdefe8 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java
@@ -74,15 +74,10 @@ public class BrokerStateListener implements StateChangeListener {
BrokerMetadata brokerMetadata = null;
try {
brokerMetadata = zkConfig.get(ZkPathUtil.getBrokerIdNodePath(brokerId), BrokerMetadata.class);
- if (!brokerMetadata.getEndpoints().isEmpty()) {
- String endpoint = brokerMetadata.getEndpoints().get(0);
- int idx = endpoint.indexOf("://");
- endpoint = endpoint.substring(idx + "://".length());
- idx = endpoint.indexOf(":");
- brokerMetadata.setHost(endpoint.substring(0, idx));
- brokerMetadata.setPort(Integer.parseInt(endpoint.substring(idx + 1)));
- }
+ // 解析并更新本次存储的broker元信息
+ BrokerMetadata.parseAndUpdateBrokerMetadata(brokerMetadata);
+
brokerMetadata.setClusterId(clusterId);
brokerMetadata.setBrokerId(brokerId);
PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig);