diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java index 842e1106..252146c9 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java @@ -56,6 +56,7 @@ public enum ResultStatus { KAFKA_OPERATE_FAILED(8010, "Kafka操作失败"), MYSQL_OPERATE_FAILED(8020, "MySQL操作失败"), ZK_OPERATE_FAILED(8030, "ZK操作失败"), + ZK_FOUR_LETTER_CMD_FORBIDDEN(8031, "ZK四字命令被禁止"), ES_OPERATE_ERROR(8040, "ES操作失败"), HTTP_REQ_ERROR(8050, "第三方http请求异常"), diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/BaseFourLetterWordCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/BaseFourLetterWordCmdData.java new file mode 100644 index 00000000..3e5713a8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/BaseFourLetterWordCmdData.java @@ -0,0 +1,9 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import java.io.Serializable; + +/** + * 四字命令结果数据的基础类 + */ +public class BaseFourLetterWordCmdData implements Serializable { +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ConfigCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ConfigCmdData.java new file mode 100644 index 00000000..d0982f47 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ConfigCmdData.java @@ -0,0 +1,38 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import lombok.Data; + + +/** + * clientPort=2183 + * dataDir=/data1/data/zkData2/version-2 + * dataLogDir=/data1/data/zkLog2/version-2 + * tickTime=2000 + * maxClientCnxns=60 + * minSessionTimeout=4000 + * maxSessionTimeout=40000 + * serverId=2 + * initLimit=15 + * syncLimit=10 + * electionAlg=3 + * electionPort=4445 + * quorumPort=4444 + * peerType=0 + */ +@Data +public class ConfigCmdData extends BaseFourLetterWordCmdData { + private Long clientPort; + private String dataDir; + private String dataLogDir; + private Long tickTime; + private Long maxClientCnxns; + private Long minSessionTimeout; + private Long maxSessionTimeout; + private Integer serverId; + private String initLimit; + private Long syncLimit; + private Long electionAlg; + private Long electionPort; + private Long quorumPort; + private Long peerType; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/MonitorCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/MonitorCmdData.java new file mode 100644 index 00000000..7ea1339b --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/MonitorCmdData.java @@ -0,0 +1,39 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import lombok.Data; + +/** + * zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT + * zk_avg_latency 0 + * zk_max_latency 399 + * zk_min_latency 0 + * zk_packets_received 234857 + * zk_packets_sent 234860 + * zk_num_alive_connections 4 + * zk_outstanding_requests 0 + * zk_server_state follower + * zk_znode_count 35566 + * zk_watch_count 39 + * zk_ephemerals_count 10 + * zk_approximate_data_size 3356708 + * zk_open_file_descriptor_count 35 + * zk_max_file_descriptor_count 819200 + */ +@Data +public class MonitorCmdData extends BaseFourLetterWordCmdData { + private String zkVersion; + private Long zkAvgLatency; + private Long zkMaxLatency; + private Long zkMinLatency; + private Long zkPacketsReceived; + private Long zkPacketsSent; + private Long zkNumAliveConnections; + private Long zkOutstandingRequests; + private String zkServerState; + private Long zkZnodeCount; + private Long zkWatchCount; + private Long zkEphemeralsCount; + private Long zkApproximateDataSize; + private Long zkOpenFileDescriptorCount; + private Long zkMaxFileDescriptorCount; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ServerCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ServerCmdData.java new file mode 100644 index 00000000..38bd2cf9 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ServerCmdData.java @@ -0,0 +1,30 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import lombok.Data; + +/** + * Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT + * Latency min/avg/max: 0/0/2209 + * Received: 278202469 + * Sent: 279449055 + * Connections: 31 + * Outstanding: 0 + * Zxid: 0x20033fc12 + * Mode: leader + * Node count: 10084 + * Proposal sizes last/min/max: 36/32/31260 leader特有 + */ +@Data +public class ServerCmdData extends BaseFourLetterWordCmdData { + private String zkVersion; + private Long zkAvgLatency; + private Long zkMaxLatency; + private Long zkMinLatency; + private Long zkPacketsReceived; + private Long zkPacketsSent; + private Long zkNumAliveConnections; + private Long zkOutstandingRequests; + private String zkServerState; + private Long zkZnodeCount; + private Long zkZxid; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ConfigCmdDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ConfigCmdDataParser.java new file mode 100644 index 00000000..35ec153b --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ConfigCmdDataParser.java @@ -0,0 +1,116 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ConfigCmdData; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +/** + * clientPort=2183 + * dataDir=/data1/data/zkData2/version-2 + * dataLogDir=/data1/data/zkLog2/version-2 + * tickTime=2000 + * maxClientCnxns=60 + * minSessionTimeout=4000 + * maxSessionTimeout=40000 + * serverId=2 + * initLimit=15 + * syncLimit=10 + * electionAlg=3 + * electionPort=4445 + * quorumPort=4444 + * peerType=0 + */ +@Data +public class ConfigCmdDataParser implements FourLetterWordDataParser { + private static final ILog LOGGER = LogFactory.getLog(ConfigCmdDataParser.class); + + private Result dataResult = null; + + @Override + public String getCmd() { + return FourLetterWordUtil.ConfigCmd; + } + + @Override + public ConfigCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) { + Map dataMap = new HashMap<>(); + for (String elem : cmdData.split("\n")) { + if (elem.isEmpty()) { + continue; + } + + int idx = elem.indexOf('='); + if (idx >= 0) { + dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim()); + } + } + + ConfigCmdData configCmdData = new ConfigCmdData(); + dataMap.entrySet().stream().forEach(elem -> { + try { + switch (elem.getKey()) { + case "clientPort": + configCmdData.setClientPort(Long.valueOf(elem.getValue())); + break; + case "dataDir": + configCmdData.setDataDir(elem.getValue()); + break; + case "dataLogDir": + configCmdData.setDataLogDir(elem.getValue()); + break; + case "tickTime": + configCmdData.setTickTime(Long.valueOf(elem.getValue())); + break; + case "maxClientCnxns": + configCmdData.setMaxClientCnxns(Long.valueOf(elem.getValue())); + break; + case "minSessionTimeout": + configCmdData.setMinSessionTimeout(Long.valueOf(elem.getValue())); + break; + case "maxSessionTimeout": + configCmdData.setMaxSessionTimeout(Long.valueOf(elem.getValue())); + break; + case "serverId": + configCmdData.setServerId(Integer.valueOf(elem.getValue())); + break; + case "initLimit": + configCmdData.setInitLimit(elem.getValue()); + break; + case "syncLimit": + configCmdData.setSyncLimit(Long.valueOf(elem.getValue())); + break; + case "electionAlg": + configCmdData.setElectionAlg(Long.valueOf(elem.getValue())); + break; + case "electionPort": + configCmdData.setElectionPort(Long.valueOf(elem.getValue())); + break; + case "quorumPort": + configCmdData.setQuorumPort(Long.valueOf(elem.getValue())); + break; + case "peerType": + configCmdData.setPeerType(Long.valueOf(elem.getValue())); + break; + default: + LOGGER.warn( + "class=ConfigCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!", + elem.getKey(), elem.getValue() + ); + } + } catch (Exception e) { + LOGGER.error( + "class=ConfigCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!", + clusterPhyId, host, port, elem.getKey(), elem.getValue(), e + ); + } + }); + + return configCmdData; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/FourLetterWordDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/FourLetterWordDataParser.java new file mode 100644 index 00000000..58bb2368 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/FourLetterWordDataParser.java @@ -0,0 +1,10 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +/** + * 四字命令结果解析类 + */ +public interface FourLetterWordDataParser { + String getCmd(); + + T parseAndInitData(Long clusterPhyId, String host, int port, String cmdData); +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/MonitorCmdDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/MonitorCmdDataParser.java new file mode 100644 index 00000000..a33f4da3 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/MonitorCmdDataParser.java @@ -0,0 +1,117 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +/** + * zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT + * zk_avg_latency 0 + * zk_max_latency 399 + * zk_min_latency 0 + * zk_packets_received 234857 + * zk_packets_sent 234860 + * zk_num_alive_connections 4 + * zk_outstanding_requests 0 + * zk_server_state follower + * zk_znode_count 35566 + * zk_watch_count 39 + * zk_ephemerals_count 10 + * zk_approximate_data_size 3356708 + * zk_open_file_descriptor_count 35 + * zk_max_file_descriptor_count 819200 + */ +@Data +public class MonitorCmdDataParser implements FourLetterWordDataParser { + private static final ILog LOGGER = LogFactory.getLog(MonitorCmdDataParser.class); + + @Override + public String getCmd() { + return FourLetterWordUtil.MonitorCmd; + } + + @Override + public MonitorCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) { + Map dataMap = new HashMap<>(); + for (String elem : cmdData.split("\n")) { + if (elem.isEmpty()) { + continue; + } + + int idx = elem.indexOf('\t'); + if (idx >= 0) { + dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim()); + } + } + + MonitorCmdData monitorCmdData = new MonitorCmdData(); + dataMap.entrySet().stream().forEach(elem -> { + try { + switch (elem.getKey()) { + case "zk_version": + monitorCmdData.setZkVersion(elem.getValue().split("-")[0]); + break; + case "zk_avg_latency": + monitorCmdData.setZkAvgLatency(Long.valueOf(elem.getValue())); + break; + case "zk_max_latency": + monitorCmdData.setZkMaxLatency(Long.valueOf(elem.getValue())); + break; + case "zk_min_latency": + monitorCmdData.setZkMinLatency(Long.valueOf(elem.getValue())); + break; + case "zk_packets_received": + monitorCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue())); + break; + case "zk_packets_sent": + monitorCmdData.setZkPacketsSent(Long.valueOf(elem.getValue())); + break; + case "zk_num_alive_connections": + monitorCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue())); + break; + case "zk_outstanding_requests": + monitorCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue())); + break; + case "zk_server_state": + monitorCmdData.setZkServerState(elem.getValue()); + break; + case "zk_znode_count": + monitorCmdData.setZkZnodeCount(Long.valueOf(elem.getValue())); + break; + case "zk_watch_count": + monitorCmdData.setZkWatchCount(Long.valueOf(elem.getValue())); + break; + case "zk_ephemerals_count": + monitorCmdData.setZkEphemeralsCount(Long.valueOf(elem.getValue())); + break; + case "zk_approximate_data_size": + monitorCmdData.setZkApproximateDataSize(Long.valueOf(elem.getValue())); + break; + case "zk_open_file_descriptor_count": + monitorCmdData.setZkOpenFileDescriptorCount(Long.valueOf(elem.getValue())); + break; + case "zk_max_file_descriptor_count": + monitorCmdData.setZkMaxFileDescriptorCount(Long.valueOf(elem.getValue())); + break; + default: + LOGGER.warn( + "class=MonitorCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!", + elem.getKey(), elem.getValue() + ); + } + } catch (Exception e) { + LOGGER.error( + "class=MonitorCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!", + clusterPhyId, host, port, elem.getKey(), elem.getValue(), e + ); + } + }); + + return monitorCmdData; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ServerCmdDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ServerCmdDataParser.java new file mode 100644 index 00000000..f91f19a8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ServerCmdDataParser.java @@ -0,0 +1,97 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +/** + * Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT + * Latency min/avg/max: 0/0/2209 + * Received: 278202469 + * Sent: 279449055 + * Connections: 31 + * Outstanding: 0 + * Zxid: 0x20033fc12 + * Mode: leader + * Node count: 10084 + * Proposal sizes last/min/max: 36/32/31260 leader特有 + */ +@Data +public class ServerCmdDataParser implements FourLetterWordDataParser { + private static final ILog LOGGER = LogFactory.getLog(ServerCmdDataParser.class); + + @Override + public String getCmd() { + return FourLetterWordUtil.ServerCmd; + } + + @Override + public ServerCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) { + Map dataMap = new HashMap<>(); + for (String elem : cmdData.split("\n")) { + if (elem.isEmpty()) { + continue; + } + + int idx = elem.indexOf(':'); + if (idx >= 0) { + dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim()); + } + } + + ServerCmdData serverCmdData = new ServerCmdData(); + dataMap.entrySet().stream().forEach(elem -> { + try { + switch (elem.getKey()) { + case "Zookeeper version": + serverCmdData.setZkVersion(elem.getValue().split("-")[0]); + break; + case "Latency min/avg/max": + String[] data = elem.getValue().split("/"); + serverCmdData.setZkMinLatency(Long.valueOf(data[0])); + serverCmdData.setZkAvgLatency(Long.valueOf(data[1])); + serverCmdData.setZkMaxLatency(Long.valueOf(data[2])); + break; + case "Received": + serverCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue())); + break; + case "Sent": + serverCmdData.setZkPacketsSent(Long.valueOf(elem.getValue())); + break; + case "Connections": + serverCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue())); + break; + case "Outstanding": + serverCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue())); + break; + case "Mode": + serverCmdData.setZkServerState(elem.getValue()); + break; + case "Node count": + serverCmdData.setZkZnodeCount(Long.valueOf(elem.getValue())); + break; + case "Zxid": + serverCmdData.setZkZxid(Long.parseUnsignedLong(elem.getValue().trim().substring(2), 16)); + break; + default: + LOGGER.warn( + "class=ServerCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!", + elem.getKey(), elem.getValue() + ); + } + } catch (Exception e) { + LOGGER.error( + "class=ServerCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!", + clusterPhyId, host, port, elem.getKey(), elem.getValue(), e + ); + } + }); + + return serverCmdData; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/FourLetterWordUtil.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/FourLetterWordUtil.java new file mode 100644 index 00000000..a3ae31af --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/FourLetterWordUtil.java @@ -0,0 +1,163 @@ +package com.xiaojukeji.know.streaming.km.common.utils.zookeeper; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.FourLetterWordDataParser; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import org.apache.zookeeper.common.ClientX509Util; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.common.X509Util; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.HashSet; +import java.util.Set; + +public class FourLetterWordUtil { + private static final ILog LOGGER = LogFactory.getLog(FourLetterWordUtil.class); + + public static final String MonitorCmd = "mntr"; + public static final String ConfigCmd = "conf"; + public static final String ServerCmd = "srvr"; + + private static final Set supportedCommands = new HashSet<>(); + + public static Result executeFourLetterCmd(Long clusterPhyId, + String host, + int port, + boolean secure, + int timeout, + FourLetterWordDataParser dataParser) { + try { + if (!supportedCommands.contains(dataParser.getCmd())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, String.format("ZK %s命令暂未进行支持", dataParser.getCmd())); + } + + String cmdData = send4LetterWord(host, port, dataParser.getCmd(), secure, timeout); + if (cmdData.contains("not executed because it is not in the whitelist.")) { + return Result.buildFromRSAndMsg(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN, cmdData); + } + if (ValidateUtils.isBlank(cmdData)) { + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, cmdData); + } + + return Result.buildSuc(dataParser.parseAndInitData(clusterPhyId, host, port, cmdData)); + } catch (Exception e) { + LOGGER.error( + "class=FourLetterWordUtil||method=executeFourLetterCmd||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + clusterPhyId, host, port, dataParser.getCmd(), secure, timeout, e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } + + + /**************************************************** private method ****************************************************/ + + private static String send4LetterWord( + String host, + int port, + String cmd, + boolean secure, + int timeout) throws IOException, X509Exception.SSLContextException { + long startTime = System.currentTimeMillis(); + + LOGGER.info("connecting to {} {}", host, port); + + Socket socket = null; + OutputStream outputStream = null; + BufferedReader bufferedReader = null; + try { + InetSocketAddress hostaddress = host != null + ? new InetSocketAddress(host, port) + : new InetSocketAddress(InetAddress.getByName(null), port); + if (secure) { + LOGGER.info("using secure socket"); + try (X509Util x509Util = new ClientX509Util()) { + SSLContext sslContext = x509Util.getDefaultSSLContext(); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + SSLSocket sslSock = (SSLSocket) socketFactory.createSocket(); + sslSock.connect(hostaddress, timeout); + sslSock.startHandshake(); + socket = sslSock; + } + } else { + socket = new Socket(); + socket.connect(hostaddress, timeout); + } + socket.setSoTimeout(timeout); + + outputStream = socket.getOutputStream(); + outputStream.write(cmd.getBytes()); + outputStream.flush(); + + // 等待InputStream有数据 + while (System.currentTimeMillis() - startTime <= timeout && socket.getInputStream().available() <= 0) { + BackoffUtils.backoff(10); + } + + bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + sb.append(line).append("\n"); + } + return sb.toString(); + } catch (SocketTimeoutException e) { + throw new IOException("Exception while executing four letter word: " + cmd, e); + } finally { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + LOGGER.error( + "class=FourLetterWordUtil||method=send4LetterWord||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + host, port, cmd, secure, timeout, e + ); + } + } + + if (bufferedReader != null) { + try { + bufferedReader.close(); + } catch (IOException e) { + LOGGER.error( + "class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + host, port, cmd, secure, timeout, e + ); + } + } + + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + LOGGER.error( + "class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + host, port, cmd, secure, timeout, e + ); + } + } + } + } + + static { + supportedCommands.add(MonitorCmd); + supportedCommands.add(ConfigCmd); + supportedCommands.add(ServerCmd); + } + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/ZookeeperUtils.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/ZookeeperUtils.java new file mode 100644 index 00000000..9d8c6c5b --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/ZookeeperUtils.java @@ -0,0 +1,59 @@ +package com.xiaojukeji.know.streaming.km.common.utils.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import org.apache.zookeeper.client.ConnectStringParser; +import org.apache.zookeeper.common.NetUtils; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.zookeeper.common.StringUtils.split; + +public class ZookeeperUtils { + private static final int DEFAULT_PORT = 2181; + + /** + * 解析ZK地址 + * @see ConnectStringParser + */ + public static List> connectStringParser(String connectString) throws Exception { + List> ipPortList = new ArrayList<>(); + + if (connectString == null) { + return ipPortList; + } + + // parse out chroot, if any + int off = connectString.indexOf('/'); + if (off >= 0) { + connectString = connectString.substring(0, off); + } + + List hostsList = split(connectString, ","); + for (String host : hostsList) { + int port = DEFAULT_PORT; + String[] hostAndPort = NetUtils.getIPV6HostAndPort(host); + if (hostAndPort.length != 0) { + host = hostAndPort[0]; + if (hostAndPort.length == 2) { + port = Integer.parseInt(hostAndPort[1]); + } + } else { + int pidx = host.lastIndexOf(':'); + if (pidx >= 0) { + // otherwise : is at the end of the string, ignore + if (pidx < host.length() - 1) { + port = Integer.parseInt(host.substring(pidx + 1)); + } + host = host.substring(0, pidx); + } + } + + ipPortList.add(new Tuple<>(host, port)); + } + + return ipPortList; + } + + +}