[Feature]支持指定Server的具体Jmx端口(#965)

变更事项:
1、接入集群时,支持按照Broker粒度进行Jmx端口的配置;
2、设置Jmx端口的优先级为:指定Broker端口 》ZK中获取到的Broker端口 》指定Cluster端口;

补充说明:
1、该修改仅为后端修改,产品上暂未进行修改;
This commit is contained in:
ZQKC
2023-06-01 14:29:07 +08:00
committed by EricZeng
parent b3fd494398
commit e34e3f3e3d
10 changed files with 172 additions and 39 deletions

View File

@@ -202,7 +202,7 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
//补充非zk模式的JMXPort信息
if (!clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) {
JmxConfig jmxConfig = ConvertUtil.str2ObjByJson(clusterPhy.getJmxProperties(), JmxConfig.class);
voList.forEach(elem -> elem.setJmxPort(jmxConfig.getJmxPort() == null ? -1 : jmxConfig.getJmxPort()));
voList.forEach(elem -> elem.setJmxPort(jmxConfig.getFinallyJmxPort(String.valueOf(elem.getBrokerId()))));
}
return voList;

View File

@@ -4,6 +4,8 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.broker;
import com.alibaba.fastjson.TypeReference;
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -71,10 +73,10 @@ public class Broker implements Serializable {
metadata.setBrokerId(node.id());
metadata.setHost(node.host());
metadata.setPort(node.port());
metadata.setJmxPort(-1);
metadata.setJmxPort(JmxEnum.UNKNOWN.getPort());
metadata.setStartTimestamp(startTimestamp);
metadata.setRack(node.rack());
metadata.setStatus(1);
metadata.setStatus(Constant.ALIVE);
return metadata;
}

View File

@@ -0,0 +1,29 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.config;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
/**
* @author zengqiao
* @date 23/05/19
*/
@Data
@ApiModel(description = "Jmx配置")
public class JmxAuthConfig implements Serializable {
@ApiModelProperty(value="最大连接", example = "100")
protected Integer maxConn;
@ApiModelProperty(value="是否开启SSL如果开始则username 与 token 必须非空", example = "false")
protected Boolean openSSL;
@ApiModelProperty(value="SSL情况下的username", example = "Ks-Km")
protected String username;
@ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19")
protected String token;
}

View File

@@ -1,10 +1,12 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.config;
import com.xiaojukeji.know.streaming.km.common.bean.entity.jmx.ServerIdJmxPort;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author zengqiao
@@ -12,24 +14,69 @@ import java.io.Serializable;
*/
@Data
@ApiModel(description = "Jmx配置")
public class JmxConfig implements Serializable {
@ApiModelProperty(value="jmx端口", example = "8099")
public class JmxConfig extends JmxAuthConfig {
@ApiModelProperty(value="jmx端口,最低优先使用的端口", example = "8099")
private Integer jmxPort;
@ApiModelProperty(value="最大连接", example = "100")
private Integer maxConn;
@ApiModelProperty(value="是否开启SSL如果开始则username 与 token 必须非空", example = "false")
private Boolean openSSL;
@ApiModelProperty(value="SSL情况下的username", example = "Ks-Km")
private String username;
@ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19")
private String token;
@ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL")
private String useWhichEndpoint;
@ApiModelProperty(value="指定server的JMX端口, 最高优先使用的端口", example = "")
private List<ServerIdJmxPort> specifiedJmxPortList;
/**
* 选取最终的jmx端口
* @param serverId 服务ID
* @param metadataJmxPort ks从元信息中获取到的jmx端口
*/
public Integer getFinallyJmxPort(String serverId, Integer metadataJmxPort) {
if (specifiedJmxPortList == null || specifiedJmxPortList.isEmpty()) {
// 未进行特殊指定时zkJMX端口存在则优先使用zkJmxPort否则使用配置的jmxPort
return this.selectJmxPort(jmxPort, metadataJmxPort);
}
// 进行特殊配置时
for (ServerIdJmxPort serverIdJmxPort: specifiedJmxPortList) {
if (serverId.equals(serverIdJmxPort.getServerId()) && serverIdJmxPort.getJmxPort() != null) {
// 当前server有指定具体的jmx端口时则使用具体的端口
return serverIdJmxPort.getJmxPort();
}
}
return this.selectJmxPort(jmxPort, metadataJmxPort);
}
/**
* 选取最终的jmx端口
* @param serverId serverId
*/
public Integer getFinallyJmxPort(String serverId) {
return this.getFinallyJmxPort(serverId, null);
}
/**
* 选取jmx端口
* @param feJmxPort 前端页面配置的jmx端口
* @param metadataJmxPort ks从元信息中获取到的jmx端口
*/
private Integer selectJmxPort(Integer feJmxPort, Integer metadataJmxPort) {
if (metadataJmxPort == null) {
return feJmxPort != null? feJmxPort: JmxEnum.NOT_OPEN.getPort();
}
if (JmxEnum.NOT_OPEN.getPort().equals(metadataJmxPort)) {
// 如果元信息提示未开启,则直接返回未开启
return JmxEnum.NOT_OPEN.getPort();
}
if (JmxEnum.UNKNOWN.getPort().equals(metadataJmxPort)) {
// 如果元信息提示未知则直接返回feJmxPort 或者 未开启
return feJmxPort != null? feJmxPort: JmxEnum.NOT_OPEN.getPort();
}
// 其他情况,返回 metadataJmxPort
return metadataJmxPort;
}
}

View File

@@ -0,0 +1,25 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.jmx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author didi
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerIdJmxPort implements Serializable {
/**
* serverID
*/
private String serverId;
/**
* JMX端口
*/
private Integer jmxPort;
}

View File

@@ -0,0 +1,20 @@
package com.xiaojukeji.know.streaming.km.common.enums.jmx;
import lombok.Getter;
@Getter
public enum JmxEnum {
NOT_OPEN(-1, "未开启JMX端口"),
UNKNOWN(-2, "JMX端口未知"),
;
private final Integer port;
private final String message;
JmxEnum(Integer port, String message) {
this.port = port;
this.message = message;
}
}

View File

@@ -1,6 +1,8 @@
package com.xiaojukeji.know.streaming.km.common.jmx;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxAuthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import org.slf4j.Logger;
@@ -33,26 +35,21 @@ public class JmxConnectorWrap {
private final Long brokerStartupTime;
private final String host;
private final String jmxHost;
private final Integer port;
private final Integer jmxPort;
private JMXConnector jmxConnector;
private final AtomicInteger atomicInteger;
private JmxConfig jmxConfig;
private JmxAuthConfig jmxConfig;
public JmxConnectorWrap(String clientLogIdent, Long brokerStartupTime, String host, Integer port, JmxConfig jmxConfig) {
public JmxConnectorWrap(String clientLogIdent, Long brokerStartupTime, String jmxHost, Integer jmxPort, JmxAuthConfig jmxConfig) {
this.clientLogIdent=clientLogIdent;
this.brokerStartupTime = brokerStartupTime;
this.host = host;
if (port == null || port == -1 && jmxConfig.getJmxPort() != null) {
this.port = jmxConfig.getJmxPort();
} else {
this.port = port;
}
this.jmxHost = jmxHost;
this.jmxPort = (jmxPort == null? JmxEnum.UNKNOWN.getPort() : jmxPort);
this.jmxConfig = jmxConfig;
if (ValidateUtils.isNull(this.jmxConfig)) {
@@ -61,6 +58,7 @@ public class JmxConnectorWrap {
if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getMaxConn())) {
this.jmxConfig.setMaxConn(1000);
}
this.atomicInteger = new AtomicInteger(this.jmxConfig.getMaxConn());
}
@@ -68,7 +66,7 @@ public class JmxConnectorWrap {
if (jmxConnector != null) {
return true;
}
if (port == null || port == -1) {
if (jmxPort == null || jmxPort == -1) {
return false;
}
return createJmxConnector();
@@ -91,7 +89,10 @@ public class JmxConnectorWrap {
jmxConnector = null;
} catch (IOException e) {
LOGGER.warn("close JmxConnector exception, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port, e);
LOGGER.error(
"method=close||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=close jmx JmxConnector exception.",
clientLogIdent, jmxHost, jmxPort, e
);
}
}
@@ -159,7 +160,7 @@ public class JmxConnectorWrap {
if (jmxConnector != null) {
return true;
}
String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port);
String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", jmxHost, jmxPort);
try {
Map<String, Object> environment = new HashMap<String, Object>();
if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getToken())) {
@@ -174,12 +175,21 @@ public class JmxConnectorWrap {
}
jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
LOGGER.info("JMX connect success, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port);
LOGGER.info(
"method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=jmx connect success.",
clientLogIdent, jmxHost, jmxPort
);
return true;
} catch (MalformedURLException e) {
LOGGER.error("JMX url exception, clientLogIdent:{} host:{} port:{} jmxUrl:{}", clientLogIdent, host, port, jmxUrl, e);
LOGGER.error(
"method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||jmxUrl={}||msg=jmx url exception.",
clientLogIdent, jmxHost, jmxPort, jmxUrl, e
);
} catch (Exception e) {
LOGGER.error("JMX connect exception, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port, e);
LOGGER.error(
"method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=jmx connect exception.",
clientLogIdent, jmxHost, jmxPort, e
);
}
return false;
}

View File

@@ -360,7 +360,7 @@ public class BrokerServiceImpl extends BaseKafkaVersionControlService implements
private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) {
try {
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), jmxConfig.getJmxPort(), jmxConfig);
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), jmxConfig.getFinallyJmxPort(String.valueOf(newNode.id())), jmxConfig);
return Broker.buildFrom(clusterPhyId, newNode, startTime);
} catch (Exception e) {

View File

@@ -90,7 +90,7 @@ public class ConnectJMXClient extends AbstractConnectClusterChangeHandler {
"connectClusterId: " + connectCluster.getId() + " workerId: " + workerId,
null,
connectWorker.getHost(),
connectWorker.getJmxPort() != null ? connectWorker.getJmxPort() : jmxConfig.getJmxPort(),
jmxConfig.getFinallyJmxPort(workerId, connectWorker.getJmxPort()),
jmxConfig
);

View File

@@ -161,8 +161,8 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
JmxConnectorWrap jmxConnectorWrap = new JmxConnectorWrap(
"clusterPhyId: " + clusterPhy.getId() + " brokerId: " + brokerId,
broker.getStartTimestamp(),
jmxConfig != null ? broker.getJmxHost(jmxConfig.getUseWhichEndpoint()) : broker.getHost(),
broker.getJmxPort() != null ? broker.getJmxPort() : jmxConfig.getJmxPort(),
broker.getJmxHost(jmxConfig.getUseWhichEndpoint()),
jmxConfig.getFinallyJmxPort(String.valueOf(brokerId), broker.getJmxPort()),
jmxConfig
);