mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
@@ -7,7 +7,22 @@
|
||||
|
||||
### 6.2.0、升级至 `master` 版本
|
||||
|
||||
暂无
|
||||
```sql
|
||||
DROP TABLE IF EXISTS `ks_km_zookeeper`;
|
||||
CREATE TABLE `ks_km_zookeeper` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||
`cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID',
|
||||
`host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名',
|
||||
`port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口',
|
||||
`role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer',
|
||||
`version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本',
|
||||
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活,11存活但是4字命令使用不了',
|
||||
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表';
|
||||
```
|
||||
|
||||
|
||||
### 6.2.1、升级至 `v3.0.0` 版本
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.cluster;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO;
|
||||
|
||||
/**
|
||||
* 多集群总体状态
|
||||
*/
|
||||
public interface ClusterZookeepersManager {
|
||||
Result<ClusterZookeepersStateVO> getClusterPhyZookeepersState(Long clusterPhyId);
|
||||
|
||||
PaginationResult<ClusterZookeepersOverviewVO> getClusterPhyZookeepersOverview(Long clusterPhyId, ClusterZookeepersOverviewDTO dto);
|
||||
|
||||
Result<ZnodeVO> getZnodeVO(Long clusterPhyId, String path);
|
||||
}
|
||||
@@ -0,0 +1,137 @@
|
||||
package com.xiaojukeji.know.streaming.km.biz.cluster.impl;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Service
|
||||
public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ClusterZookeepersManagerImpl.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperService zookeeperService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperMetricService zookeeperMetricService;
|
||||
|
||||
@Autowired
|
||||
private ZnodeService znodeService;
|
||||
|
||||
@Override
|
||||
public Result<ClusterZookeepersStateVO> getClusterPhyZookeepersState(Long clusterPhyId) {
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
|
||||
if (clusterPhy == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||
}
|
||||
|
||||
// // TODO
|
||||
// private Integer healthState;
|
||||
// private Integer healthCheckPassed;
|
||||
// private Integer healthCheckTotal;
|
||||
|
||||
List<ZookeeperInfo> infoList = zookeeperService.listFromDBByCluster(clusterPhyId);
|
||||
|
||||
ClusterZookeepersStateVO vo = new ClusterZookeepersStateVO();
|
||||
vo.setTotalServerCount(infoList.size());
|
||||
vo.setAliveFollowerCount(0);
|
||||
vo.setTotalFollowerCount(0);
|
||||
vo.setAliveObserverCount(0);
|
||||
vo.setTotalObserverCount(0);
|
||||
vo.setAliveServerCount(0);
|
||||
for (ZookeeperInfo info: infoList) {
|
||||
if (info.getRole().equals(ZKRoleEnum.LEADER.getRole())) {
|
||||
vo.setLeaderNode(info.getHost());
|
||||
}
|
||||
|
||||
if (info.getRole().equals(ZKRoleEnum.FOLLOWER.getRole())) {
|
||||
vo.setTotalFollowerCount(vo.getTotalFollowerCount() + 1);
|
||||
vo.setAliveFollowerCount(info.alive()? vo.getAliveFollowerCount() + 1: vo.getAliveFollowerCount());
|
||||
}
|
||||
|
||||
if (info.getRole().equals(ZKRoleEnum.OBSERVER.getRole())) {
|
||||
vo.setTotalObserverCount(vo.getTotalObserverCount() + 1);
|
||||
vo.setAliveObserverCount(info.alive()? vo.getAliveObserverCount() + 1: vo.getAliveObserverCount());
|
||||
}
|
||||
|
||||
if (info.alive()) {
|
||||
vo.setAliveServerCount(vo.getAliveServerCount() + 1);
|
||||
}
|
||||
}
|
||||
|
||||
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(new ZookeeperMetricParam(
|
||||
clusterPhyId,
|
||||
infoList.stream().filter(elem -> elem.alive()).map(item -> new Tuple<String, Integer>(item.getHost(), item.getPort())).collect(Collectors.toList()),
|
||||
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class),
|
||||
ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT
|
||||
));
|
||||
if (metricsResult.failed()) {
|
||||
LOGGER.error(
|
||||
"class=ClusterZookeepersManagerImpl||method=getClusterPhyZookeepersState||clusterPhyId={}||errMsg={}",
|
||||
clusterPhyId, metricsResult.getMessage()
|
||||
);
|
||||
return Result.buildSuc(vo);
|
||||
}
|
||||
Float watchCount = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT);
|
||||
vo.setWatchCount(watchCount != null? watchCount.intValue(): null);
|
||||
|
||||
return Result.buildSuc(vo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PaginationResult<ClusterZookeepersOverviewVO> getClusterPhyZookeepersOverview(Long clusterPhyId, ClusterZookeepersOverviewDTO dto) {
|
||||
//获取集群zookeeper列表
|
||||
List<ClusterZookeepersOverviewVO> clusterZookeepersOverviewVOList = ConvertUtil.list2List(zookeeperService.listFromDBByCluster(clusterPhyId), ClusterZookeepersOverviewVO.class);
|
||||
|
||||
//搜索
|
||||
clusterZookeepersOverviewVOList = PaginationUtil.pageByFuzzyFilter(clusterZookeepersOverviewVOList, dto.getSearchKeywords(), Arrays.asList("host"));
|
||||
|
||||
//分页
|
||||
PaginationResult<ClusterZookeepersOverviewVO> paginationResult = PaginationUtil.pageBySubData(clusterZookeepersOverviewVOList, dto);
|
||||
|
||||
return paginationResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<ZnodeVO> getZnodeVO(Long clusterPhyId, String path) {
|
||||
Result<Znode> result = znodeService.getZnode(clusterPhyId, path);
|
||||
if (result.failed()) {
|
||||
return Result.buildFromIgnoreData(result);
|
||||
}
|
||||
return Result.buildSuc(ConvertUtil.obj2ObjByJSON(result.getData(), ZnodeVO.class));
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
package com.xiaojukeji.know.streaming.km.collector.metric;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Component
|
||||
public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperMetricPO> {
|
||||
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||
|
||||
@Autowired
|
||||
private VersionControlService versionControlService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperMetricService zookeeperMetricService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperService zookeeperService;
|
||||
|
||||
@Autowired
|
||||
private KafkaControllerService kafkaControllerService;
|
||||
|
||||
@Override
|
||||
public void collectMetrics(ClusterPhy clusterPhy) {
|
||||
Long startTime = System.currentTimeMillis();
|
||||
Long clusterPhyId = clusterPhy.getId();
|
||||
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
|
||||
List<ZookeeperInfo> aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId)
|
||||
.stream()
|
||||
.filter(elem -> Constant.ALIVE.equals(elem.getStatus()))
|
||||
.collect(Collectors.toList());
|
||||
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
|
||||
|
||||
ZookeeperMetrics metrics = ZookeeperMetrics.initWithMetric(clusterPhyId, Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (float)Constant.INVALID_CODE);
|
||||
if (ValidateUtils.isEmptyList(aliveZKList)) {
|
||||
// 没有存活的ZK时,发布事件,然后直接返回
|
||||
publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics)));
|
||||
return;
|
||||
}
|
||||
|
||||
// 构造参数
|
||||
ZookeeperMetricParam param = new ZookeeperMetricParam(
|
||||
clusterPhyId,
|
||||
aliveZKList.stream().map(elem -> new Tuple<String, Integer>(elem.getHost(), elem.getPort())).collect(Collectors.toList()),
|
||||
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class),
|
||||
kafkaController == null? Constant.INVALID_CODE: kafkaController.getBrokerId(),
|
||||
null
|
||||
);
|
||||
|
||||
for(VersionControlItem v : items) {
|
||||
try {
|
||||
if(null != metrics.getMetrics().get(v.getName())) {
|
||||
continue;
|
||||
}
|
||||
param.setMetricName(v.getName());
|
||||
|
||||
Result<ZookeeperMetrics> ret = zookeeperMetricService.collectMetricsFromZookeeper(param);
|
||||
if(null == ret || ret.failed() || null == ret.getData()){
|
||||
continue;
|
||||
}
|
||||
|
||||
metrics.putMetric(ret.getData().getMetrics());
|
||||
|
||||
if(!EnvUtil.isOnline()){
|
||||
LOGGER.info(
|
||||
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||metricValue={}",
|
||||
clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics())
|
||||
);
|
||||
}
|
||||
} catch (Exception e){
|
||||
LOGGER.error(
|
||||
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||errMsg=exception!",
|
||||
clusterPhyId, v.getName(), e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
|
||||
|
||||
publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics)));
|
||||
|
||||
LOGGER.info(
|
||||
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||startTime={}||costTime={}||msg=msg=collect finished.",
|
||||
clusterPhyId, startTime, System.currentTimeMillis() - startTime
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionItemTypeEnum collectorType() {
|
||||
return METRIC_ZOOKEEPER;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.xiaojukeji.know.streaming.km.collector.sink;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX;
|
||||
|
||||
@Component
|
||||
public class ZookeeperMetricESSender extends AbstractMetricESSender implements ApplicationListener<ZookeeperMetricEvent> {
|
||||
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
|
||||
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
LOGGER.info("class=ZookeeperMetricESSender||method=init||msg=init finished");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ZookeeperMetricEvent event) {
|
||||
send2es(ZOOKEEPER_INDEX, ConvertUtil.list2List(event.getZookeeperMetrics(), ZookeeperMetricPO.class));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @author wyc
|
||||
* @date 2022/9/23
|
||||
*/
|
||||
@Data
|
||||
public class ClusterZookeepersOverviewDTO extends PaginationBaseDTO {
|
||||
|
||||
}
|
||||
@@ -1,8 +1,8 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.config;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Properties;
|
||||
@@ -11,7 +11,6 @@ import java.util.Properties;
|
||||
* @author zengqiao
|
||||
* @date 22/02/24
|
||||
*/
|
||||
@Data
|
||||
@ApiModel(description = "ZK配置")
|
||||
public class ZKConfig implements Serializable {
|
||||
@ApiModelProperty(value="ZK的jmx配置")
|
||||
@@ -21,11 +20,51 @@ public class ZKConfig implements Serializable {
|
||||
private Boolean openSecure = false;
|
||||
|
||||
@ApiModelProperty(value="ZK的Session超时时间", example = "15000")
|
||||
private Long sessionTimeoutUnitMs = 15000L;
|
||||
private Integer sessionTimeoutUnitMs = 15000;
|
||||
|
||||
@ApiModelProperty(value="ZK的Request超时时间", example = "5000")
|
||||
private Long requestTimeoutUnitMs = 5000L;
|
||||
private Integer requestTimeoutUnitMs = 5000;
|
||||
|
||||
@ApiModelProperty(value="ZK的Request超时时间")
|
||||
private Properties otherProps = new Properties();
|
||||
|
||||
public JmxConfig getJmxConfig() {
|
||||
return jmxConfig == null? new JmxConfig(): jmxConfig;
|
||||
}
|
||||
|
||||
public void setJmxConfig(JmxConfig jmxConfig) {
|
||||
this.jmxConfig = jmxConfig;
|
||||
}
|
||||
|
||||
public Boolean getOpenSecure() {
|
||||
return openSecure != null && openSecure;
|
||||
}
|
||||
|
||||
public void setOpenSecure(Boolean openSecure) {
|
||||
this.openSecure = openSecure;
|
||||
}
|
||||
|
||||
public Integer getSessionTimeoutUnitMs() {
|
||||
return sessionTimeoutUnitMs == null? Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS: sessionTimeoutUnitMs;
|
||||
}
|
||||
|
||||
public void setSessionTimeoutUnitMs(Integer sessionTimeoutUnitMs) {
|
||||
this.sessionTimeoutUnitMs = sessionTimeoutUnitMs;
|
||||
}
|
||||
|
||||
public Integer getRequestTimeoutUnitMs() {
|
||||
return requestTimeoutUnitMs == null? Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS: requestTimeoutUnitMs;
|
||||
}
|
||||
|
||||
public void setRequestTimeoutUnitMs(Integer requestTimeoutUnitMs) {
|
||||
this.requestTimeoutUnitMs = requestTimeoutUnitMs;
|
||||
}
|
||||
|
||||
public Properties getOtherProps() {
|
||||
return otherProps == null? new Properties() : otherProps;
|
||||
}
|
||||
|
||||
public void setOtherProps(Properties otherProps) {
|
||||
this.otherProps = otherProps;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.ToString;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/6/17
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
public class ZookeeperMetrics extends BaseMetrics {
|
||||
public ZookeeperMetrics(Long clusterPhyId) {
|
||||
super(clusterPhyId);
|
||||
}
|
||||
|
||||
public static ZookeeperMetrics initWithMetric(Long clusterPhyId, String metric, Float value) {
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId);
|
||||
metrics.setClusterPhyId( clusterPhyId );
|
||||
metrics.putMetric(metric, value);
|
||||
return metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String unique() {
|
||||
return "ZK@" + clusterPhyId;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class ZookeeperMetricParam extends MetricParam {
|
||||
private Long clusterPhyId;
|
||||
|
||||
private List<Tuple<String, Integer>> zkAddressList;
|
||||
|
||||
private ZKConfig zkConfig;
|
||||
|
||||
private String metricName;
|
||||
|
||||
private Integer kafkaControllerId;
|
||||
|
||||
public ZookeeperMetricParam(Long clusterPhyId,
|
||||
List<Tuple<String, Integer>> zkAddressList,
|
||||
ZKConfig zkConfig,
|
||||
String metricName) {
|
||||
this.clusterPhyId = clusterPhyId;
|
||||
this.zkAddressList = zkAddressList;
|
||||
this.zkConfig = zkConfig;
|
||||
this.metricName = metricName;
|
||||
}
|
||||
|
||||
public ZookeeperMetricParam(Long clusterPhyId,
|
||||
List<Tuple<String, Integer>> zkAddressList,
|
||||
ZKConfig zkConfig,
|
||||
Integer kafkaControllerId,
|
||||
String metricName) {
|
||||
this.clusterPhyId = clusterPhyId;
|
||||
this.zkAddressList = zkAddressList;
|
||||
this.zkConfig = zkConfig;
|
||||
this.kafkaControllerId = kafkaControllerId;
|
||||
this.metricName = metricName;
|
||||
}
|
||||
}
|
||||
@@ -56,6 +56,7 @@ public enum ResultStatus {
|
||||
KAFKA_OPERATE_FAILED(8010, "Kafka操作失败"),
|
||||
MYSQL_OPERATE_FAILED(8020, "MySQL操作失败"),
|
||||
ZK_OPERATE_FAILED(8030, "ZK操作失败"),
|
||||
ZK_FOUR_LETTER_CMD_FORBIDDEN(8031, "ZK四字命令被禁止"),
|
||||
ES_OPERATE_ERROR(8040, "ES操作失败"),
|
||||
HTTP_REQ_ERROR(8050, "第三方http请求异常"),
|
||||
|
||||
|
||||
@@ -23,6 +23,8 @@ public class VersionMetricControlItem extends VersionControlItem{
|
||||
public static final String CATEGORY_PERFORMANCE = "Performance";
|
||||
public static final String CATEGORY_FLOW = "Flow";
|
||||
|
||||
public static final String CATEGORY_CLIENT = "Client";
|
||||
|
||||
/**
|
||||
* 指标单位名称,非指标的没有
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper;
|
||||
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
@Data
|
||||
public class Znode {
|
||||
@ApiModelProperty(value = "节点名称", example = "broker")
|
||||
private String name;
|
||||
|
||||
@ApiModelProperty(value = "节点数据", example = "saassad")
|
||||
private String data;
|
||||
|
||||
@ApiModelProperty(value = "节点属性", example = "")
|
||||
private Stat stat;
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.BaseEntity;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class ZookeeperInfo extends BaseEntity {
|
||||
/**
|
||||
* 集群Id
|
||||
*/
|
||||
private Long clusterPhyId;
|
||||
|
||||
/**
|
||||
* 主机
|
||||
*/
|
||||
private String host;
|
||||
|
||||
/**
|
||||
* 端口
|
||||
*/
|
||||
private Integer port;
|
||||
|
||||
/**
|
||||
* 角色
|
||||
*/
|
||||
private String role;
|
||||
|
||||
/**
|
||||
* 版本
|
||||
*/
|
||||
private String version;
|
||||
|
||||
/**
|
||||
* ZK状态
|
||||
*/
|
||||
private Integer status;
|
||||
|
||||
public boolean alive() {
|
||||
return !(Constant.DOWN.equals(status));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 四字命令结果数据的基础类
|
||||
*/
|
||||
public class BaseFourLetterWordCmdData implements Serializable {
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
|
||||
/**
|
||||
* clientPort=2183
|
||||
* dataDir=/data1/data/zkData2/version-2
|
||||
* dataLogDir=/data1/data/zkLog2/version-2
|
||||
* tickTime=2000
|
||||
* maxClientCnxns=60
|
||||
* minSessionTimeout=4000
|
||||
* maxSessionTimeout=40000
|
||||
* serverId=2
|
||||
* initLimit=15
|
||||
* syncLimit=10
|
||||
* electionAlg=3
|
||||
* electionPort=4445
|
||||
* quorumPort=4444
|
||||
* peerType=0
|
||||
*/
|
||||
@Data
|
||||
public class ConfigCmdData extends BaseFourLetterWordCmdData {
|
||||
private Long clientPort;
|
||||
private String dataDir;
|
||||
private String dataLogDir;
|
||||
private Long tickTime;
|
||||
private Long maxClientCnxns;
|
||||
private Long minSessionTimeout;
|
||||
private Long maxSessionTimeout;
|
||||
private Integer serverId;
|
||||
private String initLimit;
|
||||
private Long syncLimit;
|
||||
private Long electionAlg;
|
||||
private Long electionPort;
|
||||
private Long quorumPort;
|
||||
private Long peerType;
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
|
||||
* zk_avg_latency 0
|
||||
* zk_max_latency 399
|
||||
* zk_min_latency 0
|
||||
* zk_packets_received 234857
|
||||
* zk_packets_sent 234860
|
||||
* zk_num_alive_connections 4
|
||||
* zk_outstanding_requests 0
|
||||
* zk_server_state follower
|
||||
* zk_znode_count 35566
|
||||
* zk_watch_count 39
|
||||
* zk_ephemerals_count 10
|
||||
* zk_approximate_data_size 3356708
|
||||
* zk_open_file_descriptor_count 35
|
||||
* zk_max_file_descriptor_count 819200
|
||||
*/
|
||||
@Data
|
||||
public class MonitorCmdData extends BaseFourLetterWordCmdData {
|
||||
private String zkVersion;
|
||||
private Long zkAvgLatency;
|
||||
private Long zkMaxLatency;
|
||||
private Long zkMinLatency;
|
||||
private Long zkPacketsReceived;
|
||||
private Long zkPacketsSent;
|
||||
private Long zkNumAliveConnections;
|
||||
private Long zkOutstandingRequests;
|
||||
private String zkServerState;
|
||||
private Long zkZnodeCount;
|
||||
private Long zkWatchCount;
|
||||
private Long zkEphemeralsCount;
|
||||
private Long zkApproximateDataSize;
|
||||
private Long zkOpenFileDescriptorCount;
|
||||
private Long zkMaxFileDescriptorCount;
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
|
||||
* Latency min/avg/max: 0/0/2209
|
||||
* Received: 278202469
|
||||
* Sent: 279449055
|
||||
* Connections: 31
|
||||
* Outstanding: 0
|
||||
* Zxid: 0x20033fc12
|
||||
* Mode: leader
|
||||
* Node count: 10084
|
||||
* Proposal sizes last/min/max: 36/32/31260 leader特有
|
||||
*/
|
||||
@Data
|
||||
public class ServerCmdData extends BaseFourLetterWordCmdData {
|
||||
private String zkVersion;
|
||||
private Long zkAvgLatency;
|
||||
private Long zkMaxLatency;
|
||||
private Long zkMinLatency;
|
||||
private Long zkPacketsReceived;
|
||||
private Long zkPacketsSent;
|
||||
private Long zkNumAliveConnections;
|
||||
private Long zkOutstandingRequests;
|
||||
private String zkServerState;
|
||||
private Long zkZnodeCount;
|
||||
private Long zkZxid;
|
||||
}
|
||||
@@ -0,0 +1,116 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ConfigCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* clientPort=2183
|
||||
* dataDir=/data1/data/zkData2/version-2
|
||||
* dataLogDir=/data1/data/zkLog2/version-2
|
||||
* tickTime=2000
|
||||
* maxClientCnxns=60
|
||||
* minSessionTimeout=4000
|
||||
* maxSessionTimeout=40000
|
||||
* serverId=2
|
||||
* initLimit=15
|
||||
* syncLimit=10
|
||||
* electionAlg=3
|
||||
* electionPort=4445
|
||||
* quorumPort=4444
|
||||
* peerType=0
|
||||
*/
|
||||
@Data
|
||||
public class ConfigCmdDataParser implements FourLetterWordDataParser<ConfigCmdData> {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ConfigCmdDataParser.class);
|
||||
|
||||
private Result<ConfigCmdData> dataResult = null;
|
||||
|
||||
@Override
|
||||
public String getCmd() {
|
||||
return FourLetterWordUtil.ConfigCmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
|
||||
Map<String, String> dataMap = new HashMap<>();
|
||||
for (String elem : cmdData.split("\n")) {
|
||||
if (elem.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int idx = elem.indexOf('=');
|
||||
if (idx >= 0) {
|
||||
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
|
||||
}
|
||||
}
|
||||
|
||||
ConfigCmdData configCmdData = new ConfigCmdData();
|
||||
dataMap.entrySet().stream().forEach(elem -> {
|
||||
try {
|
||||
switch (elem.getKey()) {
|
||||
case "clientPort":
|
||||
configCmdData.setClientPort(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "dataDir":
|
||||
configCmdData.setDataDir(elem.getValue());
|
||||
break;
|
||||
case "dataLogDir":
|
||||
configCmdData.setDataLogDir(elem.getValue());
|
||||
break;
|
||||
case "tickTime":
|
||||
configCmdData.setTickTime(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "maxClientCnxns":
|
||||
configCmdData.setMaxClientCnxns(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "minSessionTimeout":
|
||||
configCmdData.setMinSessionTimeout(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "maxSessionTimeout":
|
||||
configCmdData.setMaxSessionTimeout(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "serverId":
|
||||
configCmdData.setServerId(Integer.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "initLimit":
|
||||
configCmdData.setInitLimit(elem.getValue());
|
||||
break;
|
||||
case "syncLimit":
|
||||
configCmdData.setSyncLimit(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "electionAlg":
|
||||
configCmdData.setElectionAlg(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "electionPort":
|
||||
configCmdData.setElectionPort(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "quorumPort":
|
||||
configCmdData.setQuorumPort(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "peerType":
|
||||
configCmdData.setPeerType(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
default:
|
||||
LOGGER.warn(
|
||||
"class=ConfigCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
|
||||
elem.getKey(), elem.getValue()
|
||||
);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"class=ConfigCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
|
||||
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return configCmdData;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||
|
||||
/**
|
||||
* 四字命令结果解析类
|
||||
*/
|
||||
public interface FourLetterWordDataParser<T> {
|
||||
String getCmd();
|
||||
|
||||
T parseAndInitData(Long clusterPhyId, String host, int port, String cmdData);
|
||||
}
|
||||
@@ -0,0 +1,117 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
|
||||
* zk_avg_latency 0
|
||||
* zk_max_latency 399
|
||||
* zk_min_latency 0
|
||||
* zk_packets_received 234857
|
||||
* zk_packets_sent 234860
|
||||
* zk_num_alive_connections 4
|
||||
* zk_outstanding_requests 0
|
||||
* zk_server_state follower
|
||||
* zk_znode_count 35566
|
||||
* zk_watch_count 39
|
||||
* zk_ephemerals_count 10
|
||||
* zk_approximate_data_size 3356708
|
||||
* zk_open_file_descriptor_count 35
|
||||
* zk_max_file_descriptor_count 819200
|
||||
*/
|
||||
@Data
|
||||
public class MonitorCmdDataParser implements FourLetterWordDataParser<MonitorCmdData> {
|
||||
private static final ILog LOGGER = LogFactory.getLog(MonitorCmdDataParser.class);
|
||||
|
||||
@Override
|
||||
public String getCmd() {
|
||||
return FourLetterWordUtil.MonitorCmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MonitorCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
|
||||
Map<String, String> dataMap = new HashMap<>();
|
||||
for (String elem : cmdData.split("\n")) {
|
||||
if (elem.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int idx = elem.indexOf('\t');
|
||||
if (idx >= 0) {
|
||||
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
|
||||
}
|
||||
}
|
||||
|
||||
MonitorCmdData monitorCmdData = new MonitorCmdData();
|
||||
dataMap.entrySet().stream().forEach(elem -> {
|
||||
try {
|
||||
switch (elem.getKey()) {
|
||||
case "zk_version":
|
||||
monitorCmdData.setZkVersion(elem.getValue().split("-")[0]);
|
||||
break;
|
||||
case "zk_avg_latency":
|
||||
monitorCmdData.setZkAvgLatency(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_max_latency":
|
||||
monitorCmdData.setZkMaxLatency(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_min_latency":
|
||||
monitorCmdData.setZkMinLatency(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_packets_received":
|
||||
monitorCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_packets_sent":
|
||||
monitorCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_num_alive_connections":
|
||||
monitorCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_outstanding_requests":
|
||||
monitorCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_server_state":
|
||||
monitorCmdData.setZkServerState(elem.getValue());
|
||||
break;
|
||||
case "zk_znode_count":
|
||||
monitorCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_watch_count":
|
||||
monitorCmdData.setZkWatchCount(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_ephemerals_count":
|
||||
monitorCmdData.setZkEphemeralsCount(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_approximate_data_size":
|
||||
monitorCmdData.setZkApproximateDataSize(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_open_file_descriptor_count":
|
||||
monitorCmdData.setZkOpenFileDescriptorCount(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "zk_max_file_descriptor_count":
|
||||
monitorCmdData.setZkMaxFileDescriptorCount(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
default:
|
||||
LOGGER.warn(
|
||||
"class=MonitorCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
|
||||
elem.getKey(), elem.getValue()
|
||||
);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"class=MonitorCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
|
||||
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return monitorCmdData;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
|
||||
* Latency min/avg/max: 0/0/2209
|
||||
* Received: 278202469
|
||||
* Sent: 279449055
|
||||
* Connections: 31
|
||||
* Outstanding: 0
|
||||
* Zxid: 0x20033fc12
|
||||
* Mode: leader
|
||||
* Node count: 10084
|
||||
* Proposal sizes last/min/max: 36/32/31260 leader特有
|
||||
*/
|
||||
@Data
|
||||
public class ServerCmdDataParser implements FourLetterWordDataParser<ServerCmdData> {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ServerCmdDataParser.class);
|
||||
|
||||
@Override
|
||||
public String getCmd() {
|
||||
return FourLetterWordUtil.ServerCmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
|
||||
Map<String, String> dataMap = new HashMap<>();
|
||||
for (String elem : cmdData.split("\n")) {
|
||||
if (elem.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int idx = elem.indexOf(':');
|
||||
if (idx >= 0) {
|
||||
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
|
||||
}
|
||||
}
|
||||
|
||||
ServerCmdData serverCmdData = new ServerCmdData();
|
||||
dataMap.entrySet().stream().forEach(elem -> {
|
||||
try {
|
||||
switch (elem.getKey()) {
|
||||
case "Zookeeper version":
|
||||
serverCmdData.setZkVersion(elem.getValue().split("-")[0]);
|
||||
break;
|
||||
case "Latency min/avg/max":
|
||||
String[] data = elem.getValue().split("/");
|
||||
serverCmdData.setZkMinLatency(Long.valueOf(data[0]));
|
||||
serverCmdData.setZkAvgLatency(Long.valueOf(data[1]));
|
||||
serverCmdData.setZkMaxLatency(Long.valueOf(data[2]));
|
||||
break;
|
||||
case "Received":
|
||||
serverCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "Sent":
|
||||
serverCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "Connections":
|
||||
serverCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "Outstanding":
|
||||
serverCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "Mode":
|
||||
serverCmdData.setZkServerState(elem.getValue());
|
||||
break;
|
||||
case "Node count":
|
||||
serverCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
|
||||
break;
|
||||
case "Zxid":
|
||||
serverCmdData.setZkZxid(Long.parseUnsignedLong(elem.getValue().trim().substring(2), 16));
|
||||
break;
|
||||
default:
|
||||
LOGGER.warn(
|
||||
"class=ServerCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
|
||||
elem.getKey(), elem.getValue()
|
||||
);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"class=ServerCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
|
||||
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return serverCmdData;
|
||||
}
|
||||
}
|
||||
@@ -8,8 +8,6 @@ import org.springframework.context.ApplicationEvent;
|
||||
*/
|
||||
@Getter
|
||||
public class BaseMetricEvent extends ApplicationEvent {
|
||||
|
||||
|
||||
public BaseMetricEvent(Object source) {
|
||||
super( source );
|
||||
}
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.event.metric;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Getter
|
||||
public class ZookeeperMetricEvent extends BaseMetricEvent {
|
||||
|
||||
private List<ZookeeperMetrics> zookeeperMetrics;
|
||||
|
||||
public ZookeeperMetricEvent(Object source, List<ZookeeperMetrics> zookeeperMetrics) {
|
||||
super( source );
|
||||
this.zookeeperMetrics = zookeeperMetrics;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.po.metrice;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class ZookeeperMetricPO extends BaseMetricESPO {
|
||||
public ZookeeperMetricPO(Long clusterPhyId){
|
||||
super(clusterPhyId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return "ZK@" + clusterPhyId + "@" + monitorTimestamp2min(timestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRoutingValue() {
|
||||
return String.valueOf(clusterPhyId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "zookeeper")
|
||||
public class ZookeeperInfoPO extends BasePO {
|
||||
/**
|
||||
* 集群Id
|
||||
*/
|
||||
private Long clusterPhyId;
|
||||
|
||||
/**
|
||||
* 主机
|
||||
*/
|
||||
private String host;
|
||||
|
||||
/**
|
||||
* 端口
|
||||
*/
|
||||
private Integer port;
|
||||
|
||||
/**
|
||||
* 角色
|
||||
*/
|
||||
private String role;
|
||||
|
||||
/**
|
||||
* 版本
|
||||
*/
|
||||
private String version;
|
||||
|
||||
/**
|
||||
* ZK状态
|
||||
*/
|
||||
private Integer status;
|
||||
}
|
||||
@@ -1,16 +1,12 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
@@ -26,19 +22,4 @@ public class MetricMultiLinesVO {
|
||||
|
||||
@ApiModelProperty(value = "指标名称对应的指标线")
|
||||
private List<MetricLineVO> metricLines;
|
||||
|
||||
public List<MetricPointVO> getMetricPoints(String resName) {
|
||||
if (ValidateUtils.isNull(metricLines)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
List<MetricLineVO> voList = metricLines.stream().filter(elem -> elem.getName().equals(resName)).collect(Collectors.toList());
|
||||
if (ValidateUtils.isEmptyList(voList)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
// 仅获取idx=0的指标
|
||||
return voList.get(0).getMetricPoints();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @author wyc
|
||||
* @date 2022/9/23
|
||||
*/
|
||||
@Data
|
||||
@ApiModel(description = "Zookeeper信息概览")
|
||||
public class ClusterZookeepersOverviewVO {
|
||||
@ApiModelProperty(value = "主机ip", example = "121.0.0.1")
|
||||
private String host;
|
||||
|
||||
@ApiModelProperty(value = "端口号", example = "2416")
|
||||
private Integer port;
|
||||
|
||||
@ApiModelProperty(value = "版本", example = "1.1.2")
|
||||
private String version;
|
||||
|
||||
@ApiModelProperty(value = "角色", example = "Leader")
|
||||
private String role;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
|
||||
/**
|
||||
* @author wyc
|
||||
* @date 2022/9/23
|
||||
*/
|
||||
@Data
|
||||
@ApiModel(description = "ZK状态信息")
|
||||
public class ClusterZookeepersStateVO {
|
||||
@ApiModelProperty(value = "健康检查状态", example = "1")
|
||||
private Integer healthState;
|
||||
|
||||
@ApiModelProperty(value = "健康检查通过数", example = "1")
|
||||
private Integer healthCheckPassed;
|
||||
|
||||
@ApiModelProperty(value = "健康检查总数", example = "1")
|
||||
private Integer healthCheckTotal;
|
||||
|
||||
@ApiModelProperty(value = "ZK的Leader机器", example = "127.0.0.1")
|
||||
private String leaderNode;
|
||||
|
||||
@ApiModelProperty(value = "Watch数", example = "123456")
|
||||
private Integer watchCount;
|
||||
|
||||
@ApiModelProperty(value = "节点存活数", example = "8")
|
||||
private Integer aliveServerCount;
|
||||
|
||||
@ApiModelProperty(value = "总节点数", example = "10")
|
||||
private Integer totalServerCount;
|
||||
|
||||
@ApiModelProperty(value = "Follower角色存活数", example = "8")
|
||||
private Integer aliveFollowerCount;
|
||||
|
||||
@ApiModelProperty(value = "Follower角色总数", example = "10")
|
||||
private Integer totalFollowerCount;
|
||||
|
||||
@ApiModelProperty(value = "Observer角色存活数", example = "3")
|
||||
private Integer aliveObserverCount;
|
||||
|
||||
@ApiModelProperty(value = "Observer角色总数", example = "3")
|
||||
private Integer totalObserverCount;
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @author wyc
|
||||
* @date 2022/9/23
|
||||
*/
|
||||
@Data
|
||||
public class ZnodeStatVO {
|
||||
@ApiModelProperty(value = "节点被创建时的事物的ID", example = "0x1f09")
|
||||
private Long czxid;
|
||||
|
||||
@ApiModelProperty(value = "创建时间", example = "Sat Mar 16 15:38:34 CST 2019")
|
||||
private Long ctime;
|
||||
|
||||
@ApiModelProperty(value = "节点最后一次被修改时的事物的ID", example = "0x1f09")
|
||||
private Long mzxid;
|
||||
|
||||
@ApiModelProperty(value = "最后一次修改时间", example = "Sat Mar 16 15:38:34 CST 2019")
|
||||
private Long mtime;
|
||||
|
||||
@ApiModelProperty(value = "子节点列表最近一次呗修改的事物ID", example = "0x31")
|
||||
private Long pzxid;
|
||||
|
||||
@ApiModelProperty(value = "子节点版本号", example = "0")
|
||||
private Integer cversion;
|
||||
|
||||
@ApiModelProperty(value = "数据版本号", example = "0")
|
||||
private Integer version;
|
||||
|
||||
@ApiModelProperty(value = "ACL版本号", example = "0")
|
||||
private Integer aversion;
|
||||
|
||||
@ApiModelProperty(value = "创建临时节点的事物ID,持久节点事物为0", example = "0")
|
||||
private Long ephemeralOwner;
|
||||
|
||||
@ApiModelProperty(value = "数据长度,每个节点都可保存数据", example = "22")
|
||||
private Integer dataLength;
|
||||
|
||||
@ApiModelProperty(value = "子节点的个数", example = "6")
|
||||
private Integer numChildren;
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @author wyc
|
||||
* @date 2022/9/23
|
||||
*/
|
||||
@Data
|
||||
public class ZnodeVO {
|
||||
|
||||
@ApiModelProperty(value = "节点名称", example = "broker")
|
||||
private String name;
|
||||
|
||||
@ApiModelProperty(value = "节点数据", example = "saassad")
|
||||
private String data;
|
||||
|
||||
@ApiModelProperty(value = "节点属性", example = "")
|
||||
private ZnodeStatVO stat;
|
||||
|
||||
}
|
||||
@@ -23,8 +23,8 @@ public class Constant {
|
||||
public static final Integer YES = 1;
|
||||
public static final Integer NO = 0;
|
||||
|
||||
public static final Integer ALIVE = 1;
|
||||
public static final Integer DOWN = 0;
|
||||
public static final Integer ALIVE = 1;
|
||||
public static final Integer DOWN = 0;
|
||||
|
||||
public static final Integer ONE_HUNDRED = 100;
|
||||
|
||||
@@ -33,6 +33,7 @@ public class Constant {
|
||||
public static final Long B_TO_MB = 1024L * 1024L;
|
||||
|
||||
public static final Integer DEFAULT_SESSION_TIMEOUT_UNIT_MS = 15000;
|
||||
public static final Integer DEFAULT_REQUEST_TIMEOUT_UNIT_MS = 5000;
|
||||
|
||||
public static final Float MIN_HEALTH_SCORE = 10f;
|
||||
|
||||
@@ -66,4 +67,5 @@ public class Constant {
|
||||
|
||||
public static final Integer DEFAULT_RETRY_TIME = 3;
|
||||
|
||||
public static final Integer ZK_ALIVE_BUT_4_LETTER_FORBIDDEN = 11;
|
||||
}
|
||||
|
||||
@@ -34,6 +34,8 @@ public class ESConstant {
|
||||
|
||||
public static final String TOTAL = "total";
|
||||
|
||||
public static final Integer DEFAULT_RETRY_TIME = 3;
|
||||
|
||||
private ESConstant() {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -644,4 +644,89 @@ public class ESIndexConstant {
|
||||
" \"aliases\" : { }\n" +
|
||||
" }";
|
||||
|
||||
public final static String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric";
|
||||
public final static String ZOOKEEPER_TEMPLATE = "{\n" +
|
||||
" \"order\" : 10,\n" +
|
||||
" \"index_patterns\" : [\n" +
|
||||
" \"ks_kafka_zookeeper_metric*\"\n" +
|
||||
" ],\n" +
|
||||
" \"settings\" : {\n" +
|
||||
" \"index\" : {\n" +
|
||||
" \"number_of_shards\" : \"10\"\n" +
|
||||
" }\n" +
|
||||
" },\n" +
|
||||
" \"mappings\" : {\n" +
|
||||
" \"properties\" : {\n" +
|
||||
" \"routingValue\" : {\n" +
|
||||
" \"type\" : \"text\",\n" +
|
||||
" \"fields\" : {\n" +
|
||||
" \"keyword\" : {\n" +
|
||||
" \"ignore_above\" : 256,\n" +
|
||||
" \"type\" : \"keyword\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" },\n" +
|
||||
" \"clusterPhyId\" : {\n" +
|
||||
" \"type\" : \"long\"\n" +
|
||||
" },\n" +
|
||||
" \"metrics\" : {\n" +
|
||||
" \"properties\" : {\n" +
|
||||
" \"AvgRequestLatency\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"MinRequestLatency\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"MaxRequestLatency\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"OutstandingRequests\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"NodeCount\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"WatchCount\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"NumAliveConnections\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"PacketsReceived\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"PacketsSent\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"EphemeralsCount\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"ApproximateDataSize\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"OpenFileDescriptorCount\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" },\n" +
|
||||
" \"MaxFileDescriptorCount\" : {\n" +
|
||||
" \"type\" : \"double\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" },\n" +
|
||||
" \"key\" : {\n" +
|
||||
" \"type\" : \"text\",\n" +
|
||||
" \"fields\" : {\n" +
|
||||
" \"keyword\" : {\n" +
|
||||
" \"ignore_above\" : 256,\n" +
|
||||
" \"type\" : \"keyword\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" },\n" +
|
||||
" \"timestamp\" : {\n" +
|
||||
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
|
||||
" \"type\" : \"date\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" },\n" +
|
||||
" \"aliases\" : { }\n" +
|
||||
" }";
|
||||
}
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.converter;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
public class ZnodeConverter {
|
||||
ZnodeConverter(){
|
||||
|
||||
}
|
||||
|
||||
public static Znode convert2Znode(Tuple<byte[], Stat> dataAndStat, String path) {
|
||||
Znode znode = new Znode();
|
||||
znode.setStat(dataAndStat.getV2());
|
||||
znode.setData(dataAndStat.getV1() == null ? null : new String(dataAndStat.getV1()));
|
||||
znode.setName(path.substring(path.lastIndexOf('/') + 1));
|
||||
return znode;
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,9 @@ public enum VersionItemTypeEnum {
|
||||
METRIC_GROUP(102, "group_metric"),
|
||||
METRIC_BROKER(103, "broker_metric"),
|
||||
METRIC_PARTITION(104, "partition_metric"),
|
||||
METRIC_REPLICATION (105, "replication_metric"),
|
||||
METRIC_REPLICATION(105, "replication_metric"),
|
||||
|
||||
METRIC_ZOOKEEPER(110, "zookeeper_metric"),
|
||||
|
||||
/**
|
||||
* 服务端查询
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.enums.zookeeper;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public enum ZKRoleEnum {
|
||||
LEADER("leader"),
|
||||
|
||||
FOLLOWER("follower"),
|
||||
|
||||
OBSERVER("observer"),
|
||||
|
||||
UNKNOWN("unknown"),
|
||||
|
||||
;
|
||||
|
||||
private final String role;
|
||||
|
||||
ZKRoleEnum(String role) {
|
||||
this.role = role;
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,12 @@ public class JmxAttribute {
|
||||
|
||||
public static final String PERCENTILE_99 = "99thPercentile";
|
||||
|
||||
public static final String MAX = "Max";
|
||||
|
||||
public static final String MEAN = "Mean";
|
||||
|
||||
public static final String MIN = "Min";
|
||||
|
||||
public static final String VALUE = "Value";
|
||||
|
||||
public static final String CONNECTION_COUNT = "connection-count";
|
||||
|
||||
@@ -63,6 +63,12 @@ public class JmxName {
|
||||
/*********************************************************** cluster ***********************************************************/
|
||||
public static final String JMX_CLUSTER_PARTITION_UNDER_REPLICATED = "kafka.cluster:type=Partition,name=UnderReplicated";
|
||||
|
||||
/*********************************************************** zookeeper ***********************************************************/
|
||||
|
||||
public static final String JMX_ZK_REQUEST_LATENCY_MS = "kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs";
|
||||
public static final String JMX_ZK_SYNC_CONNECTS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperSyncConnectsPerSec";
|
||||
public static final String JMX_ZK_DISCONNECTORS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec";
|
||||
|
||||
private JmxName() {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,163 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.FourLetterWordDataParser;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import org.apache.zookeeper.common.ClientX509Util;
|
||||
import org.apache.zookeeper.common.X509Exception;
|
||||
import org.apache.zookeeper.common.X509Util;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSocket;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
public class FourLetterWordUtil {
|
||||
private static final ILog LOGGER = LogFactory.getLog(FourLetterWordUtil.class);
|
||||
|
||||
public static final String MonitorCmd = "mntr";
|
||||
public static final String ConfigCmd = "conf";
|
||||
public static final String ServerCmd = "srvr";
|
||||
|
||||
private static final Set<String> supportedCommands = new HashSet<>();
|
||||
|
||||
public static <T> Result<T> executeFourLetterCmd(Long clusterPhyId,
|
||||
String host,
|
||||
int port,
|
||||
boolean secure,
|
||||
int timeout,
|
||||
FourLetterWordDataParser<T> dataParser) {
|
||||
try {
|
||||
if (!supportedCommands.contains(dataParser.getCmd())) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, String.format("ZK %s命令暂未进行支持", dataParser.getCmd()));
|
||||
}
|
||||
|
||||
String cmdData = send4LetterWord(host, port, dataParser.getCmd(), secure, timeout);
|
||||
if (cmdData.contains("not executed because it is not in the whitelist.")) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN, cmdData);
|
||||
}
|
||||
if (ValidateUtils.isBlank(cmdData)) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, cmdData);
|
||||
}
|
||||
|
||||
return Result.buildSuc(dataParser.parseAndInitData(clusterPhyId, host, port, cmdData));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"class=FourLetterWordUtil||method=executeFourLetterCmd||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||
clusterPhyId, host, port, dataParser.getCmd(), secure, timeout, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private static String send4LetterWord(
|
||||
String host,
|
||||
int port,
|
||||
String cmd,
|
||||
boolean secure,
|
||||
int timeout) throws IOException, X509Exception.SSLContextException {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
LOGGER.info("connecting to {} {}", host, port);
|
||||
|
||||
Socket socket = null;
|
||||
OutputStream outputStream = null;
|
||||
BufferedReader bufferedReader = null;
|
||||
try {
|
||||
InetSocketAddress hostaddress = host != null
|
||||
? new InetSocketAddress(host, port)
|
||||
: new InetSocketAddress(InetAddress.getByName(null), port);
|
||||
if (secure) {
|
||||
LOGGER.info("using secure socket");
|
||||
try (X509Util x509Util = new ClientX509Util()) {
|
||||
SSLContext sslContext = x509Util.getDefaultSSLContext();
|
||||
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
|
||||
SSLSocket sslSock = (SSLSocket) socketFactory.createSocket();
|
||||
sslSock.connect(hostaddress, timeout);
|
||||
sslSock.startHandshake();
|
||||
socket = sslSock;
|
||||
}
|
||||
} else {
|
||||
socket = new Socket();
|
||||
socket.connect(hostaddress, timeout);
|
||||
}
|
||||
socket.setSoTimeout(timeout);
|
||||
|
||||
outputStream = socket.getOutputStream();
|
||||
outputStream.write(cmd.getBytes());
|
||||
outputStream.flush();
|
||||
|
||||
// 等待InputStream有数据
|
||||
while (System.currentTimeMillis() - startTime <= timeout && socket.getInputStream().available() <= 0) {
|
||||
BackoffUtils.backoff(10);
|
||||
}
|
||||
|
||||
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String line;
|
||||
while ((line = bufferedReader.readLine()) != null) {
|
||||
sb.append(line).append("\n");
|
||||
}
|
||||
return sb.toString();
|
||||
} catch (SocketTimeoutException e) {
|
||||
throw new IOException("Exception while executing four letter word: " + cmd, e);
|
||||
} finally {
|
||||
if (outputStream != null) {
|
||||
try {
|
||||
outputStream.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(
|
||||
"class=FourLetterWordUtil||method=send4LetterWord||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||
host, port, cmd, secure, timeout, e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (bufferedReader != null) {
|
||||
try {
|
||||
bufferedReader.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(
|
||||
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||
host, port, cmd, secure, timeout, e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (socket != null) {
|
||||
try {
|
||||
socket.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(
|
||||
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
|
||||
host, port, cmd, secure, timeout, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static {
|
||||
supportedCommands.add(MonitorCmd);
|
||||
supportedCommands.add(ConfigCmd);
|
||||
supportedCommands.add(ServerCmd);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import org.apache.zookeeper.client.ConnectStringParser;
|
||||
import org.apache.zookeeper.common.NetUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.zookeeper.common.StringUtils.split;
|
||||
|
||||
public class ZookeeperUtils {
|
||||
private static final int DEFAULT_PORT = 2181;
|
||||
|
||||
/**
|
||||
* 解析ZK地址
|
||||
* @see ConnectStringParser
|
||||
*/
|
||||
public static List<Tuple<String, Integer>> connectStringParser(String connectString) throws Exception {
|
||||
List<Tuple<String, Integer>> ipPortList = new ArrayList<>();
|
||||
|
||||
if (connectString == null) {
|
||||
return ipPortList;
|
||||
}
|
||||
|
||||
// parse out chroot, if any
|
||||
int off = connectString.indexOf('/');
|
||||
if (off >= 0) {
|
||||
connectString = connectString.substring(0, off);
|
||||
}
|
||||
|
||||
List<String> hostsList = split(connectString, ",");
|
||||
for (String host : hostsList) {
|
||||
int port = DEFAULT_PORT;
|
||||
String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
|
||||
if (hostAndPort.length != 0) {
|
||||
host = hostAndPort[0];
|
||||
if (hostAndPort.length == 2) {
|
||||
port = Integer.parseInt(hostAndPort[1]);
|
||||
}
|
||||
} else {
|
||||
int pidx = host.lastIndexOf(':');
|
||||
if (pidx >= 0) {
|
||||
// otherwise : is at the end of the string, ignore
|
||||
if (pidx < host.length() - 1) {
|
||||
port = Integer.parseInt(host.substring(pidx + 1));
|
||||
}
|
||||
host = host.substring(0, pidx);
|
||||
}
|
||||
}
|
||||
|
||||
ipPortList.add(new Tuple<>(host, port));
|
||||
}
|
||||
|
||||
return ipPortList;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -343,17 +343,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
|
||||
|
||||
private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) {
|
||||
try {
|
||||
Object object = jmxDAO.getJmxValue(
|
||||
clusterPhyId,
|
||||
newNode.id(),
|
||||
newNode.host(),
|
||||
null,
|
||||
jmxConfig,
|
||||
new ObjectName("java.lang:type=Runtime"),
|
||||
"StartTime"
|
||||
);
|
||||
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig);
|
||||
|
||||
return Broker.buildFrom(clusterPhyId, newNode, object != null? (Long) object: null);
|
||||
return Broker.buildFrom(clusterPhyId, newNode, startTime);
|
||||
} catch (Exception e) {
|
||||
log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,141 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.version.metrics;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER;
|
||||
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxAttribute.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl.ZookeeperMetricServiceImpl.*;
|
||||
|
||||
@Component
|
||||
public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric {
|
||||
|
||||
/**
|
||||
* 性能
|
||||
*/
|
||||
public static final String ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY = "AvgRequestLatency";
|
||||
public static final String ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY = "MinRequestLatency";
|
||||
public static final String ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY = "MaxRequestLatency";
|
||||
public static final String ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS = "OutstandingRequests";
|
||||
public static final String ZOOKEEPER_METRIC_NODE_COUNT = "NodeCount";
|
||||
public static final String ZOOKEEPER_METRIC_WATCH_COUNT = "WatchCount";
|
||||
public static final String ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS = "NumAliveConnections";
|
||||
public static final String ZOOKEEPER_METRIC_PACKETS_RECEIVED = "PacketsReceived";
|
||||
public static final String ZOOKEEPER_METRIC_PACKETS_SENT = "PacketsSent";
|
||||
public static final String ZOOKEEPER_METRIC_EPHEMERALS_COUNT = "EphemeralsCount";
|
||||
public static final String ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE = "ApproximateDataSize";
|
||||
public static final String ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT = "OpenFileDescriptorCount";
|
||||
public static final String ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT = "MaxFileDescriptorCount";
|
||||
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC = "KafkaZKDisconnectsPerSec";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC = "KafkaZKSyncConnectsPerSec";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH = "KafkaZKRequestLatencyMs_99thPercentile";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX = "KafkaZKRequestLatencyMs_Max";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN = "KafkaZKRequestLatencyMs_Mean";
|
||||
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN = "KafkaZKRequestLatencyMs_Min";
|
||||
|
||||
|
||||
public static final String ZOOKEEPER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME;
|
||||
|
||||
@Override
|
||||
public int versionItemType() {
|
||||
return METRIC_ZOOKEEPER.getCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VersionMetricControlItem> init(){
|
||||
List<VersionMetricControlItem> items = new ArrayList<>();
|
||||
|
||||
// 性能指标
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY).unit("ms").desc("平均响应延迟").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY).unit("ms").desc("最小响应延迟").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY).unit("ms").desc("最大响应延迟").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS).unit("个").desc("堆积请求数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_NODE_COUNT).unit("个").desc("ZNode数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_WATCH_COUNT).unit("个").desc("Watch数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS).unit("个").desc("客户端连接数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_PACKETS_RECEIVED).unit("个").desc("接受包的数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_PACKETS_SENT).unit("个").desc("发送包的数量").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_EPHEMERALS_COUNT).unit("个").desc("临时节点数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE).unit("byte").desc("文件大小(近似值)").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT).unit("个").desc("已打开的文件描述符数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT).unit("个").desc("允许打开的最大文件描述符数").category(CATEGORY_PERFORMANCE)
|
||||
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
|
||||
|
||||
// JMX指标
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH).unit("ms").desc("ZK请求99分位延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(PERCENTILE_99)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX).unit("ms").desc("ZK请求最大延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MAX)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN).unit("ms").desc("ZK请求最小延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MIN)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN).unit("ms").desc("ZK请求平均延迟").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MEAN)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC).unit("个").desc("断开连接数").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_DISCONNECTORS_PER_SEC ).jmxAttribute(RATE_MIN_1)));
|
||||
|
||||
items.add(buildAllVersionsItem()
|
||||
.name(ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC).unit("个").desc("同步连接数").category(CATEGORY_CLIENT)
|
||||
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
|
||||
.jmxObjectName( JMX_ZK_SYNC_CONNECTS_PER_SEC ).jmxAttribute(RATE_MIN_1)));
|
||||
return items;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ZnodeService {
|
||||
|
||||
Result<List<String>> listZnodeChildren(Long clusterPhyId, String path, String keyword);
|
||||
|
||||
Result<Znode> getZnode(Long clusterPhyId, String path);
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ZookeeperMetricService {
|
||||
/**
|
||||
* ZK指标获取
|
||||
* @param param 参数,因为ZK 四字命令在使用时,是短连接,所以参数内容会复杂一些,后续可以考虑优化为长连接
|
||||
* @return
|
||||
*/
|
||||
Result<ZookeeperMetrics> collectMetricsFromZookeeper(ZookeeperMetricParam param);
|
||||
Result<ZookeeperMetrics> batchCollectMetricsFromZookeeper(Long clusterPhyId, List<String> metricNameList);
|
||||
|
||||
Result<List<MetricLineVO>> listMetricsFromES(Long clusterPhyId, MetricDTO dto);
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ZookeeperService {
|
||||
/**
|
||||
* 从ZK集群中获取ZK信息
|
||||
*/
|
||||
Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig);
|
||||
|
||||
void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList);
|
||||
|
||||
List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId);
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.ZnodeConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Service
|
||||
public class ZnodeServiceImpl implements ZnodeService {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ZnodeServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private KafkaZKDAO kafkaZKDAO;
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Result<List<String>> listZnodeChildren(Long clusterPhyId, String path, String keyword) {
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
|
||||
if (clusterPhy == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||
}
|
||||
|
||||
List<String> children;
|
||||
try {
|
||||
children = kafkaZKDAO.getChildren(clusterPhyId, path, false);
|
||||
} catch (NotExistException e) {
|
||||
LOGGER.error("class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}", clusterPhyId, "create ZK client create failed");
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, "ZK客户端创建失败");
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}", clusterPhyId, "ZK operate failed");
|
||||
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, "ZK操作失败");
|
||||
}
|
||||
|
||||
//关键字搜索
|
||||
if (keyword != null) {
|
||||
children = children.stream().filter(elem -> elem.contains(keyword)).collect(Collectors.toList());
|
||||
}
|
||||
return Result.buildSuc(children);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Znode> getZnode(Long clusterPhyId, String path) {
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
|
||||
if (clusterPhy == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||
}
|
||||
|
||||
//获取zookeeper上的原始数据
|
||||
Tuple<byte[], Stat> dataAndStat;
|
||||
try {
|
||||
dataAndStat = kafkaZKDAO.getDataAndStat(clusterPhyId, path);
|
||||
} catch (NotExistException e) {
|
||||
LOGGER.error("class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}", clusterPhyId, "create ZK client create failed");
|
||||
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, "ZK客户端创建失败");
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}", clusterPhyId, "ZK operate failed");
|
||||
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, "ZK操作失败");
|
||||
}
|
||||
|
||||
return Result.buildSuc(ZnodeConverter.convert2Znode(dataAndStat, path));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,281 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
|
||||
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.MonitorCmdDataParser;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.*;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ZookeeperMetricESDAO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_JMX_CONNECT_ERROR;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems.*;
|
||||
|
||||
|
||||
@Service
|
||||
public class ZookeeperMetricServiceImpl extends BaseMetricService implements ZookeeperMetricService {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricServiceImpl.class);
|
||||
|
||||
public static final String ZOOKEEPER_METHOD_DO_NOTHING = "doNothing";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD = "getMetricFromMonitorCmd";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD = "getMetricFromServerCmd";
|
||||
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX = "getMetricFromKafkaByJMX";
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperService zookeeperService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperMetricESDAO zookeeperMetricESDAO;
|
||||
|
||||
@Autowired
|
||||
private KafkaJMXClient kafkaJMXClient;
|
||||
|
||||
@Autowired
|
||||
private KafkaControllerService kafkaControllerService;
|
||||
|
||||
@Override
|
||||
protected VersionItemTypeEnum getVersionItemType() {
|
||||
return VersionItemTypeEnum.METRIC_ZOOKEEPER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> listMetricPOFields(){
|
||||
return BeanUtil.listBeanFields(ZookeeperMetricPO.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initRegisterVCHandler(){
|
||||
registerVCHandler( ZOOKEEPER_METHOD_DO_NOTHING, this::doNothing);
|
||||
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD, this::getMetricFromMonitorCmd);
|
||||
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD, this::getMetricFromServerCmd);
|
||||
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX, this::getMetricFromKafkaByJMX);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<ZookeeperMetrics> collectMetricsFromZookeeper(ZookeeperMetricParam param) {
|
||||
try {
|
||||
return (Result<ZookeeperMetrics>)doVCHandler(param.getClusterPhyId(), param.getMetricName(), param);
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<ZookeeperMetrics> batchCollectMetricsFromZookeeper(Long clusterPhyId, List<String> metricNameList) {
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
|
||||
if (null == clusterPhy) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
|
||||
}
|
||||
|
||||
List<ZookeeperInfo> aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId).stream()
|
||||
.filter(elem -> Constant.ALIVE.equals(elem.getStatus()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (ValidateUtils.isEmptyList(aliveZKList)) {
|
||||
// 没有指标可以获取
|
||||
return Result.buildSuc(new ZookeeperMetrics(clusterPhyId));
|
||||
}
|
||||
|
||||
// 构造参数
|
||||
ZookeeperMetricParam param = new ZookeeperMetricParam(
|
||||
clusterPhyId,
|
||||
aliveZKList.stream().map(elem -> new Tuple<String, Integer>(elem.getHost(), elem.getPort())).collect(Collectors.toList()),
|
||||
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class),
|
||||
null
|
||||
);
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId);
|
||||
for(String metricName : metricNameList) {
|
||||
try {
|
||||
if(metrics.getMetrics().containsKey(metricName)) {
|
||||
continue;
|
||||
}
|
||||
param.setMetricName(metricName);
|
||||
|
||||
Result<ZookeeperMetrics> ret = this.collectMetricsFromZookeeper(param);
|
||||
if(null == ret || ret.failed() || null == ret.getData()){
|
||||
continue;
|
||||
}
|
||||
|
||||
metrics.putMetric(ret.getData().getMetrics());
|
||||
} catch (Exception e){
|
||||
LOGGER.error(
|
||||
"class=ZookeeperMetricServiceImpl||method=collectMetricsFromZookeeper||clusterPhyId={}||metricName={}||errMsg=exception!",
|
||||
clusterPhyId, metricName, e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return Result.buildSuc(metrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<MetricLineVO>> listMetricsFromES(Long clusterPhyId, MetricDTO dto) {
|
||||
Map<String/*metricName*/, List<MetricPointVO>> pointVOMap = zookeeperMetricESDAO.listMetricsByClusterPhyId(
|
||||
clusterPhyId,
|
||||
dto.getMetricsNames(),
|
||||
dto.getAggType(),
|
||||
dto.getStartTime(),
|
||||
dto.getEndTime()
|
||||
);
|
||||
|
||||
// 格式转化
|
||||
List<MetricLineVO> voList = new ArrayList<>();
|
||||
pointVOMap.entrySet().stream().forEach(entry ->
|
||||
voList.add(new MetricLineVO(String.valueOf(clusterPhyId), entry.getKey(), entry.getValue()))
|
||||
);
|
||||
return Result.buildSuc(voList);
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Result<ZookeeperMetrics> getMetricFromServerCmd(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
|
||||
Result<ZookeeperMetrics> rz = null;
|
||||
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
|
||||
Result<ServerCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new ServerCmdDataParser()
|
||||
);
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
continue;
|
||||
}
|
||||
|
||||
ServerCmdData cmdData = cmdDataResult.getData();
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue());
|
||||
|
||||
return Result.buildSuc(metrics);
|
||||
}
|
||||
|
||||
return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
private Result<ZookeeperMetrics> getMetricFromMonitorCmd(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
|
||||
Result<ZookeeperMetrics> rz = null;
|
||||
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
|
||||
Result<MonitorCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
param.getClusterPhyId(),
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
|
||||
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new MonitorCmdDataParser()
|
||||
);
|
||||
|
||||
if (cmdDataResult.failed()) {
|
||||
rz = Result.buildFromIgnoreData(cmdDataResult);
|
||||
continue;
|
||||
}
|
||||
|
||||
MonitorCmdData cmdData = cmdDataResult.getData();
|
||||
|
||||
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_WATCH_COUNT, cmdData.getZkWatchCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_EPHEMERALS_COUNT, cmdData.getZkEphemeralsCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE, cmdData.getZkApproximateDataSize().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT, cmdData.getZkOpenFileDescriptorCount().floatValue());
|
||||
metrics.putMetric(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT, cmdData.getZkMaxFileDescriptorCount().floatValue());
|
||||
|
||||
return Result.buildSuc(metrics);
|
||||
}
|
||||
|
||||
return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
private Result<ZookeeperMetrics> doNothing(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
return Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
private Result<ZookeeperMetrics> getMetricFromKafkaByJMX(VersionItemParam metricParam) {
|
||||
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
|
||||
|
||||
String metricName = param.getMetricName();
|
||||
Long clusterPhyId = param.getClusterPhyId();
|
||||
Integer kafkaControllerId = param.getKafkaControllerId();
|
||||
|
||||
//1、获取jmx的属性信息
|
||||
VersionJmxInfo jmxInfo = getJMXInfo(clusterPhyId, metricName);
|
||||
if(null == jmxInfo) {
|
||||
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
|
||||
}
|
||||
|
||||
//2、获取jmx连接
|
||||
JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterPhyId, kafkaControllerId);
|
||||
if (ValidateUtils.isNull(jmxConnectorWrap)) {
|
||||
return Result.buildFailure(VC_JMX_INIT_ERROR);
|
||||
}
|
||||
|
||||
try {
|
||||
//2、获取jmx指标
|
||||
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString();
|
||||
|
||||
return Result.buildSuc(ZookeeperMetrics.initWithMetric(clusterPhyId, metricName, Float.valueOf(value)));
|
||||
} catch (Exception e) {
|
||||
return Result.buildFailure(VC_JMX_CONNECT_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.ZookeeperUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Service
|
||||
public class ZookeeperServiceImpl implements ZookeeperService {
|
||||
private static final ILog LOGGER = LogFactory.getLog(ZookeeperServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private ZookeeperDAO zookeeperDAO;
|
||||
|
||||
@Override
|
||||
public Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig) {
|
||||
List<Tuple<String, Integer>> addressList = null;
|
||||
try {
|
||||
addressList = ZookeeperUtils.connectStringParser(zookeeperAddress);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"class=ZookeeperServiceImpl||method=listFromZookeeperCluster||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!",
|
||||
clusterPhyId, zookeeperAddress, e
|
||||
);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage());
|
||||
}
|
||||
|
||||
List<ZookeeperInfo> aliveZKList = new ArrayList<>();
|
||||
for (Tuple<String, Integer> hostPort: addressList) {
|
||||
aliveZKList.add(this.getFromZookeeperCluster(
|
||||
clusterPhyId,
|
||||
hostPort.getV1(),
|
||||
hostPort.getV2(),
|
||||
zkConfig
|
||||
));
|
||||
}
|
||||
return Result.buildSuc(aliveZKList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList) {
|
||||
// DB 中的信息
|
||||
List<ZookeeperInfoPO> dbInfoList = this.listRawFromDBByCluster(clusterPhyId);
|
||||
Map<String, ZookeeperInfoPO> dbMap = new HashMap<>();
|
||||
dbInfoList.stream().forEach(elem -> dbMap.put(elem.getHost() + elem.getPort(), elem));
|
||||
|
||||
// 新获取到的信息
|
||||
List<ZookeeperInfoPO> newInfoList = ConvertUtil.list2List(infoList, ZookeeperInfoPO.class);
|
||||
for (ZookeeperInfoPO newInfo: newInfoList) {
|
||||
try {
|
||||
ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort());
|
||||
if (oldInfo == null) {
|
||||
zookeeperDAO.insert(newInfo);
|
||||
} else if (!Constant.DOWN.equals(newInfo.getStatus())) {
|
||||
// 存活时,直接使用获取到的数据
|
||||
newInfo.setId(oldInfo.getId());
|
||||
zookeeperDAO.updateById(newInfo);
|
||||
} else {
|
||||
// 如果挂了,则版本和角色信息,使用先前的信息。
|
||||
// 挂掉之后,如果角色是leader,则需要调整一下
|
||||
newInfo.setId(oldInfo.getId());
|
||||
newInfo.setRole(ZKRoleEnum.LEADER.getRole().equals(oldInfo.getRole())? ZKRoleEnum.FOLLOWER.getRole(): oldInfo.getRole());
|
||||
newInfo.setVersion(oldInfo.getVersion());
|
||||
zookeeperDAO.updateById(newInfo);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterPhyId, newInfo, e);
|
||||
}
|
||||
}
|
||||
|
||||
// 删除剩余的ZK节点
|
||||
dbMap.entrySet().forEach(entry -> {
|
||||
try {
|
||||
zookeeperDAO.deleteById(entry.getValue().getId());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterPhyId, entry.getValue(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId) {
|
||||
return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class);
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private List<ZookeeperInfoPO> listRawFromDBByCluster(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<ZookeeperInfoPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(ZookeeperInfoPO::getClusterPhyId, clusterPhyId);
|
||||
|
||||
return zookeeperDAO.selectList(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
private ZookeeperInfo getFromZookeeperCluster(Long clusterPhyId, String host, Integer port, ZKConfig zkConfig) {
|
||||
ZookeeperInfo zookeeperInfo = new ZookeeperInfo();
|
||||
zookeeperInfo.setClusterPhyId(clusterPhyId);
|
||||
zookeeperInfo.setHost(host);
|
||||
zookeeperInfo.setPort(port);
|
||||
zookeeperInfo.setRole("");
|
||||
zookeeperInfo.setVersion("");
|
||||
zookeeperInfo.setStatus(Constant.DOWN);
|
||||
|
||||
Result<ServerCmdData> serverCmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
|
||||
clusterPhyId,
|
||||
host,
|
||||
port,
|
||||
zkConfig != null ? zkConfig.getOpenSecure(): false,
|
||||
zkConfig != null ? zkConfig.getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
|
||||
new ServerCmdDataParser()
|
||||
);
|
||||
if (serverCmdDataResult.hasData()) {
|
||||
zookeeperInfo.setRole(serverCmdDataResult.getData().getZkServerState());
|
||||
zookeeperInfo.setVersion(serverCmdDataResult.getData().getZkVersion());
|
||||
zookeeperInfo.setStatus(Constant.ALIVE);
|
||||
} else if (serverCmdDataResult.getCode().equals(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN.getCode())) {
|
||||
zookeeperInfo.setStatus(Constant.ZK_ALIVE_BUT_4_LETTER_FORBIDDEN);
|
||||
} else {
|
||||
return zookeeperInfo;
|
||||
}
|
||||
|
||||
return zookeeperInfo;
|
||||
}
|
||||
}
|
||||
@@ -355,3 +355,19 @@ CREATE TABLE `ks_km_app_node` (
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_app_host` (`app_name`,`host_name`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='km集群部署的node信息';
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS `ks_km_zookeeper`;
|
||||
CREATE TABLE `ks_km_zookeeper` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||
`cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID',
|
||||
`host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名',
|
||||
`port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口',
|
||||
`role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer',
|
||||
`version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本',
|
||||
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活,11存活但是4字命令使用不了',
|
||||
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表';
|
||||
85
km-dist/init/template/ks_kafka_zookeeper_metric
Normal file
85
km-dist/init/template/ks_kafka_zookeeper_metric
Normal file
@@ -0,0 +1,85 @@
|
||||
PUT _template/ks_kafka_zookeeper_metric
|
||||
{
|
||||
"order" : 10,
|
||||
"index_patterns" : [
|
||||
"ks_kafka_zookeeper_metric*"
|
||||
],
|
||||
"settings" : {
|
||||
"index" : {
|
||||
"number_of_shards" : "10"
|
||||
}
|
||||
},
|
||||
"mappings" : {
|
||||
"properties" : {
|
||||
"routingValue" : {
|
||||
"type" : "text",
|
||||
"fields" : {
|
||||
"keyword" : {
|
||||
"ignore_above" : 256,
|
||||
"type" : "keyword"
|
||||
}
|
||||
}
|
||||
},
|
||||
"clusterPhyId" : {
|
||||
"type" : "long"
|
||||
},
|
||||
"metrics" : {
|
||||
"properties" : {
|
||||
"AvgRequestLatency" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"MinRequestLatency" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"MaxRequestLatency" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"OutstandingRequests" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"NodeCount" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"WatchCount" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"NumAliveConnections" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"PacketsReceived" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"PacketsSent" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"EphemeralsCount" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"ApproximateDataSize" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"OpenFileDescriptorCount" : {
|
||||
"type" : "double"
|
||||
},
|
||||
"MaxFileDescriptorCount" : {
|
||||
"type" : "double"
|
||||
}
|
||||
}
|
||||
},
|
||||
"key" : {
|
||||
"type" : "text",
|
||||
"fields" : {
|
||||
"keyword" : {
|
||||
"ignore_above" : 256,
|
||||
"type" : "keyword"
|
||||
}
|
||||
}
|
||||
},
|
||||
"timestamp" : {
|
||||
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
|
||||
"type" : "date"
|
||||
}
|
||||
}
|
||||
},
|
||||
"aliases" : { }
|
||||
}
|
||||
@@ -40,8 +40,7 @@ public class BaseMetricESDAO extends BaseESDAO {
|
||||
/**
|
||||
* 不同维度 kafka 监控数据
|
||||
*/
|
||||
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps
|
||||
.newConcurrentMap();
|
||||
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps.newConcurrentMap();
|
||||
|
||||
/**
|
||||
* 检查 es 索引是否存在,不存在则创建索引
|
||||
|
||||
@@ -0,0 +1,106 @@
|
||||
package com.xiaojukeji.know.streaming.km.persistence.es.dao;
|
||||
|
||||
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
|
||||
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.ESConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX;
|
||||
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_TEMPLATE;
|
||||
|
||||
@Component
|
||||
public class ZookeeperMetricESDAO extends BaseMetricESDAO {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.indexName = ZOOKEEPER_INDEX;
|
||||
super.indexTemplate = ZOOKEEPER_TEMPLATE;
|
||||
checkCurrentDayIndexExist();
|
||||
BaseMetricESDAO.register(indexName, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定集群,指定指标,一段时间内的值
|
||||
*/
|
||||
public Map<String/*metricName*/, List<MetricPointVO>> listMetricsByClusterPhyId(Long clusterPhyId,
|
||||
List<String> metricNameList,
|
||||
String aggType,
|
||||
Long startTime,
|
||||
Long endTime) {
|
||||
//1、获取需要查下的索引
|
||||
String realIndex = realIndex(startTime, endTime);
|
||||
|
||||
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
|
||||
String interval = MetricsUtils.getInterval(endTime - startTime);
|
||||
|
||||
//3、构造agg查询条件
|
||||
String aggDsl = buildAggsDSL(metricNameList, aggType);
|
||||
|
||||
//4、构造dsl查询条件,开始查询
|
||||
try {
|
||||
String dsl = dslLoaderUtil.getFormatDslByFileName(
|
||||
DslsConstant.GET_ZOOKEEPER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
|
||||
|
||||
return esOpClient.performRequestWithRouting(
|
||||
String.valueOf(clusterPhyId),
|
||||
realIndex,
|
||||
dsl,
|
||||
s -> handleListESQueryResponse(s, metricNameList, aggType),
|
||||
ESConstant.DEFAULT_RETRY_TIME
|
||||
);
|
||||
} catch (Exception e){
|
||||
LOGGER.error("class=ZookeeperMetricESDAO||method=listMetricsByClusterPhyId||clusterPhyId={}||errMsg=exception!",
|
||||
clusterPhyId, e
|
||||
);
|
||||
}
|
||||
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
|
||||
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
|
||||
if(null == esAggrMap) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
|
||||
for(String metric : metrics){
|
||||
List<MetricPointVO> metricPoints = new ArrayList<>();
|
||||
|
||||
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
|
||||
try {
|
||||
if (null != esBucket.getUnusedMap().get(KEY)) {
|
||||
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
|
||||
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
|
||||
|
||||
MetricPointVO metricPoint = new MetricPointVO();
|
||||
metricPoint.setAggType(aggType);
|
||||
metricPoint.setTimeStamp(timestamp);
|
||||
metricPoint.setValue(value);
|
||||
metricPoint.setName(metric);
|
||||
|
||||
metricPoints.add(metricPoint);
|
||||
}
|
||||
}catch (Exception e){
|
||||
LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e);
|
||||
}
|
||||
} );
|
||||
|
||||
metricMap.put(metric, optimizeMetricPoints(metricPoints));
|
||||
}
|
||||
|
||||
return metricMap;
|
||||
}
|
||||
}
|
||||
@@ -80,4 +80,6 @@ public class DslsConstant {
|
||||
|
||||
public static final String COUNT_GROUP_NOT_METRIC_VALUE = "GroupMetricESDAO/countGroupNotMetricValue";
|
||||
|
||||
/**************************************************** Zookeeper ****************************************************/
|
||||
public static final String GET_ZOOKEEPER_AGG_LIST_METRICS = "ZookeeperMetricESDAO/getAggListZookeeperMetrics";
|
||||
}
|
||||
|
||||
@@ -12,5 +12,7 @@ import javax.management.ObjectName;
|
||||
public interface JmxDAO {
|
||||
Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
|
||||
|
||||
Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
|
||||
Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
|
||||
|
||||
Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig);
|
||||
}
|
||||
|
||||
@@ -19,24 +19,28 @@ public class JmxDAOImpl implements JmxDAO {
|
||||
|
||||
@Override
|
||||
public Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
|
||||
return this.getJmxValue(null, null, jmxHost, jmxPort, jmxConfig, objectName, attribute);
|
||||
return this.getJmxValue(null, jmxHost, jmxPort, jmxConfig, objectName, attribute);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
|
||||
public Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
|
||||
JmxConnectorWrap jmxConnectorWrap = null;
|
||||
try {
|
||||
jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, brokerId, null, jmxHost, jmxPort, jmxConfig);
|
||||
jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, null, null, jmxHost, jmxPort, jmxConfig);
|
||||
if (!jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) {
|
||||
log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed",
|
||||
clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig);
|
||||
log.error(
|
||||
"method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed",
|
||||
clusterPhyId, jmxHost, jmxPort, jmxConfig
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
return jmxConnectorWrap.getAttribute(objectName, attribute);
|
||||
} catch (Exception e) {
|
||||
log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg={}",
|
||||
clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e);
|
||||
log.error(
|
||||
"method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg=exception!",
|
||||
clusterPhyId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e
|
||||
);
|
||||
} finally {
|
||||
if (jmxConnectorWrap != null) {
|
||||
jmxConnectorWrap.close();
|
||||
@@ -45,4 +49,27 @@ public class JmxDAOImpl implements JmxDAO {
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig) {
|
||||
try {
|
||||
Object object = this.getJmxValue(
|
||||
clusterPhyId,
|
||||
jmxHost,
|
||||
jmxPort,
|
||||
jmxConfig,
|
||||
new ObjectName("java.lang:type=Runtime"),
|
||||
"StartTime"
|
||||
);
|
||||
|
||||
return object == null? null: (Long) object;
|
||||
} catch (Exception e) {
|
||||
log.error(
|
||||
"class=JmxDAOImpl||method=getServerStartTime||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMsg=exception!",
|
||||
clusterPhyId, jmxHost, jmxPort, jmxConfig, e
|
||||
);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface ZookeeperDAO extends BaseMapper<ZookeeperInfoPO> {
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
{
|
||||
"size": 0,
|
||||
"query": {
|
||||
"bool": {
|
||||
"must": [
|
||||
{
|
||||
"term": {
|
||||
"clusterPhyId": {
|
||||
"value": %d
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"term": {
|
||||
"brokerId": {
|
||||
"value": %d
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"range": {
|
||||
"timestamp": {
|
||||
"gte": %d,
|
||||
"lte": %d
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"aggs": {
|
||||
"hist": {
|
||||
"date_histogram": {
|
||||
"field": "timestamp",
|
||||
"fixed_interval": "%s",
|
||||
"time_zone": "Asia/Shanghai",
|
||||
"min_doc_count": 0
|
||||
},
|
||||
"aggs": {
|
||||
%s
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 22/09/19
|
||||
*/
|
||||
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "集群ZK-相关接口(REST)")
|
||||
@RestController
|
||||
@RequestMapping(ApiPrefix.API_V3_PREFIX)
|
||||
public class ClusterZookeepersController {
|
||||
@Autowired
|
||||
private ClusterZookeepersManager clusterZookeepersManager;
|
||||
|
||||
@Autowired
|
||||
private ZnodeService znodeService;
|
||||
|
||||
@ApiOperation("集群Zookeeper状态信息")
|
||||
@GetMapping(value = "clusters/{clusterPhyId}/zookeepers-state")
|
||||
public Result<ClusterZookeepersStateVO> getClusterZookeepersState(@PathVariable Long clusterPhyId) {
|
||||
return clusterZookeepersManager.getClusterPhyZookeepersState(clusterPhyId);
|
||||
}
|
||||
|
||||
@ApiOperation("集群Zookeeper信息列表")
|
||||
@PostMapping(value = "clusters/{clusterPhyId}/zookeepers-overview")
|
||||
public PaginationResult<ClusterZookeepersOverviewVO> getClusterZookeepersOverview(@PathVariable Long clusterPhyId,
|
||||
@RequestBody ClusterZookeepersOverviewDTO dto) {
|
||||
return clusterZookeepersManager.getClusterPhyZookeepersOverview(clusterPhyId, dto);
|
||||
}
|
||||
|
||||
@ApiOperation("Zookeeper节点数据")
|
||||
@GetMapping(value = "clusters/{clusterPhyId}/znode-data")
|
||||
public Result<ZnodeVO> getClusterZookeeperData(@PathVariable Long clusterPhyId,
|
||||
@RequestParam String path) {
|
||||
return clusterZookeepersManager.getZnodeVO(clusterPhyId, path);
|
||||
}
|
||||
|
||||
@ApiOperation("Zookeeper节点列表")
|
||||
@GetMapping(value = "clusters/{clusterPhyId}/znode-children")
|
||||
public Result<List<String>> getClusterZookeeperChild(@PathVariable Long clusterPhyId,
|
||||
@RequestParam String path,
|
||||
@RequestParam(required = false) String keyword) {
|
||||
return znodeService.listZnodeChildren(clusterPhyId, path, keyword);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.xiaojukeji.know.streaming.km.rest.api.v3.zk;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 22/09/19
|
||||
*/
|
||||
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "ZKMetrics-相关接口(REST)")
|
||||
@RestController
|
||||
@RequestMapping(ApiPrefix.API_V3_PREFIX)
|
||||
public class ZookeeperMetricsController {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperMetricsController.class);
|
||||
|
||||
@Autowired
|
||||
private ZookeeperMetricService zookeeperMetricService;
|
||||
|
||||
@ApiOperation(value = "ZK-最近指标", notes = "")
|
||||
@PostMapping(value = "clusters/{clusterPhyId}/zookeeper-latest-metrics")
|
||||
@ResponseBody
|
||||
public Result<BaseMetrics> getLatestMetrics(@PathVariable Long clusterPhyId, @RequestBody List<String> metricsNames) {
|
||||
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.batchCollectMetricsFromZookeeper(clusterPhyId, metricsNames);
|
||||
if (metricsResult.failed()) {
|
||||
return Result.buildFromIgnoreData(metricsResult);
|
||||
}
|
||||
|
||||
return Result.buildSuc(metricsResult.getData());
|
||||
}
|
||||
|
||||
@ApiOperation(value = "ZK-多指标历史信息", notes = "多条指标线")
|
||||
@PostMapping(value = "clusters/{clusterPhyId}/zookeeper-metrics")
|
||||
@ResponseBody
|
||||
public Result<List<MetricLineVO>> getMetricsLine(@PathVariable Long clusterPhyId, @RequestBody MetricDTO dto) {
|
||||
return zookeeperMetricService.listMetricsFromES(clusterPhyId, dto);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.xiaojukeji.know.streaming.km.task.metadata;
|
||||
|
||||
import com.didiglobal.logi.job.annotation.Task;
|
||||
import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
@Task(name = "SyncZookeeperTask",
|
||||
description = "ZK信息同步到DB",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class SyncZookeeperTask extends AbstractAsyncMetadataDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(SyncZookeeperTask.class);
|
||||
|
||||
@Autowired
|
||||
private ZookeeperService zookeeperService;
|
||||
|
||||
@Override
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
|
||||
Result<List<ZookeeperInfo>> infoResult = zookeeperService.listFromZookeeper(
|
||||
clusterPhy.getId(),
|
||||
clusterPhy.getZookeeper(),
|
||||
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class)
|
||||
);
|
||||
|
||||
if (infoResult.failed()) {
|
||||
return new TaskResult(TaskResult.FAIL_CODE, infoResult.getMessage());
|
||||
}
|
||||
|
||||
zookeeperService.batchReplaceDataInDB(clusterPhy.getId(), infoResult.getData());
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.xiaojukeji.know.streaming.km.task.metrics;
|
||||
|
||||
import com.didiglobal.logi.job.annotation.Task;
|
||||
import com.didiglobal.logi.job.common.TaskResult;
|
||||
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.collector.metric.ZookeeperMetricCollector;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* @author didi
|
||||
*/
|
||||
@Task(name = "ZookeeperMetricCollectorTask",
|
||||
description = "Zookeeper指标采集任务",
|
||||
cron = "0 0/1 * * * ? *",
|
||||
autoRegister = true,
|
||||
consensual = ConsensualEnum.BROADCAST,
|
||||
timeout = 2 * 60)
|
||||
public class ZookeeperMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
|
||||
private static final ILog log = LogFactory.getLog(ZookeeperMetricCollectorTask.class);
|
||||
|
||||
@Autowired
|
||||
private ZookeeperMetricCollector zookeeperMetricCollector;
|
||||
|
||||
@Override
|
||||
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
|
||||
zookeeperMetricCollector.collectMetrics(clusterPhy);
|
||||
|
||||
return TaskResult.SUCCESS;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user