From 4df2dc09fea79e2238541f408202fef01d13ae56 Mon Sep 17 00:00:00 2001 From: xuguang Date: Wed, 12 Jan 2022 16:15:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9BrokerMetadata?= =?UTF-8?q?=E4=B8=ADendpoints=E4=B8=BAinternal|External=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E7=9A=84=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-common/pom.xml | 5 + .../common/constant/KafkaConstant.java | 4 + .../common/entity/ao/common/IpPortData.java | 18 ++ .../znode/brokers/BrokerMetadata.java | 160 +++++++++--------- .../zookeeper/BrokerStateListener.java | 11 +- 5 files changed, 110 insertions(+), 88 deletions(-) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml index 6a8ff0cb..c914ffeb 100644 --- a/kafka-manager-common/pom.xml +++ b/kafka-manager-common/pom.xml @@ -109,5 +109,10 @@ junit junit + + org.projectlombok + lombok + compile + \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java index 4d69f914..463e9b1a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java @@ -17,6 +17,10 @@ public class KafkaConstant { public static final String RETENTION_MS_KEY = "retention.ms"; + public static final String EXTERNAL_KEY = "EXTERNAL"; + + public static final String INTERNAL_KEY = "INTERNAL"; + private KafkaConstant() { } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java new file mode 100644 index 00000000..a16b32b4 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.kafka.manager.common.entity.ao.common; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IpPortData implements Serializable { + private static final long serialVersionUID = -428897032994630685L; + + private String ip; + + private String port; +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java index 3c179b4f..e4e5063d 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java @@ -1,6 +1,17 @@ package com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; +import com.xiaojukeji.kafka.manager.common.entity.ao.common.IpPortData; +import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author zengqiao @@ -18,22 +29,48 @@ import java.util.List; * "version":4, * "rack": "CY" * } + * + * { + * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"}, + * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"], + * "jmx_port":8099, + * "host":"10.179.162.202", + * "timestamp":"1628833925822", + * "port":9092, + * "version":4 + * } + * + * { + * "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"}, + * "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"], + * "jmx_port":8099, + * "host":null, + * "timestamp":"1627289710439", + * "port":-1, + * "version":4 + * } + * */ -public class BrokerMetadata implements Cloneable { +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class BrokerMetadata implements Serializable { + private static final long serialVersionUID = 3918113492423375809L; + private long clusterId; private int brokerId; private List endpoints; + // > + private Map endpointMap; + private String host; private int port; - /* - * ZK上对应的字段就是这个名字, 不要进行修改 - */ - private int jmx_port; + @JsonProperty("jmx_port") + private int jmxPort; private String version; @@ -41,91 +78,54 @@ public class BrokerMetadata implements Cloneable { private String rack; - public long getClusterId() { - return clusterId; + @JsonIgnore + public String getExternalHost() { + if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp(); } - public void setClusterId(long clusterId) { - this.clusterId = clusterId; + @JsonIgnore + public String getInternalHost() { + if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp(); } - public int getBrokerId() { - return brokerId; - } + public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) { + brokerMetadata.setEndpointMap(new HashMap<>()); - public void setBrokerId(int brokerId) { - this.brokerId = brokerId; - } + if (brokerMetadata.getEndpoints().isEmpty()) { + return; + } - public List getEndpoints() { - return endpoints; - } + // example EXTERNAL://10.179.162.202:7092 + for (String endpoint: brokerMetadata.getEndpoints()) { + int idx1 = endpoint.indexOf("://"); + int idx2 = endpoint.lastIndexOf(":"); + if (idx1 == -1 || idx2 == -1 || idx1 == idx2) { + continue; + } - public void setEndpoints(List endpoints) { - this.endpoints = endpoints; - } + String brokerHost = endpoint.substring(idx1 + "://".length(), idx2); + String brokerPort = endpoint.substring(idx2 + 1); - public String getHost() { - return host; - } + brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort)); - public void setHost(String host) { - this.host = host; - } + if (KafkaConstant.EXTERNAL_KEY.equals(endpoint.substring(0, idx1))) { + // 优先使用external的地址进行展示 + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public int getJmxPort() { - return jmx_port; - } - - public void setJmxPort(int jmxPort) { - this.jmx_port = jmxPort; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public String getRack() { - return rack; - } - - public void setRack(String rack) { - this.rack = rack; - } - - @Override - public String toString() { - return "BrokerMetadata{" + - "clusterId=" + clusterId + - ", brokerId=" + brokerId + - ", endpoints=" + endpoints + - ", host='" + host + '\'' + - ", port=" + port + - ", jmxPort=" + jmx_port + - ", version='" + version + '\'' + - ", timestamp=" + timestamp + - ", rack='" + rack + '\'' + - '}'; + if (null == brokerMetadata.getHost()) { + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } + } } } + diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java index a94ec9de..f5cdefe8 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java @@ -74,15 +74,10 @@ public class BrokerStateListener implements StateChangeListener { BrokerMetadata brokerMetadata = null; try { brokerMetadata = zkConfig.get(ZkPathUtil.getBrokerIdNodePath(brokerId), BrokerMetadata.class); - if (!brokerMetadata.getEndpoints().isEmpty()) { - String endpoint = brokerMetadata.getEndpoints().get(0); - int idx = endpoint.indexOf("://"); - endpoint = endpoint.substring(idx + "://".length()); - idx = endpoint.indexOf(":"); - brokerMetadata.setHost(endpoint.substring(0, idx)); - brokerMetadata.setPort(Integer.parseInt(endpoint.substring(idx + 1))); - } + // 解析并更新本次存储的broker元信息 + BrokerMetadata.parseAndUpdateBrokerMetadata(brokerMetadata); + brokerMetadata.setClusterId(clusterId); brokerMetadata.setBrokerId(brokerId); PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig);