mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
ZK-增加四字命令信息的获取
This commit is contained in:
@@ -56,6 +56,7 @@ public enum ResultStatus {
|
|||||||
KAFKA_OPERATE_FAILED(8010, "Kafka操作失败"),
|
KAFKA_OPERATE_FAILED(8010, "Kafka操作失败"),
|
||||||
MYSQL_OPERATE_FAILED(8020, "MySQL操作失败"),
|
MYSQL_OPERATE_FAILED(8020, "MySQL操作失败"),
|
||||||
ZK_OPERATE_FAILED(8030, "ZK操作失败"),
|
ZK_OPERATE_FAILED(8030, "ZK操作失败"),
|
||||||
|
ZK_FOUR_LETTER_CMD_FORBIDDEN(8031, "ZK四字命令被禁止"),
|
||||||
ES_OPERATE_ERROR(8040, "ES操作失败"),
|
ES_OPERATE_ERROR(8040, "ES操作失败"),
|
||||||
HTTP_REQ_ERROR(8050, "第三方http请求异常"),
|
HTTP_REQ_ERROR(8050, "第三方http请求异常"),
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,9 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 四字命令结果数据的基础类
|
||||||
|
*/
|
||||||
|
public class BaseFourLetterWordCmdData implements Serializable {
|
||||||
|
}
|
||||||
@@ -0,0 +1,38 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* clientPort=2183
|
||||||
|
* dataDir=/data1/data/zkData2/version-2
|
||||||
|
* dataLogDir=/data1/data/zkLog2/version-2
|
||||||
|
* tickTime=2000
|
||||||
|
* maxClientCnxns=60
|
||||||
|
* minSessionTimeout=4000
|
||||||
|
* maxSessionTimeout=40000
|
||||||
|
* serverId=2
|
||||||
|
* initLimit=15
|
||||||
|
* syncLimit=10
|
||||||
|
* electionAlg=3
|
||||||
|
* electionPort=4445
|
||||||
|
* quorumPort=4444
|
||||||
|
* peerType=0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class ConfigCmdData extends BaseFourLetterWordCmdData {
|
||||||
|
private Long clientPort;
|
||||||
|
private String dataDir;
|
||||||
|
private String dataLogDir;
|
||||||
|
private Long tickTime;
|
||||||
|
private Long maxClientCnxns;
|
||||||
|
private Long minSessionTimeout;
|
||||||
|
private Long maxSessionTimeout;
|
||||||
|
private Integer serverId;
|
||||||
|
private String initLimit;
|
||||||
|
private Long syncLimit;
|
||||||
|
private Long electionAlg;
|
||||||
|
private Long electionPort;
|
||||||
|
private Long quorumPort;
|
||||||
|
private Long peerType;
|
||||||
|
}
|
||||||
@@ -0,0 +1,39 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
|
||||||
|
* zk_avg_latency 0
|
||||||
|
* zk_max_latency 399
|
||||||
|
* zk_min_latency 0
|
||||||
|
* zk_packets_received 234857
|
||||||
|
* zk_packets_sent 234860
|
||||||
|
* zk_num_alive_connections 4
|
||||||
|
* zk_outstanding_requests 0
|
||||||
|
* zk_server_state follower
|
||||||
|
* zk_znode_count 35566
|
||||||
|
* zk_watch_count 39
|
||||||
|
* zk_ephemerals_count 10
|
||||||
|
* zk_approximate_data_size 3356708
|
||||||
|
* zk_open_file_descriptor_count 35
|
||||||
|
* zk_max_file_descriptor_count 819200
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class MonitorCmdData extends BaseFourLetterWordCmdData {
|
||||||
|
private String zkVersion;
|
||||||
|
private Long zkAvgLatency;
|
||||||
|
private Long zkMaxLatency;
|
||||||
|
private Long zkMinLatency;
|
||||||
|
private Long zkPacketsReceived;
|
||||||
|
private Long zkPacketsSent;
|
||||||
|
private Long zkNumAliveConnections;
|
||||||
|
private Long zkOutstandingRequests;
|
||||||
|
private String zkServerState;
|
||||||
|
private Long zkZnodeCount;
|
||||||
|
private Long zkWatchCount;
|
||||||
|
private Long zkEphemeralsCount;
|
||||||
|
private Long zkApproximateDataSize;
|
||||||
|
private Long zkOpenFileDescriptorCount;
|
||||||
|
private Long zkMaxFileDescriptorCount;
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
|
||||||
|
* Latency min/avg/max: 0/0/2209
|
||||||
|
* Received: 278202469
|
||||||
|
* Sent: 279449055
|
||||||
|
* Connections: 31
|
||||||
|
* Outstanding: 0
|
||||||
|
* Zxid: 0x20033fc12
|
||||||
|
* Mode: leader
|
||||||
|
* Node count: 10084
|
||||||
|
* Proposal sizes last/min/max: 36/32/31260 leader特有
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class ServerCmdData extends BaseFourLetterWordCmdData {
|
||||||
|
private String zkVersion;
|
||||||
|
private Long zkAvgLatency;
|
||||||
|
private Long zkMaxLatency;
|
||||||
|
private Long zkMinLatency;
|
||||||
|
private Long zkPacketsReceived;
|
||||||
|
private Long zkPacketsSent;
|
||||||
|
private Long zkNumAliveConnections;
|
||||||
|
private Long zkOutstandingRequests;
|
||||||
|
private String zkServerState;
|
||||||
|
private Long zkZnodeCount;
|
||||||
|
private Long zkZxid;
|
||||||
|
}
|
||||||
@@ -0,0 +1,116 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ConfigCmdData;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* clientPort=2183
|
||||||
|
* dataDir=/data1/data/zkData2/version-2
|
||||||
|
* dataLogDir=/data1/data/zkLog2/version-2
|
||||||
|
* tickTime=2000
|
||||||
|
* maxClientCnxns=60
|
||||||
|
* minSessionTimeout=4000
|
||||||
|
* maxSessionTimeout=40000
|
||||||
|
* serverId=2
|
||||||
|
* initLimit=15
|
||||||
|
* syncLimit=10
|
||||||
|
* electionAlg=3
|
||||||
|
* electionPort=4445
|
||||||
|
* quorumPort=4444
|
||||||
|
* peerType=0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class ConfigCmdDataParser implements FourLetterWordDataParser<ConfigCmdData> {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(ConfigCmdDataParser.class);
|
||||||
|
|
||||||
|
private Result<ConfigCmdData> dataResult = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCmd() {
|
||||||
|
return FourLetterWordUtil.ConfigCmd;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConfigCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
|
||||||
|
Map<String, String> dataMap = new HashMap<>();
|
||||||
|
for (String elem : cmdData.split("\n")) {
|
||||||
|
if (elem.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int idx = elem.indexOf('=');
|
||||||
|
if (idx >= 0) {
|
||||||
|
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ConfigCmdData configCmdData = new ConfigCmdData();
|
||||||
|
dataMap.entrySet().stream().forEach(elem -> {
|
||||||
|
try {
|
||||||
|
switch (elem.getKey()) {
|
||||||
|
case "clientPort":
|
||||||
|
configCmdData.setClientPort(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "dataDir":
|
||||||
|
configCmdData.setDataDir(elem.getValue());
|
||||||
|
break;
|
||||||
|
case "dataLogDir":
|
||||||
|
configCmdData.setDataLogDir(elem.getValue());
|
||||||
|
break;
|
||||||
|
case "tickTime":
|
||||||
|
configCmdData.setTickTime(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "maxClientCnxns":
|
||||||
|
configCmdData.setMaxClientCnxns(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "minSessionTimeout":
|
||||||
|
configCmdData.setMinSessionTimeout(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "maxSessionTimeout":
|
||||||
|
configCmdData.setMaxSessionTimeout(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "serverId":
|
||||||
|
configCmdData.setServerId(Integer.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "initLimit":
|
||||||
|
configCmdData.setInitLimit(elem.getValue());
|
||||||
|
break;
|
||||||
|
case "syncLimit":
|
||||||
|
configCmdData.setSyncLimit(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "electionAlg":
|
||||||
|
configCmdData.setElectionAlg(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "electionPort":
|
||||||
|
configCmdData.setElectionPort(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "quorumPort":
|
||||||
|
configCmdData.setQuorumPort(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "peerType":
|
||||||
|
configCmdData.setPeerType(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOGGER.warn(
|
||||||
|
"class=ConfigCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
|
||||||
|
elem.getKey(), elem.getValue()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=ConfigCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
|
||||||
|
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return configCmdData;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 四字命令结果解析类
|
||||||
|
*/
|
||||||
|
public interface FourLetterWordDataParser<T> {
|
||||||
|
String getCmd();
|
||||||
|
|
||||||
|
T parseAndInitData(Long clusterPhyId, String host, int port, String cmdData);
|
||||||
|
}
|
||||||
@@ -0,0 +1,117 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
|
||||||
|
* zk_avg_latency 0
|
||||||
|
* zk_max_latency 399
|
||||||
|
* zk_min_latency 0
|
||||||
|
* zk_packets_received 234857
|
||||||
|
* zk_packets_sent 234860
|
||||||
|
* zk_num_alive_connections 4
|
||||||
|
* zk_outstanding_requests 0
|
||||||
|
* zk_server_state follower
|
||||||
|
* zk_znode_count 35566
|
||||||
|
* zk_watch_count 39
|
||||||
|
* zk_ephemerals_count 10
|
||||||
|
* zk_approximate_data_size 3356708
|
||||||
|
* zk_open_file_descriptor_count 35
|
||||||
|
* zk_max_file_descriptor_count 819200
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class MonitorCmdDataParser implements FourLetterWordDataParser<MonitorCmdData> {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(MonitorCmdDataParser.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCmd() {
|
||||||
|
return FourLetterWordUtil.MonitorCmd;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MonitorCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
|
||||||
|
Map<String, String> dataMap = new HashMap<>();
|
||||||
|
for (String elem : cmdData.split("\n")) {
|
||||||
|
if (elem.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int idx = elem.indexOf('\t');
|
||||||
|
if (idx >= 0) {
|
||||||
|
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MonitorCmdData monitorCmdData = new MonitorCmdData();
|
||||||
|
dataMap.entrySet().stream().forEach(elem -> {
|
||||||
|
try {
|
||||||
|
switch (elem.getKey()) {
|
||||||
|
case "zk_version":
|
||||||
|
monitorCmdData.setZkVersion(elem.getValue().split("-")[0]);
|
||||||
|
break;
|
||||||
|
case "zk_avg_latency":
|
||||||
|
monitorCmdData.setZkAvgLatency(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_max_latency":
|
||||||
|
monitorCmdData.setZkMaxLatency(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_min_latency":
|
||||||
|
monitorCmdData.setZkMinLatency(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_packets_received":
|
||||||
|
monitorCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_packets_sent":
|
||||||
|
monitorCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_num_alive_connections":
|
||||||
|
monitorCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_outstanding_requests":
|
||||||
|
monitorCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_server_state":
|
||||||
|
monitorCmdData.setZkServerState(elem.getValue());
|
||||||
|
break;
|
||||||
|
case "zk_znode_count":
|
||||||
|
monitorCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_watch_count":
|
||||||
|
monitorCmdData.setZkWatchCount(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_ephemerals_count":
|
||||||
|
monitorCmdData.setZkEphemeralsCount(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_approximate_data_size":
|
||||||
|
monitorCmdData.setZkApproximateDataSize(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_open_file_descriptor_count":
|
||||||
|
monitorCmdData.setZkOpenFileDescriptorCount(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "zk_max_file_descriptor_count":
|
||||||
|
monitorCmdData.setZkMaxFileDescriptorCount(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOGGER.warn(
|
||||||
|
"class=MonitorCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
|
||||||
|
elem.getKey(), elem.getValue()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=MonitorCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
|
||||||
|
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return monitorCmdData;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,97 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
|
||||||
|
* Latency min/avg/max: 0/0/2209
|
||||||
|
* Received: 278202469
|
||||||
|
* Sent: 279449055
|
||||||
|
* Connections: 31
|
||||||
|
* Outstanding: 0
|
||||||
|
* Zxid: 0x20033fc12
|
||||||
|
* Mode: leader
|
||||||
|
* Node count: 10084
|
||||||
|
* Proposal sizes last/min/max: 36/32/31260 leader特有
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class ServerCmdDataParser implements FourLetterWordDataParser<ServerCmdData> {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(ServerCmdDataParser.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCmd() {
|
||||||
|
return FourLetterWordUtil.ServerCmd;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
|
||||||
|
Map<String, String> dataMap = new HashMap<>();
|
||||||
|
for (String elem : cmdData.split("\n")) {
|
||||||
|
if (elem.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int idx = elem.indexOf(':');
|
||||||
|
if (idx >= 0) {
|
||||||
|
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ServerCmdData serverCmdData = new ServerCmdData();
|
||||||
|
dataMap.entrySet().stream().forEach(elem -> {
|
||||||
|
try {
|
||||||
|
switch (elem.getKey()) {
|
||||||
|
case "Zookeeper version":
|
||||||
|
serverCmdData.setZkVersion(elem.getValue().split("-")[0]);
|
||||||
|
break;
|
||||||
|
case "Latency min/avg/max":
|
||||||
|
String[] data = elem.getValue().split("/");
|
||||||
|
serverCmdData.setZkMinLatency(Long.valueOf(data[0]));
|
||||||
|
serverCmdData.setZkAvgLatency(Long.valueOf(data[1]));
|
||||||
|
serverCmdData.setZkMaxLatency(Long.valueOf(data[2]));
|
||||||
|
break;
|
||||||
|
case "Received":
|
||||||
|
serverCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "Sent":
|
||||||
|
serverCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "Connections":
|
||||||
|
serverCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "Outstanding":
|
||||||
|
serverCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "Mode":
|
||||||
|
serverCmdData.setZkServerState(elem.getValue());
|
||||||
|
break;
|
||||||
|
case "Node count":
|
||||||
|
serverCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
|
||||||
|
break;
|
||||||
|
case "Zxid":
|
||||||
|
serverCmdData.setZkZxid(Long.parseUnsignedLong(elem.getValue().trim().substring(2), 16));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOGGER.warn(
|
||||||
|
"class=ServerCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
|
||||||
|
elem.getKey(), elem.getValue()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=ServerCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
|
||||||
|
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return serverCmdData;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,163 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.FourLetterWordDataParser;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||||
|
import org.apache.zookeeper.common.ClientX509Util;
|
||||||
|
import org.apache.zookeeper.common.X509Exception;
|
||||||
|
import org.apache.zookeeper.common.X509Util;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
import javax.net.ssl.SSLSocket;
|
||||||
|
import javax.net.ssl.SSLSocketFactory;
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class FourLetterWordUtil {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(FourLetterWordUtil.class);
|
||||||
|
|
||||||
|
public static final String MonitorCmd = "mntr";
|
||||||
|
public static final String ConfigCmd = "conf";
|
||||||
|
public static final String ServerCmd = "srvr";
|
||||||
|
|
||||||
|
private static final Set<String> supportedCommands = new HashSet<>();
|
||||||
|
|
||||||
|
public static <T> Result<T> executeFourLetterCmd(Long clusterPhyId,
|
||||||
|
String host,
|
||||||
|
int port,
|
||||||
|
boolean secure,
|
||||||
|
int timeout,
|
||||||
|
FourLetterWordDataParser<T> dataParser) {
|
||||||
|
try {
|
||||||
|
if (!supportedCommands.contains(dataParser.getCmd())) {
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, String.format("ZK %s命令暂未进行支持", dataParser.getCmd()));
|
||||||
|
}
|
||||||
|
|
||||||
|
String cmdData = send4LetterWord(host, port, dataParser.getCmd(), secure, timeout);
|
||||||
|
if (cmdData.contains("not executed because it is not in the whitelist.")) {
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN, cmdData);
|
||||||
|
}
|
||||||
|
if (ValidateUtils.isBlank(cmdData)) {
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, cmdData);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Result.buildSuc(dataParser.parseAndInitData(clusterPhyId, host, port, cmdData));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=FourLetterWordUtil||method=executeFourLetterCmd||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||||
|
clusterPhyId, host, port, dataParser.getCmd(), secure, timeout, e
|
||||||
|
);
|
||||||
|
|
||||||
|
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**************************************************** private method ****************************************************/
|
||||||
|
|
||||||
|
private static String send4LetterWord(
|
||||||
|
String host,
|
||||||
|
int port,
|
||||||
|
String cmd,
|
||||||
|
boolean secure,
|
||||||
|
int timeout) throws IOException, X509Exception.SSLContextException {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
LOGGER.info("connecting to {} {}", host, port);
|
||||||
|
|
||||||
|
Socket socket = null;
|
||||||
|
OutputStream outputStream = null;
|
||||||
|
BufferedReader bufferedReader = null;
|
||||||
|
try {
|
||||||
|
InetSocketAddress hostaddress = host != null
|
||||||
|
? new InetSocketAddress(host, port)
|
||||||
|
: new InetSocketAddress(InetAddress.getByName(null), port);
|
||||||
|
if (secure) {
|
||||||
|
LOGGER.info("using secure socket");
|
||||||
|
try (X509Util x509Util = new ClientX509Util()) {
|
||||||
|
SSLContext sslContext = x509Util.getDefaultSSLContext();
|
||||||
|
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
|
||||||
|
SSLSocket sslSock = (SSLSocket) socketFactory.createSocket();
|
||||||
|
sslSock.connect(hostaddress, timeout);
|
||||||
|
sslSock.startHandshake();
|
||||||
|
socket = sslSock;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
socket = new Socket();
|
||||||
|
socket.connect(hostaddress, timeout);
|
||||||
|
}
|
||||||
|
socket.setSoTimeout(timeout);
|
||||||
|
|
||||||
|
outputStream = socket.getOutputStream();
|
||||||
|
outputStream.write(cmd.getBytes());
|
||||||
|
outputStream.flush();
|
||||||
|
|
||||||
|
// 等待InputStream有数据
|
||||||
|
while (System.currentTimeMillis() - startTime <= timeout && socket.getInputStream().available() <= 0) {
|
||||||
|
BackoffUtils.backoff(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
String line;
|
||||||
|
while ((line = bufferedReader.readLine()) != null) {
|
||||||
|
sb.append(line).append("\n");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
throw new IOException("Exception while executing four letter word: " + cmd, e);
|
||||||
|
} finally {
|
||||||
|
if (outputStream != null) {
|
||||||
|
try {
|
||||||
|
outputStream.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=FourLetterWordUtil||method=send4LetterWord||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||||
|
host, port, cmd, secure, timeout, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bufferedReader != null) {
|
||||||
|
try {
|
||||||
|
bufferedReader.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||||
|
host, port, cmd, secure, timeout, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (socket != null) {
|
||||||
|
try {
|
||||||
|
socket.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||||
|
host, port, cmd, secure, timeout, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
supportedCommands.add(MonitorCmd);
|
||||||
|
supportedCommands.add(ConfigCmd);
|
||||||
|
supportedCommands.add(ServerCmd);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,59 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
|
||||||
|
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||||
|
import org.apache.zookeeper.client.ConnectStringParser;
|
||||||
|
import org.apache.zookeeper.common.NetUtils;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.zookeeper.common.StringUtils.split;
|
||||||
|
|
||||||
|
public class ZookeeperUtils {
|
||||||
|
private static final int DEFAULT_PORT = 2181;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 解析ZK地址
|
||||||
|
* @see ConnectStringParser
|
||||||
|
*/
|
||||||
|
public static List<Tuple<String, Integer>> connectStringParser(String connectString) throws Exception {
|
||||||
|
List<Tuple<String, Integer>> ipPortList = new ArrayList<>();
|
||||||
|
|
||||||
|
if (connectString == null) {
|
||||||
|
return ipPortList;
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse out chroot, if any
|
||||||
|
int off = connectString.indexOf('/');
|
||||||
|
if (off >= 0) {
|
||||||
|
connectString = connectString.substring(0, off);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> hostsList = split(connectString, ",");
|
||||||
|
for (String host : hostsList) {
|
||||||
|
int port = DEFAULT_PORT;
|
||||||
|
String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
|
||||||
|
if (hostAndPort.length != 0) {
|
||||||
|
host = hostAndPort[0];
|
||||||
|
if (hostAndPort.length == 2) {
|
||||||
|
port = Integer.parseInt(hostAndPort[1]);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int pidx = host.lastIndexOf(':');
|
||||||
|
if (pidx >= 0) {
|
||||||
|
// otherwise : is at the end of the string, ignore
|
||||||
|
if (pidx < host.length() - 1) {
|
||||||
|
port = Integer.parseInt(host.substring(pidx + 1));
|
||||||
|
}
|
||||||
|
host = host.substring(0, pidx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ipPortList.add(new Tuple<>(host, port));
|
||||||
|
}
|
||||||
|
|
||||||
|
return ipPortList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user