mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-25 04:32:12 +08:00
增加简单回退工具类,增加Jmx连接失败回退功能机制,优化Jmx连接失败日志
This commit is contained in:
@@ -1,9 +1,18 @@
|
||||
package com.xiaojukeji.kafka.manager.common.utils;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class BackoffUtils {
|
||||
private BackoffUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* 需要进行延迟的事件
|
||||
* <事件名,延迟结束事件>
|
||||
*/
|
||||
private static final Map<String, Long> NEED_BACK_OFF_EVENT_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
public static void backoff(long timeUnitMs) {
|
||||
if (timeUnitMs <= 0) {
|
||||
return;
|
||||
@@ -17,4 +26,50 @@ public class BackoffUtils {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录延迟设置
|
||||
* @param backoffEventKey 回退事件key
|
||||
* @param backoffTimeUnitMs 回退时间(ms)
|
||||
*/
|
||||
public static void putNeedBackoffEvent(String backoffEventKey, Long backoffTimeUnitMs) {
|
||||
if (backoffEventKey == null || backoffTimeUnitMs == null || backoffTimeUnitMs <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
NEED_BACK_OFF_EVENT_MAP.put(backoffEventKey, backoffTimeUnitMs + System.currentTimeMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除回退设置
|
||||
* @param backoffEventKey 回退事件key
|
||||
*/
|
||||
public static void removeNeedBackoffEvent(String backoffEventKey) {
|
||||
NEED_BACK_OFF_EVENT_MAP.remove(backoffEventKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否需要回退
|
||||
* @param backoffEventKey 回退事件key
|
||||
* @return
|
||||
*/
|
||||
public static boolean isNeedBackoff(String backoffEventKey) {
|
||||
Long backoffEventEndTimeUnitMs = NEED_BACK_OFF_EVENT_MAP.get(backoffEventKey);
|
||||
if (backoffEventEndTimeUnitMs == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (backoffEventEndTimeUnitMs > System.currentTimeMillis()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 移除
|
||||
try {
|
||||
NEED_BACK_OFF_EVENT_MAP.remove(backoffEventKey, backoffEventEndTimeUnitMs);
|
||||
} catch (Exception e) {
|
||||
// 如果key不存在,这里可能出现NPE,不过不管什么异常都可以忽略
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
package com.xiaojukeji.kafka.manager.common.utils.jmx;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.ToString;
|
||||
|
||||
@Data
|
||||
@ToString
|
||||
public class JmxConfig {
|
||||
/**
|
||||
* 单台最大连接数
|
||||
@@ -21,45 +26,8 @@ public class JmxConfig {
|
||||
*/
|
||||
private Boolean openSSL;
|
||||
|
||||
public Integer getMaxConn() {
|
||||
return maxConn;
|
||||
}
|
||||
|
||||
public void setMaxConn(Integer maxConn) {
|
||||
this.maxConn = maxConn;
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
||||
public void setUsername(String username) {
|
||||
this.username = username;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public Boolean isOpenSSL() {
|
||||
return openSSL;
|
||||
}
|
||||
|
||||
public void setOpenSSL(Boolean openSSL) {
|
||||
this.openSSL = openSSL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "JmxConfig{" +
|
||||
"maxConn=" + maxConn +
|
||||
", username='" + username + '\'' +
|
||||
", password='" + password + '\'' +
|
||||
", openSSL=" + openSSL +
|
||||
'}';
|
||||
}
|
||||
/**
|
||||
* 连接重试回退事件
|
||||
*/
|
||||
private Long retryConnectBackoffTimeUnitMs;
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* JMXConnector包装类
|
||||
@@ -41,6 +42,8 @@ public class JmxConnectorWrap {
|
||||
|
||||
private JmxConfig jmxConfig;
|
||||
|
||||
private final ReentrantLock modifyJMXConnectorLock = new ReentrantLock();
|
||||
|
||||
public JmxConnectorWrap(Long physicalClusterId, Integer brokerId, String host, int port, JmxConfig jmxConfig) {
|
||||
this.physicalClusterId = physicalClusterId;
|
||||
this.brokerId = brokerId;
|
||||
@@ -51,7 +54,12 @@ public class JmxConnectorWrap {
|
||||
this.jmxConfig = new JmxConfig();
|
||||
}
|
||||
if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getMaxConn())) {
|
||||
this.jmxConfig.setMaxConn(1);
|
||||
// 默认设置20
|
||||
this.jmxConfig.setMaxConn(20);
|
||||
}
|
||||
if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getRetryConnectBackoffTimeUnitMs())) {
|
||||
// 默认回退10分钟
|
||||
this.jmxConfig.setRetryConnectBackoffTimeUnitMs(10 * 60 * 1000L);
|
||||
}
|
||||
this.atomicInteger = new AtomicInteger(this.jmxConfig.getMaxConn());
|
||||
}
|
||||
@@ -63,17 +71,40 @@ public class JmxConnectorWrap {
|
||||
if (port == -1) {
|
||||
return false;
|
||||
}
|
||||
return createJmxConnector();
|
||||
return safeCreateJmxConnector();
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
public void close() {
|
||||
this.closeJmxConnect();
|
||||
}
|
||||
|
||||
public void closeJmxConnect() {
|
||||
if (jmxConnector == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
modifyJMXConnectorLock.lock();
|
||||
|
||||
// 移除设置的backoff事件
|
||||
BackoffUtils.removeNeedBackoffEvent(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId));
|
||||
|
||||
jmxConnector.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
|
||||
} finally {
|
||||
jmxConnector = null;
|
||||
|
||||
modifyJMXConnectorLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean safeCreateJmxConnector() {
|
||||
try {
|
||||
modifyJMXConnectorLock.lock();
|
||||
return createJmxConnector();
|
||||
} finally {
|
||||
modifyJMXConnectorLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,6 +112,12 @@ public class JmxConnectorWrap {
|
||||
if (jmxConnector != null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (BackoffUtils.isNeedBackoff(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId))) {
|
||||
// 被设置了需要进行回退,则本次不进行创建
|
||||
return false;
|
||||
}
|
||||
|
||||
String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port);
|
||||
try {
|
||||
Map<String, Object> environment = new HashMap<String, Object>();
|
||||
@@ -88,7 +125,9 @@ public class JmxConnectorWrap {
|
||||
// fixed by riyuetianmu
|
||||
environment.put(JMXConnector.CREDENTIALS, new String[]{this.jmxConfig.getUsername(), this.jmxConfig.getPassword()});
|
||||
}
|
||||
if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) {
|
||||
|
||||
if (jmxConfig.getOpenSSL() != null && this.jmxConfig.getOpenSSL()) {
|
||||
// 开启ssl
|
||||
environment.put(Context.SECURITY_PROTOCOL, "ssl");
|
||||
SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory();
|
||||
environment.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory);
|
||||
@@ -96,13 +135,17 @@ public class JmxConnectorWrap {
|
||||
}
|
||||
|
||||
jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
|
||||
LOGGER.info("JMX connect success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port);
|
||||
LOGGER.info("connect JMX success, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port);
|
||||
return true;
|
||||
} catch (MalformedURLException e) {
|
||||
LOGGER.error("JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}", physicalClusterId, brokerId, host, port, jmxUrl, e);
|
||||
LOGGER.error("connect JMX failed, JMX url exception, physicalClusterId:{} brokerId:{} host:{} port:{} jmxUrl:{}.", physicalClusterId, brokerId, host, port, jmxUrl, e);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("JMX connect exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
|
||||
LOGGER.error("connect JMX failed, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
|
||||
}
|
||||
|
||||
// 设置连接backoff
|
||||
BackoffUtils.putNeedBackoffEvent(buildConnectJmxFailedBackoffEventKey(physicalClusterId, brokerId), this.jmxConfig.getRetryConnectBackoffTimeUnitMs());
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -116,6 +159,11 @@ public class JmxConnectorWrap {
|
||||
acquire();
|
||||
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
|
||||
return mBeanServerConnection.getAttribute(name, attribute);
|
||||
} catch (IOException ioe) {
|
||||
// io错误,则重置连接
|
||||
this.closeJmxConnect();
|
||||
|
||||
throw ioe;
|
||||
} finally {
|
||||
atomicInteger.incrementAndGet();
|
||||
}
|
||||
@@ -131,6 +179,11 @@ public class JmxConnectorWrap {
|
||||
acquire();
|
||||
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
|
||||
return mBeanServerConnection.getAttributes(name, attributes);
|
||||
} catch (IOException ioe) {
|
||||
// io错误,则重置连接
|
||||
this.closeJmxConnect();
|
||||
|
||||
throw ioe;
|
||||
} finally {
|
||||
atomicInteger.incrementAndGet();
|
||||
}
|
||||
@@ -143,6 +196,11 @@ public class JmxConnectorWrap {
|
||||
acquire();
|
||||
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
|
||||
return mBeanServerConnection.queryNames(name, query);
|
||||
} catch (IOException ioe) {
|
||||
// io错误,则重置连接
|
||||
this.closeJmxConnect();
|
||||
|
||||
throw ioe;
|
||||
} finally {
|
||||
atomicInteger.incrementAndGet();
|
||||
}
|
||||
@@ -165,4 +223,8 @@ public class JmxConnectorWrap {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static String buildConnectJmxFailedBackoffEventKey(Long physicalClusterId, Integer brokerId) {
|
||||
return "CONNECT_JMX_FAILED_BACK_OFF_EVENT_PHY_" + physicalClusterId + "_BROKER_" + brokerId;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user