From e34e3f3e3d73d49e27ff4bad2541511576c007e9 Mon Sep 17 00:00:00 2001 From: ZQKC Date: Thu, 1 Jun 2023 14:29:07 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]=E6=94=AF=E6=8C=81=E6=8C=87=E5=AE=9ASe?= =?UTF-8?q?rver=E7=9A=84=E5=85=B7=E4=BD=93Jmx=E7=AB=AF=E5=8F=A3(#965)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更事项: 1、接入集群时,支持按照Broker粒度进行Jmx端口的配置; 2、设置Jmx端口的优先级为:指定Broker端口 》ZK中获取到的Broker端口 》指定Cluster端口; 补充说明: 1、该修改仅为后端修改,产品上暂未进行修改; --- .../impl/ClusterBrokersManagerImpl.java | 2 +- .../km/common/bean/entity/broker/Broker.java | 6 +- .../bean/entity/config/JmxAuthConfig.java | 29 +++++++ .../common/bean/entity/config/JmxConfig.java | 77 +++++++++++++++---- .../bean/entity/jmx/ServerIdJmxPort.java | 25 ++++++ .../km/common/enums/jmx/JmxEnum.java | 20 +++++ .../km/common/jmx/JmxConnectorWrap.java | 44 +++++++---- .../broker/impl/BrokerServiceImpl.java | 2 +- .../persistence/connect/ConnectJMXClient.java | 2 +- .../km/persistence/kafka/KafkaJMXClient.java | 4 +- 10 files changed, 172 insertions(+), 39 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxAuthConfig.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/jmx/ServerIdJmxPort.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/jmx/JmxEnum.java diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java index ab5d6a6d..c77724dd 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java @@ -202,7 +202,7 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager { //补充非zk模式的JMXPort信息 if (!clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) { JmxConfig jmxConfig = ConvertUtil.str2ObjByJson(clusterPhy.getJmxProperties(), JmxConfig.class); - voList.forEach(elem -> elem.setJmxPort(jmxConfig.getJmxPort() == null ? -1 : jmxConfig.getJmxPort())); + voList.forEach(elem -> elem.setJmxPort(jmxConfig.getFinallyJmxPort(String.valueOf(elem.getBrokerId())))); } return voList; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java index 752aade0..35fa1f5a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java @@ -4,6 +4,8 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.broker; import com.alibaba.fastjson.TypeReference; import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData; import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import lombok.AllArgsConstructor; import lombok.Data; @@ -71,10 +73,10 @@ public class Broker implements Serializable { metadata.setBrokerId(node.id()); metadata.setHost(node.host()); metadata.setPort(node.port()); - metadata.setJmxPort(-1); + metadata.setJmxPort(JmxEnum.UNKNOWN.getPort()); metadata.setStartTimestamp(startTimestamp); metadata.setRack(node.rack()); - metadata.setStatus(1); + metadata.setStatus(Constant.ALIVE); return metadata; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxAuthConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxAuthConfig.java new file mode 100644 index 00000000..02fec6b4 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxAuthConfig.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.config; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author zengqiao + * @date 23/05/19 + */ +@Data +@ApiModel(description = "Jmx配置") +public class JmxAuthConfig implements Serializable { + @ApiModelProperty(value="最大连接", example = "100") + protected Integer maxConn; + + @ApiModelProperty(value="是否开启SSL,如果开始则username 与 token 必须非空", example = "false") + protected Boolean openSSL; + + @ApiModelProperty(value="SSL情况下的username", example = "Ks-Km") + protected String username; + + @ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19") + protected String token; +} + + diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java index 87607c1f..7620e960 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java @@ -1,10 +1,12 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.config; +import com.xiaojukeji.know.streaming.km.common.bean.entity.jmx.ServerIdJmxPort; +import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; -import java.io.Serializable; +import java.util.List; /** * @author zengqiao @@ -12,24 +14,69 @@ import java.io.Serializable; */ @Data @ApiModel(description = "Jmx配置") -public class JmxConfig implements Serializable { - @ApiModelProperty(value="jmx端口", example = "8099") +public class JmxConfig extends JmxAuthConfig { + @ApiModelProperty(value="jmx端口,最低优先使用的端口", example = "8099") private Integer jmxPort; - @ApiModelProperty(value="最大连接", example = "100") - private Integer maxConn; - - @ApiModelProperty(value="是否开启SSL,如果开始则username 与 token 必须非空", example = "false") - private Boolean openSSL; - - @ApiModelProperty(value="SSL情况下的username", example = "Ks-Km") - private String username; - - @ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19") - private String token; - @ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL") private String useWhichEndpoint; + + @ApiModelProperty(value="指定server的JMX端口, 最高优先使用的端口", example = "") + private List specifiedJmxPortList; + + /** + * 选取最终的jmx端口 + * @param serverId 服务ID + * @param metadataJmxPort ks从元信息中获取到的jmx端口 + */ + public Integer getFinallyJmxPort(String serverId, Integer metadataJmxPort) { + if (specifiedJmxPortList == null || specifiedJmxPortList.isEmpty()) { + // 未进行特殊指定时,zkJMX端口存在则优先使用zkJmxPort,否则使用配置的jmxPort + return this.selectJmxPort(jmxPort, metadataJmxPort); + } + + // 进行特殊配置时 + for (ServerIdJmxPort serverIdJmxPort: specifiedJmxPortList) { + if (serverId.equals(serverIdJmxPort.getServerId()) && serverIdJmxPort.getJmxPort() != null) { + // 当前server有指定具体的jmx端口时,则使用具体的端口 + return serverIdJmxPort.getJmxPort(); + } + } + + return this.selectJmxPort(jmxPort, metadataJmxPort); + } + + /** + * 选取最终的jmx端口 + * @param serverId serverId + */ + public Integer getFinallyJmxPort(String serverId) { + return this.getFinallyJmxPort(serverId, null); + } + + /** + * 选取jmx端口 + * @param feJmxPort 前端页面配置的jmx端口 + * @param metadataJmxPort ks从元信息中获取到的jmx端口 + */ + private Integer selectJmxPort(Integer feJmxPort, Integer metadataJmxPort) { + if (metadataJmxPort == null) { + return feJmxPort != null? feJmxPort: JmxEnum.NOT_OPEN.getPort(); + } + + if (JmxEnum.NOT_OPEN.getPort().equals(metadataJmxPort)) { + // 如果元信息提示未开启,则直接返回未开启 + return JmxEnum.NOT_OPEN.getPort(); + } + + if (JmxEnum.UNKNOWN.getPort().equals(metadataJmxPort)) { + // 如果元信息提示未知,则直接返回feJmxPort 或者 未开启 + return feJmxPort != null? feJmxPort: JmxEnum.NOT_OPEN.getPort(); + } + + // 其他情况,返回 metadataJmxPort + return metadataJmxPort; + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/jmx/ServerIdJmxPort.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/jmx/ServerIdJmxPort.java new file mode 100644 index 00000000..df27cb87 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/jmx/ServerIdJmxPort.java @@ -0,0 +1,25 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.jmx; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author didi + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ServerIdJmxPort implements Serializable { + /** + * serverID + */ + private String serverId; + + /** + * JMX端口 + */ + private Integer jmxPort; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/jmx/JmxEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/jmx/JmxEnum.java new file mode 100644 index 00000000..314402e8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/jmx/JmxEnum.java @@ -0,0 +1,20 @@ +package com.xiaojukeji.know.streaming.km.common.enums.jmx; + +import lombok.Getter; + +@Getter +public enum JmxEnum { + NOT_OPEN(-1, "未开启JMX端口"), + + UNKNOWN(-2, "JMX端口未知"), + + ; + + private final Integer port; + private final String message; + + JmxEnum(Integer port, String message) { + this.port = port; + this.message = message; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java index d9cfb082..071072ad 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java @@ -1,6 +1,8 @@ package com.xiaojukeji.know.streaming.km.common.jmx; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxAuthConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig; +import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum; import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import org.slf4j.Logger; @@ -33,26 +35,21 @@ public class JmxConnectorWrap { private final Long brokerStartupTime; - private final String host; + private final String jmxHost; - private final Integer port; + private final Integer jmxPort; private JMXConnector jmxConnector; private final AtomicInteger atomicInteger; - private JmxConfig jmxConfig; + private JmxAuthConfig jmxConfig; - public JmxConnectorWrap(String clientLogIdent, Long brokerStartupTime, String host, Integer port, JmxConfig jmxConfig) { + public JmxConnectorWrap(String clientLogIdent, Long brokerStartupTime, String jmxHost, Integer jmxPort, JmxAuthConfig jmxConfig) { this.clientLogIdent=clientLogIdent; this.brokerStartupTime = brokerStartupTime; - this.host = host; - - if (port == null || port == -1 && jmxConfig.getJmxPort() != null) { - this.port = jmxConfig.getJmxPort(); - } else { - this.port = port; - } + this.jmxHost = jmxHost; + this.jmxPort = (jmxPort == null? JmxEnum.UNKNOWN.getPort() : jmxPort); this.jmxConfig = jmxConfig; if (ValidateUtils.isNull(this.jmxConfig)) { @@ -61,6 +58,7 @@ public class JmxConnectorWrap { if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getMaxConn())) { this.jmxConfig.setMaxConn(1000); } + this.atomicInteger = new AtomicInteger(this.jmxConfig.getMaxConn()); } @@ -68,7 +66,7 @@ public class JmxConnectorWrap { if (jmxConnector != null) { return true; } - if (port == null || port == -1) { + if (jmxPort == null || jmxPort == -1) { return false; } return createJmxConnector(); @@ -91,7 +89,10 @@ public class JmxConnectorWrap { jmxConnector = null; } catch (IOException e) { - LOGGER.warn("close JmxConnector exception, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port, e); + LOGGER.error( + "method=close||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=close jmx JmxConnector exception.", + clientLogIdent, jmxHost, jmxPort, e + ); } } @@ -159,7 +160,7 @@ public class JmxConnectorWrap { if (jmxConnector != null) { return true; } - String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port); + String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", jmxHost, jmxPort); try { Map environment = new HashMap(); if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getToken())) { @@ -174,12 +175,21 @@ public class JmxConnectorWrap { } jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment); - LOGGER.info("JMX connect success, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port); + LOGGER.info( + "method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=jmx connect success.", + clientLogIdent, jmxHost, jmxPort + ); return true; } catch (MalformedURLException e) { - LOGGER.error("JMX url exception, clientLogIdent:{} host:{} port:{} jmxUrl:{}", clientLogIdent, host, port, jmxUrl, e); + LOGGER.error( + "method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||jmxUrl={}||msg=jmx url exception.", + clientLogIdent, jmxHost, jmxPort, jmxUrl, e + ); } catch (Exception e) { - LOGGER.error("JMX connect exception, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port, e); + LOGGER.error( + "method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=jmx connect exception.", + clientLogIdent, jmxHost, jmxPort, e + ); } return false; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index 97dc00c8..e34c035c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -360,7 +360,7 @@ public class BrokerServiceImpl extends BaseKafkaVersionControlService implements private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) { try { - Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), jmxConfig.getJmxPort(), jmxConfig); + Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), jmxConfig.getFinallyJmxPort(String.valueOf(newNode.id())), jmxConfig); return Broker.buildFrom(clusterPhyId, newNode, startTime); } catch (Exception e) { diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/connect/ConnectJMXClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/connect/ConnectJMXClient.java index 727ad7f6..300243b7 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/connect/ConnectJMXClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/connect/ConnectJMXClient.java @@ -90,7 +90,7 @@ public class ConnectJMXClient extends AbstractConnectClusterChangeHandler { "connectClusterId: " + connectCluster.getId() + " workerId: " + workerId, null, connectWorker.getHost(), - connectWorker.getJmxPort() != null ? connectWorker.getJmxPort() : jmxConfig.getJmxPort(), + jmxConfig.getFinallyJmxPort(workerId, connectWorker.getJmxPort()), jmxConfig ); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java index 1ace6742..e759da60 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java @@ -161,8 +161,8 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { JmxConnectorWrap jmxConnectorWrap = new JmxConnectorWrap( "clusterPhyId: " + clusterPhy.getId() + " brokerId: " + brokerId, broker.getStartTimestamp(), - jmxConfig != null ? broker.getJmxHost(jmxConfig.getUseWhichEndpoint()) : broker.getHost(), - broker.getJmxPort() != null ? broker.getJmxPort() : jmxConfig.getJmxPort(), + broker.getJmxHost(jmxConfig.getUseWhichEndpoint()), + jmxConfig.getFinallyJmxPort(String.valueOf(brokerId), broker.getJmxPort()), jmxConfig );