ZK-增加四字命令信息的获取

This commit is contained in:
zengqiao
2022-10-08 14:52:17 +08:00
parent ff05a951fd
commit 69a7212986
11 changed files with 679 additions and 0 deletions

View File

@@ -56,6 +56,7 @@ public enum ResultStatus {
KAFKA_OPERATE_FAILED(8010, "Kafka操作失败"),
MYSQL_OPERATE_FAILED(8020, "MySQL操作失败"),
ZK_OPERATE_FAILED(8030, "ZK操作失败"),
ZK_FOUR_LETTER_CMD_FORBIDDEN(8031, "ZK四字命令被禁止"),
ES_OPERATE_ERROR(8040, "ES操作失败"),
HTTP_REQ_ERROR(8050, "第三方http请求异常"),

View File

@@ -0,0 +1,9 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import java.io.Serializable;
/**
* 四字命令结果数据的基础类
*/
public class BaseFourLetterWordCmdData implements Serializable {
}

View File

@@ -0,0 +1,38 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import lombok.Data;
/**
* clientPort=2183
* dataDir=/data1/data/zkData2/version-2
* dataLogDir=/data1/data/zkLog2/version-2
* tickTime=2000
* maxClientCnxns=60
* minSessionTimeout=4000
* maxSessionTimeout=40000
* serverId=2
* initLimit=15
* syncLimit=10
* electionAlg=3
* electionPort=4445
* quorumPort=4444
* peerType=0
*/
@Data
public class ConfigCmdData extends BaseFourLetterWordCmdData {
private Long clientPort;
private String dataDir;
private String dataLogDir;
private Long tickTime;
private Long maxClientCnxns;
private Long minSessionTimeout;
private Long maxSessionTimeout;
private Integer serverId;
private String initLimit;
private Long syncLimit;
private Long electionAlg;
private Long electionPort;
private Long quorumPort;
private Long peerType;
}

View File

@@ -0,0 +1,39 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import lombok.Data;
/**
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
* zk_avg_latency 0
* zk_max_latency 399
* zk_min_latency 0
* zk_packets_received 234857
* zk_packets_sent 234860
* zk_num_alive_connections 4
* zk_outstanding_requests 0
* zk_server_state follower
* zk_znode_count 35566
* zk_watch_count 39
* zk_ephemerals_count 10
* zk_approximate_data_size 3356708
* zk_open_file_descriptor_count 35
* zk_max_file_descriptor_count 819200
*/
@Data
public class MonitorCmdData extends BaseFourLetterWordCmdData {
private String zkVersion;
private Long zkAvgLatency;
private Long zkMaxLatency;
private Long zkMinLatency;
private Long zkPacketsReceived;
private Long zkPacketsSent;
private Long zkNumAliveConnections;
private Long zkOutstandingRequests;
private String zkServerState;
private Long zkZnodeCount;
private Long zkWatchCount;
private Long zkEphemeralsCount;
private Long zkApproximateDataSize;
private Long zkOpenFileDescriptorCount;
private Long zkMaxFileDescriptorCount;
}

View File

@@ -0,0 +1,30 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import lombok.Data;
/**
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
* Latency min/avg/max: 0/0/2209
* Received: 278202469
* Sent: 279449055
* Connections: 31
* Outstanding: 0
* Zxid: 0x20033fc12
* Mode: leader
* Node count: 10084
* Proposal sizes last/min/max: 36/32/31260 leader特有
*/
@Data
public class ServerCmdData extends BaseFourLetterWordCmdData {
private String zkVersion;
private Long zkAvgLatency;
private Long zkMaxLatency;
private Long zkMinLatency;
private Long zkPacketsReceived;
private Long zkPacketsSent;
private Long zkNumAliveConnections;
private Long zkOutstandingRequests;
private String zkServerState;
private Long zkZnodeCount;
private Long zkZxid;
}

View File

@@ -0,0 +1,116 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ConfigCmdData;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* clientPort=2183
* dataDir=/data1/data/zkData2/version-2
* dataLogDir=/data1/data/zkLog2/version-2
* tickTime=2000
* maxClientCnxns=60
* minSessionTimeout=4000
* maxSessionTimeout=40000
* serverId=2
* initLimit=15
* syncLimit=10
* electionAlg=3
* electionPort=4445
* quorumPort=4444
* peerType=0
*/
@Data
public class ConfigCmdDataParser implements FourLetterWordDataParser<ConfigCmdData> {
private static final ILog LOGGER = LogFactory.getLog(ConfigCmdDataParser.class);
private Result<ConfigCmdData> dataResult = null;
@Override
public String getCmd() {
return FourLetterWordUtil.ConfigCmd;
}
@Override
public ConfigCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
Map<String, String> dataMap = new HashMap<>();
for (String elem : cmdData.split("\n")) {
if (elem.isEmpty()) {
continue;
}
int idx = elem.indexOf('=');
if (idx >= 0) {
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
}
}
ConfigCmdData configCmdData = new ConfigCmdData();
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "clientPort":
configCmdData.setClientPort(Long.valueOf(elem.getValue()));
break;
case "dataDir":
configCmdData.setDataDir(elem.getValue());
break;
case "dataLogDir":
configCmdData.setDataLogDir(elem.getValue());
break;
case "tickTime":
configCmdData.setTickTime(Long.valueOf(elem.getValue()));
break;
case "maxClientCnxns":
configCmdData.setMaxClientCnxns(Long.valueOf(elem.getValue()));
break;
case "minSessionTimeout":
configCmdData.setMinSessionTimeout(Long.valueOf(elem.getValue()));
break;
case "maxSessionTimeout":
configCmdData.setMaxSessionTimeout(Long.valueOf(elem.getValue()));
break;
case "serverId":
configCmdData.setServerId(Integer.valueOf(elem.getValue()));
break;
case "initLimit":
configCmdData.setInitLimit(elem.getValue());
break;
case "syncLimit":
configCmdData.setSyncLimit(Long.valueOf(elem.getValue()));
break;
case "electionAlg":
configCmdData.setElectionAlg(Long.valueOf(elem.getValue()));
break;
case "electionPort":
configCmdData.setElectionPort(Long.valueOf(elem.getValue()));
break;
case "quorumPort":
configCmdData.setQuorumPort(Long.valueOf(elem.getValue()));
break;
case "peerType":
configCmdData.setPeerType(Long.valueOf(elem.getValue()));
break;
default:
LOGGER.warn(
"class=ConfigCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
elem.getKey(), elem.getValue()
);
}
} catch (Exception e) {
LOGGER.error(
"class=ConfigCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
);
}
});
return configCmdData;
}
}

View File

@@ -0,0 +1,10 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
/**
* 四字命令结果解析类
*/
public interface FourLetterWordDataParser<T> {
String getCmd();
T parseAndInitData(Long clusterPhyId, String host, int port, String cmdData);
}

View File

@@ -0,0 +1,117 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
* zk_avg_latency 0
* zk_max_latency 399
* zk_min_latency 0
* zk_packets_received 234857
* zk_packets_sent 234860
* zk_num_alive_connections 4
* zk_outstanding_requests 0
* zk_server_state follower
* zk_znode_count 35566
* zk_watch_count 39
* zk_ephemerals_count 10
* zk_approximate_data_size 3356708
* zk_open_file_descriptor_count 35
* zk_max_file_descriptor_count 819200
*/
@Data
public class MonitorCmdDataParser implements FourLetterWordDataParser<MonitorCmdData> {
private static final ILog LOGGER = LogFactory.getLog(MonitorCmdDataParser.class);
@Override
public String getCmd() {
return FourLetterWordUtil.MonitorCmd;
}
@Override
public MonitorCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
Map<String, String> dataMap = new HashMap<>();
for (String elem : cmdData.split("\n")) {
if (elem.isEmpty()) {
continue;
}
int idx = elem.indexOf('\t');
if (idx >= 0) {
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
}
}
MonitorCmdData monitorCmdData = new MonitorCmdData();
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "zk_version":
monitorCmdData.setZkVersion(elem.getValue().split("-")[0]);
break;
case "zk_avg_latency":
monitorCmdData.setZkAvgLatency(Long.valueOf(elem.getValue()));
break;
case "zk_max_latency":
monitorCmdData.setZkMaxLatency(Long.valueOf(elem.getValue()));
break;
case "zk_min_latency":
monitorCmdData.setZkMinLatency(Long.valueOf(elem.getValue()));
break;
case "zk_packets_received":
monitorCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
break;
case "zk_packets_sent":
monitorCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
break;
case "zk_num_alive_connections":
monitorCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
break;
case "zk_outstanding_requests":
monitorCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
break;
case "zk_server_state":
monitorCmdData.setZkServerState(elem.getValue());
break;
case "zk_znode_count":
monitorCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
break;
case "zk_watch_count":
monitorCmdData.setZkWatchCount(Long.valueOf(elem.getValue()));
break;
case "zk_ephemerals_count":
monitorCmdData.setZkEphemeralsCount(Long.valueOf(elem.getValue()));
break;
case "zk_approximate_data_size":
monitorCmdData.setZkApproximateDataSize(Long.valueOf(elem.getValue()));
break;
case "zk_open_file_descriptor_count":
monitorCmdData.setZkOpenFileDescriptorCount(Long.valueOf(elem.getValue()));
break;
case "zk_max_file_descriptor_count":
monitorCmdData.setZkMaxFileDescriptorCount(Long.valueOf(elem.getValue()));
break;
default:
LOGGER.warn(
"class=MonitorCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
elem.getKey(), elem.getValue()
);
}
} catch (Exception e) {
LOGGER.error(
"class=MonitorCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
);
}
});
return monitorCmdData;
}
}

View File

@@ -0,0 +1,97 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
* Latency min/avg/max: 0/0/2209
* Received: 278202469
* Sent: 279449055
* Connections: 31
* Outstanding: 0
* Zxid: 0x20033fc12
* Mode: leader
* Node count: 10084
* Proposal sizes last/min/max: 36/32/31260 leader特有
*/
@Data
public class ServerCmdDataParser implements FourLetterWordDataParser<ServerCmdData> {
private static final ILog LOGGER = LogFactory.getLog(ServerCmdDataParser.class);
@Override
public String getCmd() {
return FourLetterWordUtil.ServerCmd;
}
@Override
public ServerCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
Map<String, String> dataMap = new HashMap<>();
for (String elem : cmdData.split("\n")) {
if (elem.isEmpty()) {
continue;
}
int idx = elem.indexOf(':');
if (idx >= 0) {
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
}
}
ServerCmdData serverCmdData = new ServerCmdData();
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "Zookeeper version":
serverCmdData.setZkVersion(elem.getValue().split("-")[0]);
break;
case "Latency min/avg/max":
String[] data = elem.getValue().split("/");
serverCmdData.setZkMinLatency(Long.valueOf(data[0]));
serverCmdData.setZkAvgLatency(Long.valueOf(data[1]));
serverCmdData.setZkMaxLatency(Long.valueOf(data[2]));
break;
case "Received":
serverCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
break;
case "Sent":
serverCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
break;
case "Connections":
serverCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
break;
case "Outstanding":
serverCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
break;
case "Mode":
serverCmdData.setZkServerState(elem.getValue());
break;
case "Node count":
serverCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
break;
case "Zxid":
serverCmdData.setZkZxid(Long.parseUnsignedLong(elem.getValue().trim().substring(2), 16));
break;
default:
LOGGER.warn(
"class=ServerCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
elem.getKey(), elem.getValue()
);
}
} catch (Exception e) {
LOGGER.error(
"class=ServerCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
);
}
});
return serverCmdData;
}
}

View File

@@ -0,0 +1,163 @@
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.FourLetterWordDataParser;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Set;
public class FourLetterWordUtil {
private static final ILog LOGGER = LogFactory.getLog(FourLetterWordUtil.class);
public static final String MonitorCmd = "mntr";
public static final String ConfigCmd = "conf";
public static final String ServerCmd = "srvr";
private static final Set<String> supportedCommands = new HashSet<>();
public static <T> Result<T> executeFourLetterCmd(Long clusterPhyId,
String host,
int port,
boolean secure,
int timeout,
FourLetterWordDataParser<T> dataParser) {
try {
if (!supportedCommands.contains(dataParser.getCmd())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, String.format("ZK %s命令暂未进行支持", dataParser.getCmd()));
}
String cmdData = send4LetterWord(host, port, dataParser.getCmd(), secure, timeout);
if (cmdData.contains("not executed because it is not in the whitelist.")) {
return Result.buildFromRSAndMsg(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN, cmdData);
}
if (ValidateUtils.isBlank(cmdData)) {
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, cmdData);
}
return Result.buildSuc(dataParser.parseAndInitData(clusterPhyId, host, port, cmdData));
} catch (Exception e) {
LOGGER.error(
"class=FourLetterWordUtil||method=executeFourLetterCmd||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
clusterPhyId, host, port, dataParser.getCmd(), secure, timeout, e
);
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
}
}
/**************************************************** private method ****************************************************/
private static String send4LetterWord(
String host,
int port,
String cmd,
boolean secure,
int timeout) throws IOException, X509Exception.SSLContextException {
long startTime = System.currentTimeMillis();
LOGGER.info("connecting to {} {}", host, port);
Socket socket = null;
OutputStream outputStream = null;
BufferedReader bufferedReader = null;
try {
InetSocketAddress hostaddress = host != null
? new InetSocketAddress(host, port)
: new InetSocketAddress(InetAddress.getByName(null), port);
if (secure) {
LOGGER.info("using secure socket");
try (X509Util x509Util = new ClientX509Util()) {
SSLContext sslContext = x509Util.getDefaultSSLContext();
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
SSLSocket sslSock = (SSLSocket) socketFactory.createSocket();
sslSock.connect(hostaddress, timeout);
sslSock.startHandshake();
socket = sslSock;
}
} else {
socket = new Socket();
socket.connect(hostaddress, timeout);
}
socket.setSoTimeout(timeout);
outputStream = socket.getOutputStream();
outputStream.write(cmd.getBytes());
outputStream.flush();
// 等待InputStream有数据
while (System.currentTimeMillis() - startTime <= timeout && socket.getInputStream().available() <= 0) {
BackoffUtils.backoff(10);
}
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
StringBuilder sb = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
sb.append(line).append("\n");
}
return sb.toString();
} catch (SocketTimeoutException e) {
throw new IOException("Exception while executing four letter word: " + cmd, e);
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
LOGGER.error(
"class=FourLetterWordUtil||method=send4LetterWord||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
host, port, cmd, secure, timeout, e
);
}
}
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (IOException e) {
LOGGER.error(
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
host, port, cmd, secure, timeout, e
);
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
LOGGER.error(
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
host, port, cmd, secure, timeout, e
);
}
}
}
}
static {
supportedCommands.add(MonitorCmd);
supportedCommands.add(ConfigCmd);
supportedCommands.add(ServerCmd);
}
}

View File

@@ -0,0 +1,59 @@
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.common.NetUtils;
import java.util.ArrayList;
import java.util.List;
import static org.apache.zookeeper.common.StringUtils.split;
public class ZookeeperUtils {
private static final int DEFAULT_PORT = 2181;
/**
* 解析ZK地址
* @see ConnectStringParser
*/
public static List<Tuple<String, Integer>> connectStringParser(String connectString) throws Exception {
List<Tuple<String, Integer>> ipPortList = new ArrayList<>();
if (connectString == null) {
return ipPortList;
}
// parse out chroot, if any
int off = connectString.indexOf('/');
if (off >= 0) {
connectString = connectString.substring(0, off);
}
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
if (hostAndPort.length != 0) {
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} else {
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
// otherwise : is at the end of the string, ignore
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
}
ipPortList.add(new Tuple<>(host, port));
}
return ipPortList;
}
}