diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java index 0fb65589..ca7c01c4 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java @@ -90,6 +90,8 @@ public class JmxConnectorWrap { } try { jmxConnector.close(); + + jmxConnector = null; } catch (IOException e) { LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); } @@ -105,6 +107,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.getAttribute(name, attribute); + } catch (IOException ioe) { + // 如果是因为连接断开,则进行重新连接,并抛出异常 + reInitDueIOException(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -120,6 +127,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.getAttributes(name, attributes); + } catch (IOException ioe) { + // 如果是因为连接断开,则进行重新连接,并抛出异常 + reInitDueIOException(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -131,6 +143,11 @@ public class JmxConnectorWrap { acquire(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); return mBeanServerConnection.queryNames(name, query); + } catch (IOException ioe) { + // 如果是因为连接断开,则进行重新连接,并抛出异常 + reInitDueIOException(); + + throw ioe; } finally { atomicInteger.incrementAndGet(); } @@ -186,4 +203,26 @@ public class JmxConnectorWrap { } } } + + private synchronized void reInitDueIOException() { + try { + if (jmxConnector == null) { + return; + } + + // 检查是否正常 + jmxConnector.getConnectionId(); + + // 如果正常则直接返回 + return; + } catch (Exception e) { + // ignore + } + + // 关闭旧的 + this.close(); + + // 重新创建 + this.checkJmxConnectionAndInitIfNeed(); + } }