Add a per-broker JMX connect backoff and guard connector create/close with a lock.

* BackoffUtils: keeps a map of event key -> backoff end time (epoch ms).
  putNeedBackoffEvent / removeNeedBackoffEvent / isNeedBackoff ignore null keys,
  because ConcurrentHashMap rejects them with a NullPointerException.
* JmxConfig: accessors are generated by Lombok; adds retryConnectBackoffTimeUnitMs;
  the password field is excluded from the generated toString().
* JmxConnectorWrap: a failed connect records a backoff for the broker and further
  connect attempts are skipped until it expires; an IOException while reading from
  the MBean server closes the connector so that it can be re-created later.
  Connector creation and teardown take modifyJMXConnectorLock, acquired before
  the try block so that finally never unlocks a lock that was not obtained.

Note: the "index" blob ids below are the ones from the original patch and do not
describe the post-image produced by this revised version.

diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java
index f4218020..084ea5a6 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/BackoffUtils.java
@@ -1,9 +1,18 @@
 package com.xiaojukeji.kafka.manager.common.utils;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class BackoffUtils {
     private BackoffUtils() {
     }
 
+    /**
+     * Events that currently require a backoff.
+     * Key: event name; value: time (epoch ms) at which the backoff ends.
+     */
+    private static final Map<String, Long> NEED_BACK_OFF_EVENT_MAP = new ConcurrentHashMap<>();
+
     public static void backoff(long timeUnitMs) {
         if (timeUnitMs <= 0) {
             return;
@@ -17,4 +26,61 @@ public class BackoffUtils {
             // ignore
         }
     }
+
+    /**
+     * Records that the given event must back off for the given duration.
+     * The call is ignored when the key is null or the duration is null or not positive.
+     *
+     * @param backoffEventKey   key identifying the backoff event
+     * @param backoffTimeUnitMs backoff duration in milliseconds, counted from now
+     */
+    public static void putNeedBackoffEvent(String backoffEventKey, Long backoffTimeUnitMs) {
+        if (backoffEventKey == null || backoffTimeUnitMs == null || backoffTimeUnitMs <= 0) {
+            return;
+        }
+
+        NEED_BACK_OFF_EVENT_MAP.put(backoffEventKey, backoffTimeUnitMs + System.currentTimeMillis());
+    }
+
+    /**
+     * Removes the backoff record of the given event, if any.
+     *
+     * @param backoffEventKey key identifying the backoff event; null is ignored
+     */
+    public static void removeNeedBackoffEvent(String backoffEventKey) {
+        if (backoffEventKey == null) {
+            // ConcurrentHashMap rejects null keys with a NullPointerException.
+            return;
+        }
+
+        NEED_BACK_OFF_EVENT_MAP.remove(backoffEventKey);
+    }
+
+    /**
+     * Checks whether the given event is still inside its backoff window.
+     * An expired record is removed as a side effect.
+     *
+     * @param backoffEventKey key identifying the backoff event
+     * @return true if a backoff was recorded and has not expired yet; false otherwise (including a null key)
+     */
+    public static boolean isNeedBackoff(String backoffEventKey) {
+        if (backoffEventKey == null) {
+            return false;
+        }
+
+        Long backoffEventEndTimeUnitMs = NEED_BACK_OFF_EVENT_MAP.get(backoffEventKey);
+        if (backoffEventEndTimeUnitMs == null) {
+            return false;
+        }
+
+        if (backoffEventEndTimeUnitMs > System.currentTimeMillis()) {
+            return true;
+        }
+
+        // Expired: drop the record, but only if it still holds the value we read, so that a
+        // backoff re-armed concurrently by another thread is not discarded.
+        NEED_BACK_OFF_EVENT_MAP.remove(backoffEventKey, backoffEventEndTimeUnitMs);
+
+        return false;
+    }
 }
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java
index bbc913c4..f5c380c2 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConfig.java
@@ -1,5 +1,10 @@
 package com.xiaojukeji.kafka.manager.common.utils.jmx;
 
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString(exclude = "password")
 public class JmxConfig {
     /**
      * 单台最大连接数
@@ -21,45 +26,8 @@ public class JmxConfig {
      */
     private Boolean openSSL;
 
-    public Integer getMaxConn() {
-        return maxConn;
-    }
-
-    public void setMaxConn(Integer maxConn) {
-        this.maxConn = maxConn;
-    }
-
-    public String getUsername() {
-        return username;
-    }
-
-    public void setUsername(String username) {
-        this.username = username;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public Boolean isOpenSSL() {
-        return openSSL;
-    }
-
-    public void setOpenSSL(Boolean openSSL) {
-        this.openSSL = openSSL;
-    }
-
-    @Override
-    public String toString() {
-        return "JmxConfig{" +
-                "maxConn=" + maxConn +
-                ", username='" + username + '\'' +
-                ", password='" + password + '\'' +
-                ", openSSL=" + openSSL +
-                '}';
-    }
+    /**
+     * Backoff time (ms) to wait before retrying a failed connection
+     */
+    private Long retryConnectBackoffTimeUnitMs;
 }
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java
index db1e2341..c66c7bc6 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java
@@ -18,6 +18,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * JMXConnector包装类
@@ -41,6 +42,9 @@ public class JmxConnectorWrap {
 
     private JmxConfig jmxConfig;
 
+    /** Held while the JMX connector is being created or closed. */
+    private final ReentrantLock modifyJMXConnectorLock = new ReentrantLock();
+
     public JmxConnectorWrap(Long physicalClusterId, Integer brokerId, String host, int port, JmxConfig jmxConfig) {
         this.physicalClusterId = physicalClusterId;
         this.brokerId = brokerId;
@@ -51,7 +55,12 @@ public class JmxConnectorWrap {
             this.jmxConfig = new JmxConfig();
         }
         if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getMaxConn())) {
-            this.jmxConfig.setMaxConn(1);
+            // Default to 20
+            this.jmxConfig.setMaxConn(20);
+        }
+        if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getRetryConnectBackoffTimeUnitMs())) {
+            // Default backoff: 10 minutes
+            this.jmxConfig.setRetryConnectBackoffTimeUnitMs(10 * 60 * 1000L);
         }
         this.atomicInteger = new AtomicInteger(this.jmxConfig.getMaxConn());
     }
@@ -63,17 +72,50 @@ public class JmxConnectorWrap {
         if (port == -1) {
             return false;
         }
-        return createJmxConnector();
+        return safeCreateJmxConnector();
     }
 
-    public synchronized void close() {
-        if (jmxConnector == null) {
-            return;
-        }
+    public void close() {
+        this.closeJmxConnect();
+    }
+
+    /**
+     * Closes the current JMX connector, if any, and resets it to null so that a later
+     * call can create a fresh connection. Also clears the connect-failure backoff
+     * recorded for this broker.
+     */
+    public void closeJmxConnect() {
+        // Acquire the lock before entering try: if lock() were inside the try block and
+        // failed, the finally block would unlock a lock this thread does not hold.
+        modifyJMXConnectorLock.lock();
         try {
+            // Check under the lock so a concurrent close cannot null the field in between.
+            if (jmxConnector == null) {
+                return;
+            }
+
+            // Remove the backoff event that may have been set
+            BackoffUtils.removeNeedBackoffEvent(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId));
+
             jmxConnector.close();
-        } catch (IOException e) {
-            LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
+        } catch (Exception e) {
+            LOGGER.error("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
+        } finally {
+            jmxConnector = null;
+
+            modifyJMXConnectorLock.unlock();
+        }
+    }
+
+    /**
+     * Runs createJmxConnector() while holding the connector lock.
+     */
+    private boolean safeCreateJmxConnector() {
+        modifyJMXConnectorLock.lock();
+        try {
+            return createJmxConnector();
+        } finally {
+            modifyJMXConnectorLock.unlock();
         }
     }
 
@@ -81,6 +123,12 @@ public class JmxConnectorWrap {
         if (jmxConnector != null) {
             return true;
         }
+
+        if (BackoffUtils.isNeedBackoff(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId))) {
+            // A backoff is in effect, so do not attempt to create the connection this time
+            return false;
+        }
+
         String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port);
         try {
             Map environment = new HashMap();
@@ -88,7 +136,9 @@ public class JmxConnectorWrap {
                 // fixed by riyuetianmu
                 environment.put(JMXConnector.CREDENTIALS, new String[]{this.jmxConfig.getUsername(), this.jmxConfig.getPassword()});
             }
-            if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) {
+
+            if (jmxConfig.getOpenSSL() != null && this.jmxConfig.getOpenSSL()) {
+                // SSL enabled
                 environment.put(Context.SECURITY_PROTOCOL, "ssl");
                 SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory();
                 environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory);
@@ -96,13 +146,17 @@ public class JmxConnectorWrap {
             }
 
             jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
-            LOGGER.info("JMX connect success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port);
+            LOGGER.info("connect JMX success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port);
             return true;
         } catch (MalformedURLException e) {
-            LOGGER.error("JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}", physicalClusterId, brokerId, host, port, jmxUrl, e);
+            LOGGER.error("connect JMX failed, JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}.", physicalClusterId, brokerId, host, port, jmxUrl, e);
         } catch (Exception e) {
-            LOGGER.error("JMX connect exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
+            LOGGER.error("connect JMX failed, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
         }
+
+        // The connect attempt failed: record a backoff so the next attempts are skipped for a while
+        BackoffUtils.putNeedBackoffEvent(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId), this.jmxConfig.getRetryConnectBackoffTimeUnitMs());
+
         return false;
     }
 
@@ -116,6 +170,11 @@ public class JmxConnectorWrap {
             acquire();
             MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
             return mBeanServerConnection.getAttribute(name, attribute);
+        } catch (IOException ioe) {
+            // I/O error: reset the connection
+            this.closeJmxConnect();
+
+            throw ioe;
         } finally {
             atomicInteger.incrementAndGet();
         }
@@ -131,6 +190,11 @@ public class JmxConnectorWrap {
             acquire();
             MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
             return mBeanServerConnection.getAttributes(name, attributes);
+        } catch (IOException ioe) {
+            // I/O error: reset the connection
+            this.closeJmxConnect();
+
+            throw ioe;
         } finally {
             atomicInteger.incrementAndGet();
         }
@@ -143,6 +207,11 @@ public class JmxConnectorWrap {
             acquire();
             MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
             return mBeanServerConnection.queryNames(name, query);
+        } catch (IOException ioe) {
+            // I/O error: reset the connection
+            this.closeJmxConnect();
+
+            throw ioe;
         } finally {
             atomicInteger.incrementAndGet();
         }
@@ -165,4 +234,9 @@ public class JmxConnectorWrap {
             }
         }
     }
+
+    /** Builds the BackoffUtils event key used for JMX connect failures of the given cluster and broker. */
+    private static String buildConnectJmxFailedBackoffEventKey(Long physicalClusterId, Integer brokerId) {
+        return "CONNECT_JMX_FAILED_BACK_OFF_EVENT_PHY_" + physicalClusterId + "_BROKER_" + brokerId;
+    }
 }