diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md index a75f71fd..efc01399 100644 --- a/docs/install_guide/版本升级手册.md +++ b/docs/install_guide/版本升级手册.md @@ -7,7 +7,22 @@ ### 6.2.0、升级至 `master` 版本 -暂无 +```sql +DROP TABLE IF EXISTS `ks_km_zookeeper`; +CREATE TABLE `ks_km_zookeeper` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID', + `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名', + `port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口', + `role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer', + `version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活,11存活但是4字命令使用不了', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表'; +``` ### 6.2.1、升级至 `v3.0.0` 版本 diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterZookeepersManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterZookeepersManager.java new file mode 100644 index 00000000..8219cd7e --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterZookeepersManager.java @@ -0,0 +1,19 @@ +package com.xiaojukeji.know.streaming.km.biz.cluster; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO; + +/** + * 多集群总体状态 + */ +public interface ClusterZookeepersManager { + Result getClusterPhyZookeepersState(Long clusterPhyId); + + PaginationResult getClusterPhyZookeepersOverview(Long clusterPhyId, ClusterZookeepersOverviewDTO dto); + + Result getZnodeVO(Long clusterPhyId, String path); +} diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java new file mode 100644 index 00000000..b285cac9 --- /dev/null +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java @@ -0,0 +1,137 @@ +package com.xiaojukeji.know.streaming.km.biz.cluster.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO; +import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + + +@Service +public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager { + private static final ILog LOGGER = LogFactory.getLog(ClusterZookeepersManagerImpl.class); + + @Autowired + private ClusterPhyService clusterPhyService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private ZookeeperMetricService zookeeperMetricService; + + @Autowired + private ZnodeService znodeService; + + @Override + public Result getClusterPhyZookeepersState(Long clusterPhyId) { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); + } + +// // TODO +// private Integer healthState; +// private Integer healthCheckPassed; +// private Integer healthCheckTotal; + + List infoList = zookeeperService.listFromDBByCluster(clusterPhyId); + + ClusterZookeepersStateVO vo = new ClusterZookeepersStateVO(); + vo.setTotalServerCount(infoList.size()); + vo.setAliveFollowerCount(0); + vo.setTotalFollowerCount(0); + vo.setAliveObserverCount(0); + vo.setTotalObserverCount(0); + vo.setAliveServerCount(0); + for (ZookeeperInfo info: infoList) { + if (info.getRole().equals(ZKRoleEnum.LEADER.getRole())) { + vo.setLeaderNode(info.getHost()); + } + + if (info.getRole().equals(ZKRoleEnum.FOLLOWER.getRole())) { + vo.setTotalFollowerCount(vo.getTotalFollowerCount() + 1); + vo.setAliveFollowerCount(info.alive()? vo.getAliveFollowerCount() + 1: vo.getAliveFollowerCount()); + } + + if (info.getRole().equals(ZKRoleEnum.OBSERVER.getRole())) { + vo.setTotalObserverCount(vo.getTotalObserverCount() + 1); + vo.setAliveObserverCount(info.alive()? vo.getAliveObserverCount() + 1: vo.getAliveObserverCount()); + } + + if (info.alive()) { + vo.setAliveServerCount(vo.getAliveServerCount() + 1); + } + } + + Result metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(new ZookeeperMetricParam( + clusterPhyId, + infoList.stream().filter(elem -> elem.alive()).map(item -> new Tuple(item.getHost(), item.getPort())).collect(Collectors.toList()), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class), + ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT + )); + if (metricsResult.failed()) { + LOGGER.error( + "class=ClusterZookeepersManagerImpl||method=getClusterPhyZookeepersState||clusterPhyId={}||errMsg={}", + clusterPhyId, metricsResult.getMessage() + ); + return Result.buildSuc(vo); + } + Float watchCount = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT); + vo.setWatchCount(watchCount != null? watchCount.intValue(): null); + + return Result.buildSuc(vo); + } + + @Override + public PaginationResult getClusterPhyZookeepersOverview(Long clusterPhyId, ClusterZookeepersOverviewDTO dto) { + //获取集群zookeeper列表 + List clusterZookeepersOverviewVOList = ConvertUtil.list2List(zookeeperService.listFromDBByCluster(clusterPhyId), ClusterZookeepersOverviewVO.class); + + //搜索 + clusterZookeepersOverviewVOList = PaginationUtil.pageByFuzzyFilter(clusterZookeepersOverviewVOList, dto.getSearchKeywords(), Arrays.asList("host")); + + //分页 + PaginationResult paginationResult = PaginationUtil.pageBySubData(clusterZookeepersOverviewVOList, dto); + + return paginationResult; + } + + @Override + public Result getZnodeVO(Long clusterPhyId, String path) { + Result result = znodeService.getZnode(clusterPhyId, path); + if (result.failed()) { + return Result.buildFromIgnoreData(result); + } + return Result.buildSuc(ConvertUtil.obj2ObjByJSON(result.getData(), ZnodeVO.class)); + } + + /**************************************************** private method ****************************************************/ + +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ZookeeperMetricCollector.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ZookeeperMetricCollector.java new file mode 100644 index 00000000..37f86d4e --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/ZookeeperMetricCollector.java @@ -0,0 +1,122 @@ +package com.xiaojukeji.know.streaming.km.collector.metric; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO; +import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; +import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER; + +/** + * @author didi + */ +@Component +public class ZookeeperMetricCollector extends AbstractMetricCollector { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @Autowired + private VersionControlService versionControlService; + + @Autowired + private ZookeeperMetricService zookeeperMetricService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private KafkaControllerService kafkaControllerService; + + @Override + public void collectMetrics(ClusterPhy clusterPhy) { + Long startTime = System.currentTimeMillis(); + Long clusterPhyId = clusterPhy.getId(); + List items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode()); + List aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId) + .stream() + .filter(elem -> Constant.ALIVE.equals(elem.getStatus())) + .collect(Collectors.toList()); + KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); + + ZookeeperMetrics metrics = ZookeeperMetrics.initWithMetric(clusterPhyId, Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (float)Constant.INVALID_CODE); + if (ValidateUtils.isEmptyList(aliveZKList)) { + // 没有存活的ZK时,发布事件,然后直接返回 + publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics))); + return; + } + + // 构造参数 + ZookeeperMetricParam param = new ZookeeperMetricParam( + clusterPhyId, + aliveZKList.stream().map(elem -> new Tuple(elem.getHost(), elem.getPort())).collect(Collectors.toList()), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class), + kafkaController == null? Constant.INVALID_CODE: kafkaController.getBrokerId(), + null + ); + + for(VersionControlItem v : items) { + try { + if(null != metrics.getMetrics().get(v.getName())) { + continue; + } + param.setMetricName(v.getName()); + + Result ret = zookeeperMetricService.collectMetricsFromZookeeper(param); + if(null == ret || ret.failed() || null == ret.getData()){ + continue; + } + + metrics.putMetric(ret.getData().getMetrics()); + + if(!EnvUtil.isOnline()){ + LOGGER.info( + "class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||metricValue={}", + clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics()) + ); + } + } catch (Exception e){ + LOGGER.error( + "class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||errMsg=exception!", + clusterPhyId, v.getName(), e + ); + } + } + + metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f); + + publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics))); + + LOGGER.info( + "class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||startTime={}||costTime={}||msg=msg=collect finished.", + clusterPhyId, startTime, System.currentTimeMillis() - startTime + ); + } + + @Override + public VersionItemTypeEnum collectorType() { + return METRIC_ZOOKEEPER; + } +} diff --git a/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java new file mode 100644 index 00000000..4f9dad53 --- /dev/null +++ b/km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/sink/ZookeeperMetricESSender.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.collector.sink; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX; + +@Component +public class ZookeeperMetricESSender extends AbstractMetricESSender implements ApplicationListener { + protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER"); + + @PostConstruct + public void init(){ + LOGGER.info("class=ZookeeperMetricESSender||method=init||msg=init finished"); + } + + @Override + public void onApplicationEvent(ZookeeperMetricEvent event) { + send2es(ZOOKEEPER_INDEX, ConvertUtil.list2List(event.getZookeeperMetrics(), ZookeeperMetricPO.class)); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterZookeepersOverviewDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterZookeepersOverviewDTO.java new file mode 100644 index 00000000..2b3a6e9d --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterZookeepersOverviewDTO.java @@ -0,0 +1,13 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import lombok.Data; + +/** + * @author wyc + * @date 2022/9/23 + */ +@Data +public class ClusterZookeepersOverviewDTO extends PaginationBaseDTO { + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java index 39e6fdf5..66a727e5 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java @@ -1,8 +1,8 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.config; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.Data; import java.io.Serializable; import java.util.Properties; @@ -11,7 +11,6 @@ import java.util.Properties; * @author zengqiao * @date 22/02/24 */ -@Data @ApiModel(description = "ZK配置") public class ZKConfig implements Serializable { @ApiModelProperty(value="ZK的jmx配置") @@ -21,11 +20,51 @@ public class ZKConfig implements Serializable { private Boolean openSecure = false; @ApiModelProperty(value="ZK的Session超时时间", example = "15000") - private Long sessionTimeoutUnitMs = 15000L; + private Integer sessionTimeoutUnitMs = 15000; @ApiModelProperty(value="ZK的Request超时时间", example = "5000") - private Long requestTimeoutUnitMs = 5000L; + private Integer requestTimeoutUnitMs = 5000; @ApiModelProperty(value="ZK的Request超时时间") private Properties otherProps = new Properties(); + + public JmxConfig getJmxConfig() { + return jmxConfig == null? new JmxConfig(): jmxConfig; + } + + public void setJmxConfig(JmxConfig jmxConfig) { + this.jmxConfig = jmxConfig; + } + + public Boolean getOpenSecure() { + return openSecure != null && openSecure; + } + + public void setOpenSecure(Boolean openSecure) { + this.openSecure = openSecure; + } + + public Integer getSessionTimeoutUnitMs() { + return sessionTimeoutUnitMs == null? Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS: sessionTimeoutUnitMs; + } + + public void setSessionTimeoutUnitMs(Integer sessionTimeoutUnitMs) { + this.sessionTimeoutUnitMs = sessionTimeoutUnitMs; + } + + public Integer getRequestTimeoutUnitMs() { + return requestTimeoutUnitMs == null? Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS: requestTimeoutUnitMs; + } + + public void setRequestTimeoutUnitMs(Integer requestTimeoutUnitMs) { + this.requestTimeoutUnitMs = requestTimeoutUnitMs; + } + + public Properties getOtherProps() { + return otherProps == null? new Properties() : otherProps; + } + + public void setOtherProps(Properties otherProps) { + this.otherProps = otherProps; + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/ZookeeperMetrics.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/ZookeeperMetrics.java new file mode 100644 index 00000000..823125b5 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/metrics/ZookeeperMetrics.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics; + +import lombok.Data; +import lombok.ToString; + +/** + * @author zengqiao + * @date 20/6/17 + */ +@Data +@ToString +public class ZookeeperMetrics extends BaseMetrics { + public ZookeeperMetrics(Long clusterPhyId) { + super(clusterPhyId); + } + + public static ZookeeperMetrics initWithMetric(Long clusterPhyId, String metric, Float value) { + ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId); + metrics.setClusterPhyId( clusterPhyId ); + metrics.putMetric(metric, value); + return metrics; + } + + @Override + public String unique() { + return "ZK@" + clusterPhyId; + } +} \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/ZookeeperMetricParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/ZookeeperMetricParam.java new file mode 100644 index 00000000..ef2b09c8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/metric/ZookeeperMetricParam.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author didi + */ +@Data +@NoArgsConstructor +public class ZookeeperMetricParam extends MetricParam { + private Long clusterPhyId; + + private List> zkAddressList; + + private ZKConfig zkConfig; + + private String metricName; + + private Integer kafkaControllerId; + + public ZookeeperMetricParam(Long clusterPhyId, + List> zkAddressList, + ZKConfig zkConfig, + String metricName) { + this.clusterPhyId = clusterPhyId; + this.zkAddressList = zkAddressList; + this.zkConfig = zkConfig; + this.metricName = metricName; + } + + public ZookeeperMetricParam(Long clusterPhyId, + List> zkAddressList, + ZKConfig zkConfig, + Integer kafkaControllerId, + String metricName) { + this.clusterPhyId = clusterPhyId; + this.zkAddressList = zkAddressList; + this.zkConfig = zkConfig; + this.kafkaControllerId = kafkaControllerId; + this.metricName = metricName; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java index 842e1106..252146c9 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/result/ResultStatus.java @@ -56,6 +56,7 @@ public enum ResultStatus { KAFKA_OPERATE_FAILED(8010, "Kafka操作失败"), MYSQL_OPERATE_FAILED(8020, "MySQL操作失败"), ZK_OPERATE_FAILED(8030, "ZK操作失败"), + ZK_FOUR_LETTER_CMD_FORBIDDEN(8031, "ZK四字命令被禁止"), ES_OPERATE_ERROR(8040, "ES操作失败"), HTTP_REQ_ERROR(8050, "第三方http请求异常"), diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java index c7409104..5c3f6506 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/version/VersionMetricControlItem.java @@ -23,6 +23,8 @@ public class VersionMetricControlItem extends VersionControlItem{ public static final String CATEGORY_PERFORMANCE = "Performance"; public static final String CATEGORY_FLOW = "Flow"; + public static final String CATEGORY_CLIENT = "Client"; + /** * 指标单位名称,非指标的没有 */ diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/Znode.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/Znode.java new file mode 100644 index 00000000..0bcb56d2 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/Znode.java @@ -0,0 +1,19 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper; + + +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import org.apache.zookeeper.data.Stat; + +@Data +public class Znode { + @ApiModelProperty(value = "节点名称", example = "broker") + private String name; + + @ApiModelProperty(value = "节点数据", example = "saassad") + private String data; + + @ApiModelProperty(value = "节点属性", example = "") + private Stat stat; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/ZookeeperInfo.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/ZookeeperInfo.java new file mode 100644 index 00000000..e943952e --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/ZookeeperInfo.java @@ -0,0 +1,42 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.BaseEntity; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import lombok.Data; + +@Data +public class ZookeeperInfo extends BaseEntity { + /** + * 集群Id + */ + private Long clusterPhyId; + + /** + * 主机 + */ + private String host; + + /** + * 端口 + */ + private Integer port; + + /** + * 角色 + */ + private String role; + + /** + * 版本 + */ + private String version; + + /** + * ZK状态 + */ + private Integer status; + + public boolean alive() { + return !(Constant.DOWN.equals(status)); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/BaseFourLetterWordCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/BaseFourLetterWordCmdData.java new file mode 100644 index 00000000..3e5713a8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/BaseFourLetterWordCmdData.java @@ -0,0 +1,9 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import java.io.Serializable; + +/** + * 四字命令结果数据的基础类 + */ +public class BaseFourLetterWordCmdData implements Serializable { +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ConfigCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ConfigCmdData.java new file mode 100644 index 00000000..d0982f47 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ConfigCmdData.java @@ -0,0 +1,38 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import lombok.Data; + + +/** + * clientPort=2183 + * dataDir=/data1/data/zkData2/version-2 + * dataLogDir=/data1/data/zkLog2/version-2 + * tickTime=2000 + * maxClientCnxns=60 + * minSessionTimeout=4000 + * maxSessionTimeout=40000 + * serverId=2 + * initLimit=15 + * syncLimit=10 + * electionAlg=3 + * electionPort=4445 + * quorumPort=4444 + * peerType=0 + */ +@Data +public class ConfigCmdData extends BaseFourLetterWordCmdData { + private Long clientPort; + private String dataDir; + private String dataLogDir; + private Long tickTime; + private Long maxClientCnxns; + private Long minSessionTimeout; + private Long maxSessionTimeout; + private Integer serverId; + private String initLimit; + private Long syncLimit; + private Long electionAlg; + private Long electionPort; + private Long quorumPort; + private Long peerType; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/MonitorCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/MonitorCmdData.java new file mode 100644 index 00000000..7ea1339b --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/MonitorCmdData.java @@ -0,0 +1,39 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import lombok.Data; + +/** + * zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT + * zk_avg_latency 0 + * zk_max_latency 399 + * zk_min_latency 0 + * zk_packets_received 234857 + * zk_packets_sent 234860 + * zk_num_alive_connections 4 + * zk_outstanding_requests 0 + * zk_server_state follower + * zk_znode_count 35566 + * zk_watch_count 39 + * zk_ephemerals_count 10 + * zk_approximate_data_size 3356708 + * zk_open_file_descriptor_count 35 + * zk_max_file_descriptor_count 819200 + */ +@Data +public class MonitorCmdData extends BaseFourLetterWordCmdData { + private String zkVersion; + private Long zkAvgLatency; + private Long zkMaxLatency; + private Long zkMinLatency; + private Long zkPacketsReceived; + private Long zkPacketsSent; + private Long zkNumAliveConnections; + private Long zkOutstandingRequests; + private String zkServerState; + private Long zkZnodeCount; + private Long zkWatchCount; + private Long zkEphemeralsCount; + private Long zkApproximateDataSize; + private Long zkOpenFileDescriptorCount; + private Long zkMaxFileDescriptorCount; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ServerCmdData.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ServerCmdData.java new file mode 100644 index 00000000..38bd2cf9 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/ServerCmdData.java @@ -0,0 +1,30 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword; + +import lombok.Data; + +/** + * Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT + * Latency min/avg/max: 0/0/2209 + * Received: 278202469 + * Sent: 279449055 + * Connections: 31 + * Outstanding: 0 + * Zxid: 0x20033fc12 + * Mode: leader + * Node count: 10084 + * Proposal sizes last/min/max: 36/32/31260 leader特有 + */ +@Data +public class ServerCmdData extends BaseFourLetterWordCmdData { + private String zkVersion; + private Long zkAvgLatency; + private Long zkMaxLatency; + private Long zkMinLatency; + private Long zkPacketsReceived; + private Long zkPacketsSent; + private Long zkNumAliveConnections; + private Long zkOutstandingRequests; + private String zkServerState; + private Long zkZnodeCount; + private Long zkZxid; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ConfigCmdDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ConfigCmdDataParser.java new file mode 100644 index 00000000..35ec153b --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ConfigCmdDataParser.java @@ -0,0 +1,116 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ConfigCmdData; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +/** + * clientPort=2183 + * dataDir=/data1/data/zkData2/version-2 + * dataLogDir=/data1/data/zkLog2/version-2 + * tickTime=2000 + * maxClientCnxns=60 + * minSessionTimeout=4000 + * maxSessionTimeout=40000 + * serverId=2 + * initLimit=15 + * syncLimit=10 + * electionAlg=3 + * electionPort=4445 + * quorumPort=4444 + * peerType=0 + */ +@Data +public class ConfigCmdDataParser implements FourLetterWordDataParser { + private static final ILog LOGGER = LogFactory.getLog(ConfigCmdDataParser.class); + + private Result dataResult = null; + + @Override + public String getCmd() { + return FourLetterWordUtil.ConfigCmd; + } + + @Override + public ConfigCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) { + Map dataMap = new HashMap<>(); + for (String elem : cmdData.split("\n")) { + if (elem.isEmpty()) { + continue; + } + + int idx = elem.indexOf('='); + if (idx >= 0) { + dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim()); + } + } + + ConfigCmdData configCmdData = new ConfigCmdData(); + dataMap.entrySet().stream().forEach(elem -> { + try { + switch (elem.getKey()) { + case "clientPort": + configCmdData.setClientPort(Long.valueOf(elem.getValue())); + break; + case "dataDir": + configCmdData.setDataDir(elem.getValue()); + break; + case "dataLogDir": + configCmdData.setDataLogDir(elem.getValue()); + break; + case "tickTime": + configCmdData.setTickTime(Long.valueOf(elem.getValue())); + break; + case "maxClientCnxns": + configCmdData.setMaxClientCnxns(Long.valueOf(elem.getValue())); + break; + case "minSessionTimeout": + configCmdData.setMinSessionTimeout(Long.valueOf(elem.getValue())); + break; + case "maxSessionTimeout": + configCmdData.setMaxSessionTimeout(Long.valueOf(elem.getValue())); + break; + case "serverId": + configCmdData.setServerId(Integer.valueOf(elem.getValue())); + break; + case "initLimit": + configCmdData.setInitLimit(elem.getValue()); + break; + case "syncLimit": + configCmdData.setSyncLimit(Long.valueOf(elem.getValue())); + break; + case "electionAlg": + configCmdData.setElectionAlg(Long.valueOf(elem.getValue())); + break; + case "electionPort": + configCmdData.setElectionPort(Long.valueOf(elem.getValue())); + break; + case "quorumPort": + configCmdData.setQuorumPort(Long.valueOf(elem.getValue())); + break; + case "peerType": + configCmdData.setPeerType(Long.valueOf(elem.getValue())); + break; + default: + LOGGER.warn( + "class=ConfigCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!", + elem.getKey(), elem.getValue() + ); + } + } catch (Exception e) { + LOGGER.error( + "class=ConfigCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!", + clusterPhyId, host, port, elem.getKey(), elem.getValue(), e + ); + } + }); + + return configCmdData; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/FourLetterWordDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/FourLetterWordDataParser.java new file mode 100644 index 00000000..58bb2368 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/FourLetterWordDataParser.java @@ -0,0 +1,10 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +/** + * 四字命令结果解析类 + */ +public interface FourLetterWordDataParser { + String getCmd(); + + T parseAndInitData(Long clusterPhyId, String host, int port, String cmdData); +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/MonitorCmdDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/MonitorCmdDataParser.java new file mode 100644 index 00000000..a33f4da3 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/MonitorCmdDataParser.java @@ -0,0 +1,117 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +/** + * zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT + * zk_avg_latency 0 + * zk_max_latency 399 + * zk_min_latency 0 + * zk_packets_received 234857 + * zk_packets_sent 234860 + * zk_num_alive_connections 4 + * zk_outstanding_requests 0 + * zk_server_state follower + * zk_znode_count 35566 + * zk_watch_count 39 + * zk_ephemerals_count 10 + * zk_approximate_data_size 3356708 + * zk_open_file_descriptor_count 35 + * zk_max_file_descriptor_count 819200 + */ +@Data +public class MonitorCmdDataParser implements FourLetterWordDataParser { + private static final ILog LOGGER = LogFactory.getLog(MonitorCmdDataParser.class); + + @Override + public String getCmd() { + return FourLetterWordUtil.MonitorCmd; + } + + @Override + public MonitorCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) { + Map dataMap = new HashMap<>(); + for (String elem : cmdData.split("\n")) { + if (elem.isEmpty()) { + continue; + } + + int idx = elem.indexOf('\t'); + if (idx >= 0) { + dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim()); + } + } + + MonitorCmdData monitorCmdData = new MonitorCmdData(); + dataMap.entrySet().stream().forEach(elem -> { + try { + switch (elem.getKey()) { + case "zk_version": + monitorCmdData.setZkVersion(elem.getValue().split("-")[0]); + break; + case "zk_avg_latency": + monitorCmdData.setZkAvgLatency(Long.valueOf(elem.getValue())); + break; + case "zk_max_latency": + monitorCmdData.setZkMaxLatency(Long.valueOf(elem.getValue())); + break; + case "zk_min_latency": + monitorCmdData.setZkMinLatency(Long.valueOf(elem.getValue())); + break; + case "zk_packets_received": + monitorCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue())); + break; + case "zk_packets_sent": + monitorCmdData.setZkPacketsSent(Long.valueOf(elem.getValue())); + break; + case "zk_num_alive_connections": + monitorCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue())); + break; + case "zk_outstanding_requests": + monitorCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue())); + break; + case "zk_server_state": + monitorCmdData.setZkServerState(elem.getValue()); + break; + case "zk_znode_count": + monitorCmdData.setZkZnodeCount(Long.valueOf(elem.getValue())); + break; + case "zk_watch_count": + monitorCmdData.setZkWatchCount(Long.valueOf(elem.getValue())); + break; + case "zk_ephemerals_count": + monitorCmdData.setZkEphemeralsCount(Long.valueOf(elem.getValue())); + break; + case "zk_approximate_data_size": + monitorCmdData.setZkApproximateDataSize(Long.valueOf(elem.getValue())); + break; + case "zk_open_file_descriptor_count": + monitorCmdData.setZkOpenFileDescriptorCount(Long.valueOf(elem.getValue())); + break; + case "zk_max_file_descriptor_count": + monitorCmdData.setZkMaxFileDescriptorCount(Long.valueOf(elem.getValue())); + break; + default: + LOGGER.warn( + "class=MonitorCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!", + elem.getKey(), elem.getValue() + ); + } + } catch (Exception e) { + LOGGER.error( + "class=MonitorCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!", + clusterPhyId, host, port, elem.getKey(), elem.getValue(), e + ); + } + }); + + return monitorCmdData; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ServerCmdDataParser.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ServerCmdDataParser.java new file mode 100644 index 00000000..f91f19a8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/zookeeper/fourletterword/parser/ServerCmdDataParser.java @@ -0,0 +1,97 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import lombok.Data; + +import java.util.HashMap; +import java.util.Map; + +/** + * Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT + * Latency min/avg/max: 0/0/2209 + * Received: 278202469 + * Sent: 279449055 + * Connections: 31 + * Outstanding: 0 + * Zxid: 0x20033fc12 + * Mode: leader + * Node count: 10084 + * Proposal sizes last/min/max: 36/32/31260 leader特有 + */ +@Data +public class ServerCmdDataParser implements FourLetterWordDataParser { + private static final ILog LOGGER = LogFactory.getLog(ServerCmdDataParser.class); + + @Override + public String getCmd() { + return FourLetterWordUtil.ServerCmd; + } + + @Override + public ServerCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) { + Map dataMap = new HashMap<>(); + for (String elem : cmdData.split("\n")) { + if (elem.isEmpty()) { + continue; + } + + int idx = elem.indexOf(':'); + if (idx >= 0) { + dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim()); + } + } + + ServerCmdData serverCmdData = new ServerCmdData(); + dataMap.entrySet().stream().forEach(elem -> { + try { + switch (elem.getKey()) { + case "Zookeeper version": + serverCmdData.setZkVersion(elem.getValue().split("-")[0]); + break; + case "Latency min/avg/max": + String[] data = elem.getValue().split("/"); + serverCmdData.setZkMinLatency(Long.valueOf(data[0])); + serverCmdData.setZkAvgLatency(Long.valueOf(data[1])); + serverCmdData.setZkMaxLatency(Long.valueOf(data[2])); + break; + case "Received": + serverCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue())); + break; + case "Sent": + serverCmdData.setZkPacketsSent(Long.valueOf(elem.getValue())); + break; + case "Connections": + serverCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue())); + break; + case "Outstanding": + serverCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue())); + break; + case "Mode": + serverCmdData.setZkServerState(elem.getValue()); + break; + case "Node count": + serverCmdData.setZkZnodeCount(Long.valueOf(elem.getValue())); + break; + case "Zxid": + serverCmdData.setZkZxid(Long.parseUnsignedLong(elem.getValue().trim().substring(2), 16)); + break; + default: + LOGGER.warn( + "class=ServerCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!", + elem.getKey(), elem.getValue() + ); + } + } catch (Exception e) { + LOGGER.error( + "class=ServerCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!", + clusterPhyId, host, port, elem.getKey(), elem.getValue(), e + ); + } + }); + + return serverCmdData; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java index df1fe834..cfe5995a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/BaseMetricEvent.java @@ -8,8 +8,6 @@ import org.springframework.context.ApplicationEvent; */ @Getter public class BaseMetricEvent extends ApplicationEvent { - - public BaseMetricEvent(Object source) { super( source ); } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ZookeeperMetricEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ZookeeperMetricEvent.java new file mode 100644 index 00000000..19279d53 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/metric/ZookeeperMetricEvent.java @@ -0,0 +1,20 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.metric; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import lombok.Getter; + +import java.util.List; + +/** + * @author didi + */ +@Getter +public class ZookeeperMetricEvent extends BaseMetricEvent { + + private List zookeeperMetrics; + + public ZookeeperMetricEvent(Object source, List zookeeperMetrics) { + super( source ); + this.zookeeperMetrics = zookeeperMetrics; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/ZookeeperMetricPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/ZookeeperMetricPO.java new file mode 100644 index 00000000..96921739 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/metrice/ZookeeperMetricPO.java @@ -0,0 +1,24 @@ +package com.xiaojukeji.know.streaming.km.common.bean.po.metrice; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min; + +@Data +@NoArgsConstructor +public class ZookeeperMetricPO extends BaseMetricESPO { + public ZookeeperMetricPO(Long clusterPhyId){ + super(clusterPhyId); + } + + @Override + public String getKey() { + return "ZK@" + clusterPhyId + "@" + monitorTimestamp2min(timestamp); + } + + @Override + public String getRoutingValue() { + return String.valueOf(clusterPhyId); + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/zookeeper/ZookeeperInfoPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/zookeeper/ZookeeperInfoPO.java new file mode 100644 index 00000000..69968ef6 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/zookeeper/ZookeeperInfoPO.java @@ -0,0 +1,40 @@ +package com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import lombok.Data; + +@Data +@TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "zookeeper") +public class ZookeeperInfoPO extends BasePO { + /** + * 集群Id + */ + private Long clusterPhyId; + + /** + * 主机 + */ + private String host; + + /** + * 端口 + */ + private Integer port; + + /** + * 角色 + */ + private String role; + + /** + * 版本 + */ + private String version; + + /** + * ZK状态 + */ + private Integer status; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java index a3874292..917769d2 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/metrics/line/MetricMultiLinesVO.java @@ -1,16 +1,12 @@ package com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line; -import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** * @author didi @@ -26,19 +22,4 @@ public class MetricMultiLinesVO { @ApiModelProperty(value = "指标名称对应的指标线") private List metricLines; - - public List getMetricPoints(String resName) { - if (ValidateUtils.isNull(metricLines)) { - return new ArrayList<>(); - } - - List voList = metricLines.stream().filter(elem -> elem.getName().equals(resName)).collect(Collectors.toList()); - if (ValidateUtils.isEmptyList(voList)) { - return new ArrayList<>(); - } - - // 仅获取idx=0的指标 - return voList.get(0).getMetricPoints(); - } - } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java new file mode 100644 index 00000000..960b5d5a --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersOverviewVO.java @@ -0,0 +1,26 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author wyc + * @date 2022/9/23 + */ +@Data +@ApiModel(description = "Zookeeper信息概览") +public class ClusterZookeepersOverviewVO { + @ApiModelProperty(value = "主机ip", example = "121.0.0.1") + private String host; + + @ApiModelProperty(value = "端口号", example = "2416") + private Integer port; + + @ApiModelProperty(value = "版本", example = "1.1.2") + private String version; + + @ApiModelProperty(value = "角色", example = "Leader") + private String role; + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersStateVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersStateVO.java new file mode 100644 index 00000000..ceb2041f --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ClusterZookeepersStateVO.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + + +/** + * @author wyc + * @date 2022/9/23 + */ +@Data +@ApiModel(description = "ZK状态信息") +public class ClusterZookeepersStateVO { + @ApiModelProperty(value = "健康检查状态", example = "1") + private Integer healthState; + + @ApiModelProperty(value = "健康检查通过数", example = "1") + private Integer healthCheckPassed; + + @ApiModelProperty(value = "健康检查总数", example = "1") + private Integer healthCheckTotal; + + @ApiModelProperty(value = "ZK的Leader机器", example = "127.0.0.1") + private String leaderNode; + + @ApiModelProperty(value = "Watch数", example = "123456") + private Integer watchCount; + + @ApiModelProperty(value = "节点存活数", example = "8") + private Integer aliveServerCount; + + @ApiModelProperty(value = "总节点数", example = "10") + private Integer totalServerCount; + + @ApiModelProperty(value = "Follower角色存活数", example = "8") + private Integer aliveFollowerCount; + + @ApiModelProperty(value = "Follower角色总数", example = "10") + private Integer totalFollowerCount; + + @ApiModelProperty(value = "Observer角色存活数", example = "3") + private Integer aliveObserverCount; + + @ApiModelProperty(value = "Observer角色总数", example = "3") + private Integer totalObserverCount; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeStatVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeStatVO.java new file mode 100644 index 00000000..c5cd0aa9 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeStatVO.java @@ -0,0 +1,44 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author wyc + * @date 2022/9/23 + */ +@Data +public class ZnodeStatVO { + @ApiModelProperty(value = "节点被创建时的事物的ID", example = "0x1f09") + private Long czxid; + + @ApiModelProperty(value = "创建时间", example = "Sat Mar 16 15:38:34 CST 2019") + private Long ctime; + + @ApiModelProperty(value = "节点最后一次被修改时的事物的ID", example = "0x1f09") + private Long mzxid; + + @ApiModelProperty(value = "最后一次修改时间", example = "Sat Mar 16 15:38:34 CST 2019") + private Long mtime; + + @ApiModelProperty(value = "子节点列表最近一次呗修改的事物ID", example = "0x31") + private Long pzxid; + + @ApiModelProperty(value = "子节点版本号", example = "0") + private Integer cversion; + + @ApiModelProperty(value = "数据版本号", example = "0") + private Integer version; + + @ApiModelProperty(value = "ACL版本号", example = "0") + private Integer aversion; + + @ApiModelProperty(value = "创建临时节点的事物ID,持久节点事物为0", example = "0") + private Long ephemeralOwner; + + @ApiModelProperty(value = "数据长度,每个节点都可保存数据", example = "22") + private Integer dataLength; + + @ApiModelProperty(value = "子节点的个数", example = "6") + private Integer numChildren; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeVO.java new file mode 100644 index 00000000..b00a5ff7 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/zookeeper/ZnodeVO.java @@ -0,0 +1,22 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author wyc + * @date 2022/9/23 + */ +@Data +public class ZnodeVO { + + @ApiModelProperty(value = "节点名称", example = "broker") + private String name; + + @ApiModelProperty(value = "节点数据", example = "saassad") + private String data; + + @ApiModelProperty(value = "节点属性", example = "") + private ZnodeStatVO stat; + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index edd897ff..a91f0809 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -23,8 +23,8 @@ public class Constant { public static final Integer YES = 1; public static final Integer NO = 0; - public static final Integer ALIVE = 1; - public static final Integer DOWN = 0; + public static final Integer ALIVE = 1; + public static final Integer DOWN = 0; public static final Integer ONE_HUNDRED = 100; @@ -33,6 +33,7 @@ public class Constant { public static final Long B_TO_MB = 1024L * 1024L; public static final Integer DEFAULT_SESSION_TIMEOUT_UNIT_MS = 15000; + public static final Integer DEFAULT_REQUEST_TIMEOUT_UNIT_MS = 5000; public static final Float MIN_HEALTH_SCORE = 10f; @@ -66,4 +67,5 @@ public class Constant { public static final Integer DEFAULT_RETRY_TIME = 3; + public static final Integer ZK_ALIVE_BUT_4_LETTER_FORBIDDEN = 11; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java index af8bd2c3..1b8a7740 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESConstant.java @@ -34,6 +34,8 @@ public class ESConstant { public static final String TOTAL = "total"; + public static final Integer DEFAULT_RETRY_TIME = 3; + private ESConstant() { } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java index 0de516f7..64aef24f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/ESIndexConstant.java @@ -644,4 +644,89 @@ public class ESIndexConstant { " \"aliases\" : { }\n" + " }"; + public final static String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric"; + public final static String ZOOKEEPER_TEMPLATE = "{\n" + + " \"order\" : 10,\n" + + " \"index_patterns\" : [\n" + + " \"ks_kafka_zookeeper_metric*\"\n" + + " ],\n" + + " \"settings\" : {\n" + + " \"index\" : {\n" + + " \"number_of_shards\" : \"10\"\n" + + " }\n" + + " },\n" + + " \"mappings\" : {\n" + + " \"properties\" : {\n" + + " \"routingValue\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"clusterPhyId\" : {\n" + + " \"type\" : \"long\"\n" + + " },\n" + + " \"metrics\" : {\n" + + " \"properties\" : {\n" + + " \"AvgRequestLatency\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MinRequestLatency\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MaxRequestLatency\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"OutstandingRequests\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"NodeCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"WatchCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"NumAliveConnections\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PacketsReceived\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"PacketsSent\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"EphemeralsCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"ApproximateDataSize\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"OpenFileDescriptorCount\" : {\n" + + " \"type\" : \"double\"\n" + + " },\n" + + " \"MaxFileDescriptorCount\" : {\n" + + " \"type\" : \"double\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"key\" : {\n" + + " \"type\" : \"text\",\n" + + " \"fields\" : {\n" + + " \"keyword\" : {\n" + + " \"ignore_above\" : 256,\n" + + " \"type\" : \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"timestamp\" : {\n" + + " \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" + + " \"type\" : \"date\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"aliases\" : { }\n" + + " }"; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ZnodeConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ZnodeConverter.java new file mode 100644 index 00000000..9b197358 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ZnodeConverter.java @@ -0,0 +1,19 @@ +package com.xiaojukeji.know.streaming.km.common.converter; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import org.apache.zookeeper.data.Stat; + +public class ZnodeConverter { + ZnodeConverter(){ + + } + + public static Znode convert2Znode(Tuple dataAndStat, String path) { + Znode znode = new Znode(); + znode.setStat(dataAndStat.getV2()); + znode.setData(dataAndStat.getV1() == null ? null : new String(dataAndStat.getV1())); + znode.setName(path.substring(path.lastIndexOf('/') + 1)); + return znode; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java index 15f13175..004dad6d 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java @@ -9,7 +9,9 @@ public enum VersionItemTypeEnum { METRIC_GROUP(102, "group_metric"), METRIC_BROKER(103, "broker_metric"), METRIC_PARTITION(104, "partition_metric"), - METRIC_REPLICATION (105, "replication_metric"), + METRIC_REPLICATION(105, "replication_metric"), + + METRIC_ZOOKEEPER(110, "zookeeper_metric"), /** * 服务端查询 diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/zookeeper/ZKRoleEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/zookeeper/ZKRoleEnum.java new file mode 100644 index 00000000..fd379dc8 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/zookeeper/ZKRoleEnum.java @@ -0,0 +1,22 @@ +package com.xiaojukeji.know.streaming.km.common.enums.zookeeper; + +import lombok.Getter; + +@Getter +public enum ZKRoleEnum { + LEADER("leader"), + + FOLLOWER("follower"), + + OBSERVER("observer"), + + UNKNOWN("unknown"), + + ; + + private final String role; + + ZKRoleEnum(String role) { + this.role = role; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java index cc7bfcb4..a9bea1c3 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxAttribute.java @@ -22,6 +22,12 @@ public class JmxAttribute { public static final String PERCENTILE_99 = "99thPercentile"; + public static final String MAX = "Max"; + + public static final String MEAN = "Mean"; + + public static final String MIN = "Min"; + public static final String VALUE = "Value"; public static final String CONNECTION_COUNT = "connection-count"; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java index d2d1651e..db8b3197 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxName.java @@ -63,6 +63,12 @@ public class JmxName { /*********************************************************** cluster ***********************************************************/ public static final String JMX_CLUSTER_PARTITION_UNDER_REPLICATED = "kafka.cluster:type=Partition,name=UnderReplicated"; + /*********************************************************** zookeeper ***********************************************************/ + + public static final String JMX_ZK_REQUEST_LATENCY_MS = "kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs"; + public static final String JMX_ZK_SYNC_CONNECTS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperSyncConnectsPerSec"; + public static final String JMX_ZK_DISCONNECTORS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"; + private JmxName() { } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/FourLetterWordUtil.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/FourLetterWordUtil.java new file mode 100644 index 00000000..a3ae31af --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/FourLetterWordUtil.java @@ -0,0 +1,163 @@ +package com.xiaojukeji.know.streaming.km.common.utils.zookeeper; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.FourLetterWordDataParser; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import org.apache.zookeeper.common.ClientX509Util; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.common.X509Util; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.HashSet; +import java.util.Set; + +public class FourLetterWordUtil { + private static final ILog LOGGER = LogFactory.getLog(FourLetterWordUtil.class); + + public static final String MonitorCmd = "mntr"; + public static final String ConfigCmd = "conf"; + public static final String ServerCmd = "srvr"; + + private static final Set supportedCommands = new HashSet<>(); + + public static Result executeFourLetterCmd(Long clusterPhyId, + String host, + int port, + boolean secure, + int timeout, + FourLetterWordDataParser dataParser) { + try { + if (!supportedCommands.contains(dataParser.getCmd())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, String.format("ZK %s命令暂未进行支持", dataParser.getCmd())); + } + + String cmdData = send4LetterWord(host, port, dataParser.getCmd(), secure, timeout); + if (cmdData.contains("not executed because it is not in the whitelist.")) { + return Result.buildFromRSAndMsg(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN, cmdData); + } + if (ValidateUtils.isBlank(cmdData)) { + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, cmdData); + } + + return Result.buildSuc(dataParser.parseAndInitData(clusterPhyId, host, port, cmdData)); + } catch (Exception e) { + LOGGER.error( + "class=FourLetterWordUtil||method=executeFourLetterCmd||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + clusterPhyId, host, port, dataParser.getCmd(), secure, timeout, e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } + + + /**************************************************** private method ****************************************************/ + + private static String send4LetterWord( + String host, + int port, + String cmd, + boolean secure, + int timeout) throws IOException, X509Exception.SSLContextException { + long startTime = System.currentTimeMillis(); + + LOGGER.info("connecting to {} {}", host, port); + + Socket socket = null; + OutputStream outputStream = null; + BufferedReader bufferedReader = null; + try { + InetSocketAddress hostaddress = host != null + ? new InetSocketAddress(host, port) + : new InetSocketAddress(InetAddress.getByName(null), port); + if (secure) { + LOGGER.info("using secure socket"); + try (X509Util x509Util = new ClientX509Util()) { + SSLContext sslContext = x509Util.getDefaultSSLContext(); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + SSLSocket sslSock = (SSLSocket) socketFactory.createSocket(); + sslSock.connect(hostaddress, timeout); + sslSock.startHandshake(); + socket = sslSock; + } + } else { + socket = new Socket(); + socket.connect(hostaddress, timeout); + } + socket.setSoTimeout(timeout); + + outputStream = socket.getOutputStream(); + outputStream.write(cmd.getBytes()); + outputStream.flush(); + + // 等待InputStream有数据 + while (System.currentTimeMillis() - startTime <= timeout && socket.getInputStream().available() <= 0) { + BackoffUtils.backoff(10); + } + + bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + sb.append(line).append("\n"); + } + return sb.toString(); + } catch (SocketTimeoutException e) { + throw new IOException("Exception while executing four letter word: " + cmd, e); + } finally { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + LOGGER.error( + "class=FourLetterWordUtil||method=send4LetterWord||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + host, port, cmd, secure, timeout, e + ); + } + } + + if (bufferedReader != null) { + try { + bufferedReader.close(); + } catch (IOException e) { + LOGGER.error( + "class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + host, port, cmd, secure, timeout, e + ); + } + } + + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + LOGGER.error( + "class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!", + host, port, cmd, secure, timeout, e + ); + } + } + } + } + + static { + supportedCommands.add(MonitorCmd); + supportedCommands.add(ConfigCmd); + supportedCommands.add(ServerCmd); + } + +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/ZookeeperUtils.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/ZookeeperUtils.java new file mode 100644 index 00000000..9d8c6c5b --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/zookeeper/ZookeeperUtils.java @@ -0,0 +1,59 @@ +package com.xiaojukeji.know.streaming.km.common.utils.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import org.apache.zookeeper.client.ConnectStringParser; +import org.apache.zookeeper.common.NetUtils; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.zookeeper.common.StringUtils.split; + +public class ZookeeperUtils { + private static final int DEFAULT_PORT = 2181; + + /** + * 解析ZK地址 + * @see ConnectStringParser + */ + public static List> connectStringParser(String connectString) throws Exception { + List> ipPortList = new ArrayList<>(); + + if (connectString == null) { + return ipPortList; + } + + // parse out chroot, if any + int off = connectString.indexOf('/'); + if (off >= 0) { + connectString = connectString.substring(0, off); + } + + List hostsList = split(connectString, ","); + for (String host : hostsList) { + int port = DEFAULT_PORT; + String[] hostAndPort = NetUtils.getIPV6HostAndPort(host); + if (hostAndPort.length != 0) { + host = hostAndPort[0]; + if (hostAndPort.length == 2) { + port = Integer.parseInt(hostAndPort[1]); + } + } else { + int pidx = host.lastIndexOf(':'); + if (pidx >= 0) { + // otherwise : is at the end of the string, ignore + if (pidx < host.length() - 1) { + port = Integer.parseInt(host.substring(pidx + 1)); + } + host = host.substring(0, pidx); + } + } + + ipPortList.add(new Tuple<>(host, port)); + } + + return ipPortList; + } + + +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index fbede23c..7fc4f4f2 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -343,17 +343,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) { try { - Object object = jmxDAO.getJmxValue( - clusterPhyId, - newNode.id(), - newNode.host(), - null, - jmxConfig, - new ObjectName("java.lang:type=Runtime"), - "StartTime" - ); + Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig); - return Broker.buildFrom(clusterPhyId, newNode, object != null? (Long) object: null); + return Broker.buildFrom(clusterPhyId, newNode, startTime); } catch (Exception e) { log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java new file mode 100644 index 00000000..9b0d4d2b --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/ZookeeperMetricVersionItems.java @@ -0,0 +1,141 @@ +package com.xiaojukeji.know.streaming.km.core.service.version.metrics; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.*; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER; +import static com.xiaojukeji.know.streaming.km.common.jmx.JmxAttribute.*; +import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.*; +import static com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl.ZookeeperMetricServiceImpl.*; + +@Component +public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric { + + /** + * 性能 + */ + public static final String ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY = "AvgRequestLatency"; + public static final String ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY = "MinRequestLatency"; + public static final String ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY = "MaxRequestLatency"; + public static final String ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS = "OutstandingRequests"; + public static final String ZOOKEEPER_METRIC_NODE_COUNT = "NodeCount"; + public static final String ZOOKEEPER_METRIC_WATCH_COUNT = "WatchCount"; + public static final String ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS = "NumAliveConnections"; + public static final String ZOOKEEPER_METRIC_PACKETS_RECEIVED = "PacketsReceived"; + public static final String ZOOKEEPER_METRIC_PACKETS_SENT = "PacketsSent"; + public static final String ZOOKEEPER_METRIC_EPHEMERALS_COUNT = "EphemeralsCount"; + public static final String ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE = "ApproximateDataSize"; + public static final String ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT = "OpenFileDescriptorCount"; + public static final String ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT = "MaxFileDescriptorCount"; + + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC = "KafkaZKDisconnectsPerSec"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC = "KafkaZKSyncConnectsPerSec"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH = "KafkaZKRequestLatencyMs_99thPercentile"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX = "KafkaZKRequestLatencyMs_Max"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN = "KafkaZKRequestLatencyMs_Mean"; + public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN = "KafkaZKRequestLatencyMs_Min"; + + + public static final String ZOOKEEPER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME; + + @Override + public int versionItemType() { + return METRIC_ZOOKEEPER.getCode(); + } + + @Override + public List init(){ + List items = new ArrayList<>(); + + // 性能指标 + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY).unit("ms").desc("平均响应延迟").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY).unit("ms").desc("最小响应延迟").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY).unit("ms").desc("最大响应延迟").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS).unit("个").desc("堆积请求数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_NODE_COUNT).unit("个").desc("ZNode数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_WATCH_COUNT).unit("个").desc("Watch数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS).unit("个").desc("客户端连接数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_PACKETS_RECEIVED).unit("个").desc("接受包的数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_PACKETS_SENT).unit("个").desc("发送包的数量").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_EPHEMERALS_COUNT).unit("个").desc("临时节点数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE).unit("byte").desc("文件大小(近似值)").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT).unit("个").desc("已打开的文件描述符数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT).unit("个").desc("允许打开的最大文件描述符数").category(CATEGORY_PERFORMANCE) + .extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD)); + + // JMX指标 + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH).unit("ms").desc("ZK请求99分位延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(PERCENTILE_99))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX).unit("ms").desc("ZK请求最大延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MAX))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN).unit("ms").desc("ZK请求最小延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MIN))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN).unit("ms").desc("ZK请求平均延迟").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MEAN))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC).unit("个").desc("断开连接数").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_DISCONNECTORS_PER_SEC ).jmxAttribute(RATE_MIN_1))); + + items.add(buildAllVersionsItem() + .name(ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC).unit("个").desc("同步连接数").category(CATEGORY_CLIENT) + .extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX ) + .jmxObjectName( JMX_ZK_SYNC_CONNECTS_PER_SEC ).jmxAttribute(RATE_MIN_1))); + return items; + } +} + diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZnodeService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZnodeService.java new file mode 100644 index 00000000..43e7744d --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZnodeService.java @@ -0,0 +1,13 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode; + +import java.util.List; + +public interface ZnodeService { + + Result> listZnodeChildren(Long clusterPhyId, String path, String keyword); + + Result getZnode(Long clusterPhyId, String path); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperMetricService.java new file mode 100644 index 00000000..2dc48851 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperMetricService.java @@ -0,0 +1,21 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; + +import java.util.List; + +public interface ZookeeperMetricService { + /** + * ZK指标获取 + * @param param 参数,因为ZK 四字命令在使用时,是短连接,所以参数内容会复杂一些,后续可以考虑优化为长连接 + * @return + */ + Result collectMetricsFromZookeeper(ZookeeperMetricParam param); + Result batchCollectMetricsFromZookeeper(Long clusterPhyId, List metricNameList); + + Result> listMetricsFromES(Long clusterPhyId, MetricDTO dto); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java new file mode 100644 index 00000000..758247aa --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; + +import java.util.List; + +public interface ZookeeperService { + /** + * 从ZK集群中获取ZK信息 + */ + Result> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig); + + void batchReplaceDataInDB(Long clusterPhyId, List infoList); + + List listFromDBByCluster(Long clusterPhyId); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZnodeServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZnodeServiceImpl.java new file mode 100644 index 00000000..9b9a70ef --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZnodeServiceImpl.java @@ -0,0 +1,81 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode; +import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.converter.ZnodeConverter; +import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; +import org.apache.zookeeper.data.Stat; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import java.util.List; +import java.util.stream.Collectors; + + +@Service +public class ZnodeServiceImpl implements ZnodeService { + private static final ILog LOGGER = LogFactory.getLog(ZnodeServiceImpl.class); + + @Autowired + private KafkaZKDAO kafkaZKDAO; + + @Autowired + private ClusterPhyService clusterPhyService; + + + + @Override + public Result> listZnodeChildren(Long clusterPhyId, String path, String keyword) { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); + } + + List children; + try { + children = kafkaZKDAO.getChildren(clusterPhyId, path, false); + } catch (NotExistException e) { + LOGGER.error("class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}", clusterPhyId, "create ZK client create failed"); + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, "ZK客户端创建失败"); + } catch (Exception e) { + LOGGER.error("class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}", clusterPhyId, "ZK operate failed"); + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, "ZK操作失败"); + } + + //关键字搜索 + if (keyword != null) { + children = children.stream().filter(elem -> elem.contains(keyword)).collect(Collectors.toList()); + } + return Result.buildSuc(children); + } + + @Override + public Result getZnode(Long clusterPhyId, String path) { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); + } + + //获取zookeeper上的原始数据 + Tuple dataAndStat; + try { + dataAndStat = kafkaZKDAO.getDataAndStat(clusterPhyId, path); + } catch (NotExistException e) { + LOGGER.error("class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}", clusterPhyId, "create ZK client create failed"); + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, "ZK客户端创建失败"); + } catch (Exception e) { + LOGGER.error("class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}", clusterPhyId, "ZK operate failed"); + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, "ZK操作失败"); + } + + return Result.buildSuc(ZnodeConverter.convert2Znode(dataAndStat, path)); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java new file mode 100644 index 00000000..dea1d877 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperMetricServiceImpl.java @@ -0,0 +1,281 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.MonitorCmdDataParser; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; +import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; +import com.xiaojukeji.know.streaming.km.common.utils.*; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import com.xiaojukeji.know.streaming.km.persistence.es.dao.ZookeeperMetricESDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.management.ObjectName; +import java.util.*; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*; +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_JMX_CONNECT_ERROR; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems.*; + + +@Service +public class ZookeeperMetricServiceImpl extends BaseMetricService implements ZookeeperMetricService { + private static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricServiceImpl.class); + + public static final String ZOOKEEPER_METHOD_DO_NOTHING = "doNothing"; + public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD = "getMetricFromMonitorCmd"; + public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD = "getMetricFromServerCmd"; + public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX = "getMetricFromKafkaByJMX"; + + @Autowired + private ClusterPhyService clusterPhyService; + + @Autowired + private ZookeeperService zookeeperService; + + @Autowired + private ZookeeperMetricESDAO zookeeperMetricESDAO; + + @Autowired + private KafkaJMXClient kafkaJMXClient; + + @Autowired + private KafkaControllerService kafkaControllerService; + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return VersionItemTypeEnum.METRIC_ZOOKEEPER; + } + + @Override + protected List listMetricPOFields(){ + return BeanUtil.listBeanFields(ZookeeperMetricPO.class); + } + + @Override + protected void initRegisterVCHandler(){ + registerVCHandler( ZOOKEEPER_METHOD_DO_NOTHING, this::doNothing); + registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD, this::getMetricFromMonitorCmd); + registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD, this::getMetricFromServerCmd); + registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX, this::getMetricFromKafkaByJMX); + } + + @Override + public Result collectMetricsFromZookeeper(ZookeeperMetricParam param) { + try { + return (Result)doVCHandler(param.getClusterPhyId(), param.getMetricName(), param); + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + @Override + public Result batchCollectMetricsFromZookeeper(Long clusterPhyId, List metricNameList) { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId); + if (null == clusterPhy) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); + } + + List aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId).stream() + .filter(elem -> Constant.ALIVE.equals(elem.getStatus())) + .collect(Collectors.toList()); + + if (ValidateUtils.isEmptyList(aliveZKList)) { + // 没有指标可以获取 + return Result.buildSuc(new ZookeeperMetrics(clusterPhyId)); + } + + // 构造参数 + ZookeeperMetricParam param = new ZookeeperMetricParam( + clusterPhyId, + aliveZKList.stream().map(elem -> new Tuple(elem.getHost(), elem.getPort())).collect(Collectors.toList()), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class), + null + ); + + ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId); + for(String metricName : metricNameList) { + try { + if(metrics.getMetrics().containsKey(metricName)) { + continue; + } + param.setMetricName(metricName); + + Result ret = this.collectMetricsFromZookeeper(param); + if(null == ret || ret.failed() || null == ret.getData()){ + continue; + } + + metrics.putMetric(ret.getData().getMetrics()); + } catch (Exception e){ + LOGGER.error( + "class=ZookeeperMetricServiceImpl||method=collectMetricsFromZookeeper||clusterPhyId={}||metricName={}||errMsg=exception!", + clusterPhyId, metricName, e + ); + } + } + + return Result.buildSuc(metrics); + } + + @Override + public Result> listMetricsFromES(Long clusterPhyId, MetricDTO dto) { + Map> pointVOMap = zookeeperMetricESDAO.listMetricsByClusterPhyId( + clusterPhyId, + dto.getMetricsNames(), + dto.getAggType(), + dto.getStartTime(), + dto.getEndTime() + ); + + // 格式转化 + List voList = new ArrayList<>(); + pointVOMap.entrySet().stream().forEach(entry -> + voList.add(new MetricLineVO(String.valueOf(clusterPhyId), entry.getKey(), entry.getValue())) + ); + return Result.buildSuc(voList); + } + + + /**************************************************** private method ****************************************************/ + + private Result getMetricFromServerCmd(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + + Result rz = null; + for (Tuple hostPort: param.getZkAddressList()) { + Result cmdDataResult = FourLetterWordUtil.executeFourLetterCmd( + param.getClusterPhyId(), + hostPort.getV1(), + hostPort.getV2(), + param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false, + param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS, + new ServerCmdDataParser() + ); + + if (cmdDataResult.failed()) { + rz = Result.buildFromIgnoreData(cmdDataResult); + continue; + } + + ServerCmdData cmdData = cmdDataResult.getData(); + + ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId()); + metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue()); + + return Result.buildSuc(metrics); + } + + return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId())); + } + + private Result getMetricFromMonitorCmd(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + + Result rz = null; + for (Tuple hostPort: param.getZkAddressList()) { + Result cmdDataResult = FourLetterWordUtil.executeFourLetterCmd( + param.getClusterPhyId(), + hostPort.getV1(), + hostPort.getV2(), + param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false, + param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS, + new MonitorCmdDataParser() + ); + + if (cmdDataResult.failed()) { + rz = Result.buildFromIgnoreData(cmdDataResult); + continue; + } + + MonitorCmdData cmdData = cmdDataResult.getData(); + + ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId()); + metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_WATCH_COUNT, cmdData.getZkWatchCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_EPHEMERALS_COUNT, cmdData.getZkEphemeralsCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE, cmdData.getZkApproximateDataSize().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT, cmdData.getZkOpenFileDescriptorCount().floatValue()); + metrics.putMetric(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT, cmdData.getZkMaxFileDescriptorCount().floatValue()); + + return Result.buildSuc(metrics); + } + + return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId())); + } + + private Result doNothing(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + return Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId())); + } + + private Result getMetricFromKafkaByJMX(VersionItemParam metricParam) { + ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam; + + String metricName = param.getMetricName(); + Long clusterPhyId = param.getClusterPhyId(); + Integer kafkaControllerId = param.getKafkaControllerId(); + + //1、获取jmx的属性信息 + VersionJmxInfo jmxInfo = getJMXInfo(clusterPhyId, metricName); + if(null == jmxInfo) { + return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST); + } + + //2、获取jmx连接 + JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterPhyId, kafkaControllerId); + if (ValidateUtils.isNull(jmxConnectorWrap)) { + return Result.buildFailure(VC_JMX_INIT_ERROR); + } + + try { + //2、获取jmx指标 + String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString(); + + return Result.buildSuc(ZookeeperMetrics.initWithMetric(clusterPhyId, metricName, Float.valueOf(value))); + } catch (Exception e) { + return Result.buildFailure(VC_JMX_CONNECT_ERROR); + } + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java new file mode 100644 index 00000000..2f1e318c --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java @@ -0,0 +1,147 @@ +package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.Tuple; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil; +import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.ZookeeperUtils; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +public class ZookeeperServiceImpl implements ZookeeperService { + private static final ILog LOGGER = LogFactory.getLog(ZookeeperServiceImpl.class); + + @Autowired + private ZookeeperDAO zookeeperDAO; + + @Override + public Result> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig) { + List> addressList = null; + try { + addressList = ZookeeperUtils.connectStringParser(zookeeperAddress); + } catch (Exception e) { + LOGGER.error( + "class=ZookeeperServiceImpl||method=listFromZookeeperCluster||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!", + clusterPhyId, zookeeperAddress, e + ); + + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage()); + } + + List aliveZKList = new ArrayList<>(); + for (Tuple hostPort: addressList) { + aliveZKList.add(this.getFromZookeeperCluster( + clusterPhyId, + hostPort.getV1(), + hostPort.getV2(), + zkConfig + )); + } + return Result.buildSuc(aliveZKList); + } + + @Override + public void batchReplaceDataInDB(Long clusterPhyId, List infoList) { + // DB 中的信息 + List dbInfoList = this.listRawFromDBByCluster(clusterPhyId); + Map dbMap = new HashMap<>(); + dbInfoList.stream().forEach(elem -> dbMap.put(elem.getHost() + elem.getPort(), elem)); + + // 新获取到的信息 + List newInfoList = ConvertUtil.list2List(infoList, ZookeeperInfoPO.class); + for (ZookeeperInfoPO newInfo: newInfoList) { + try { + ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort()); + if (oldInfo == null) { + zookeeperDAO.insert(newInfo); + } else if (!Constant.DOWN.equals(newInfo.getStatus())) { + // 存活时,直接使用获取到的数据 + newInfo.setId(oldInfo.getId()); + zookeeperDAO.updateById(newInfo); + } else { + // 如果挂了,则版本和角色信息,使用先前的信息。 + // 挂掉之后,如果角色是leader,则需要调整一下 + newInfo.setId(oldInfo.getId()); + newInfo.setRole(ZKRoleEnum.LEADER.getRole().equals(oldInfo.getRole())? ZKRoleEnum.FOLLOWER.getRole(): oldInfo.getRole()); + newInfo.setVersion(oldInfo.getVersion()); + zookeeperDAO.updateById(newInfo); + } + } catch (Exception e) { + LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterPhyId, newInfo, e); + } + } + + // 删除剩余的ZK节点 + dbMap.entrySet().forEach(entry -> { + try { + zookeeperDAO.deleteById(entry.getValue().getId()); + } catch (Exception e) { + LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterPhyId, entry.getValue(), e); + } + }); + } + + @Override + public List listFromDBByCluster(Long clusterPhyId) { + return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class); + } + + + /**************************************************** private method ****************************************************/ + + private List listRawFromDBByCluster(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(ZookeeperInfoPO::getClusterPhyId, clusterPhyId); + + return zookeeperDAO.selectList(lambdaQueryWrapper); + } + + private ZookeeperInfo getFromZookeeperCluster(Long clusterPhyId, String host, Integer port, ZKConfig zkConfig) { + ZookeeperInfo zookeeperInfo = new ZookeeperInfo(); + zookeeperInfo.setClusterPhyId(clusterPhyId); + zookeeperInfo.setHost(host); + zookeeperInfo.setPort(port); + zookeeperInfo.setRole(""); + zookeeperInfo.setVersion(""); + zookeeperInfo.setStatus(Constant.DOWN); + + Result serverCmdDataResult = FourLetterWordUtil.executeFourLetterCmd( + clusterPhyId, + host, + port, + zkConfig != null ? zkConfig.getOpenSecure(): false, + zkConfig != null ? zkConfig.getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS, + new ServerCmdDataParser() + ); + if (serverCmdDataResult.hasData()) { + zookeeperInfo.setRole(serverCmdDataResult.getData().getZkServerState()); + zookeeperInfo.setVersion(serverCmdDataResult.getData().getZkVersion()); + zookeeperInfo.setStatus(Constant.ALIVE); + } else if (serverCmdDataResult.getCode().equals(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN.getCode())) { + zookeeperInfo.setStatus(Constant.ZK_ALIVE_BUT_4_LETTER_FORBIDDEN); + } else { + return zookeeperInfo; + } + + return zookeeperInfo; + } +} diff --git a/km-dist/init/sql/ddl-ks-km.sql b/km-dist/init/sql/ddl-ks-km.sql index d9e4e16c..89f04034 100644 --- a/km-dist/init/sql/ddl-ks-km.sql +++ b/km-dist/init/sql/ddl-ks-km.sql @@ -355,3 +355,19 @@ CREATE TABLE `ks_km_app_node` ( PRIMARY KEY (`id`), KEY `idx_app_host` (`app_name`,`host_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='km集群部署的node信息'; + + +DROP TABLE IF EXISTS `ks_km_zookeeper`; +CREATE TABLE `ks_km_zookeeper` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID', + `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名', + `port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口', + `role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer', + `version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活,11存活但是4字命令使用不了', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表'; \ No newline at end of file diff --git a/km-dist/init/template/ks_kafka_zookeeper_metric b/km-dist/init/template/ks_kafka_zookeeper_metric new file mode 100644 index 00000000..abb54a61 --- /dev/null +++ b/km-dist/init/template/ks_kafka_zookeeper_metric @@ -0,0 +1,85 @@ +PUT _template/ks_kafka_zookeeper_metric +{ + "order" : 10, + "index_patterns" : [ + "ks_kafka_zookeeper_metric*" + ], + "settings" : { + "index" : { + "number_of_shards" : "10" + } + }, + "mappings" : { + "properties" : { + "routingValue" : { + "type" : "text", + "fields" : { + "keyword" : { + "ignore_above" : 256, + "type" : "keyword" + } + } + }, + "clusterPhyId" : { + "type" : "long" + }, + "metrics" : { + "properties" : { + "AvgRequestLatency" : { + "type" : "double" + }, + "MinRequestLatency" : { + "type" : "double" + }, + "MaxRequestLatency" : { + "type" : "double" + }, + "OutstandingRequests" : { + "type" : "double" + }, + "NodeCount" : { + "type" : "double" + }, + "WatchCount" : { + "type" : "double" + }, + "NumAliveConnections" : { + "type" : "double" + }, + "PacketsReceived" : { + "type" : "double" + }, + "PacketsSent" : { + "type" : "double" + }, + "EphemeralsCount" : { + "type" : "double" + }, + "ApproximateDataSize" : { + "type" : "double" + }, + "OpenFileDescriptorCount" : { + "type" : "double" + }, + "MaxFileDescriptorCount" : { + "type" : "double" + } + } + }, + "key" : { + "type" : "text", + "fields" : { + "keyword" : { + "ignore_above" : 256, + "type" : "keyword" + } + } + }, + "timestamp" : { + "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis", + "type" : "date" + } + } + }, + "aliases" : { } + } \ No newline at end of file diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java index a6615fbc..faeb64cb 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BaseMetricESDAO.java @@ -40,8 +40,7 @@ public class BaseMetricESDAO extends BaseESDAO { /** * 不同维度 kafka 监控数据 */ - private static Map ariusStatsEsDaoMap = Maps - .newConcurrentMap(); + private static Map ariusStatsEsDaoMap = Maps.newConcurrentMap(); /** * 检查 es 索引是否存在,不存在则创建索引 diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ZookeeperMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ZookeeperMetricESDAO.java new file mode 100644 index 00000000..8b391a3a --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/ZookeeperMetricESDAO.java @@ -0,0 +1,106 @@ +package com.xiaojukeji.know.streaming.km.persistence.es.dao; + +import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse; +import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO; +import com.xiaojukeji.know.streaming.km.common.constant.ESConstant; +import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils; +import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX; +import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_TEMPLATE; + +@Component +public class ZookeeperMetricESDAO extends BaseMetricESDAO { + + @PostConstruct + public void init() { + super.indexName = ZOOKEEPER_INDEX; + super.indexTemplate = ZOOKEEPER_TEMPLATE; + checkCurrentDayIndexExist(); + BaseMetricESDAO.register(indexName, this); + } + + /** + * 获取指定集群,指定指标,一段时间内的值 + */ + public Map> listMetricsByClusterPhyId(Long clusterPhyId, + List metricNameList, + String aggType, + Long startTime, + Long endTime) { + //1、获取需要查下的索引 + String realIndex = realIndex(startTime, endTime); + + //2、根据查询的时间区间大小来确定指标点的聚合区间大小 + String interval = MetricsUtils.getInterval(endTime - startTime); + + //3、构造agg查询条件 + String aggDsl = buildAggsDSL(metricNameList, aggType); + + //4、构造dsl查询条件,开始查询 + try { + String dsl = dslLoaderUtil.getFormatDslByFileName( + DslsConstant.GET_ZOOKEEPER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl); + + return esOpClient.performRequestWithRouting( + String.valueOf(clusterPhyId), + realIndex, + dsl, + s -> handleListESQueryResponse(s, metricNameList, aggType), + ESConstant.DEFAULT_RETRY_TIME + ); + } catch (Exception e){ + LOGGER.error("class=ZookeeperMetricESDAO||method=listMetricsByClusterPhyId||clusterPhyId={}||errMsg=exception!", + clusterPhyId, e + ); + } + + return new HashMap<>(); + } + + /**************************************************** private method ****************************************************/ + + private Map> handleListESQueryResponse(ESQueryResponse response, List metrics, String aggType){ + Map esAggrMap = checkBucketsAndHitsOfResponseAggs(response); + if(null == esAggrMap) { + return new HashMap<>(); + } + + Map> metricMap = new HashMap<>(); + for(String metric : metrics){ + List metricPoints = new ArrayList<>(); + + esAggrMap.get(HIST).getBucketList().forEach( esBucket -> { + try { + if (null != esBucket.getUnusedMap().get(KEY)) { + Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString()); + String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString(); + + MetricPointVO metricPoint = new MetricPointVO(); + metricPoint.setAggType(aggType); + metricPoint.setTimeStamp(timestamp); + metricPoint.setValue(value); + metricPoint.setName(metric); + + metricPoints.add(metricPoint); + } + }catch (Exception e){ + LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e); + } + } ); + + metricMap.put(metric, optimizeMetricPoints(metricPoints)); + } + + return metricMap; + } +} diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java index 3f158f36..94a8698e 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dsls/DslsConstant.java @@ -80,4 +80,6 @@ public class DslsConstant { public static final String COUNT_GROUP_NOT_METRIC_VALUE = "GroupMetricESDAO/countGroupNotMetricValue"; + /**************************************************** Zookeeper ****************************************************/ + public static final String GET_ZOOKEEPER_AGG_LIST_METRICS = "ZookeeperMetricESDAO/getAggListZookeeperMetrics"; } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java index a3747c0a..017bcf04 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/JmxDAO.java @@ -12,5 +12,7 @@ import javax.management.ObjectName; public interface JmxDAO { Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute); - Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute); + Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute); + + Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig); } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java index ec8349cc..77eb3252 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/jmx/impl/JmxDAOImpl.java @@ -19,24 +19,28 @@ public class JmxDAOImpl implements JmxDAO { @Override public Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) { - return this.getJmxValue(null, null, jmxHost, jmxPort, jmxConfig, objectName, attribute); + return this.getJmxValue(null, jmxHost, jmxPort, jmxConfig, objectName, attribute); } @Override - public Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) { + public Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) { JmxConnectorWrap jmxConnectorWrap = null; try { - jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, brokerId, null, jmxHost, jmxPort, jmxConfig); + jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, null, null, jmxHost, jmxPort, jmxConfig); if (!jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) { - log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed", - clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig); + log.error( + "method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed", + clusterPhyId, jmxHost, jmxPort, jmxConfig + ); return null; } return jmxConnectorWrap.getAttribute(objectName, attribute); } catch (Exception e) { - log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg={}", - clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e); + log.error( + "method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg=exception!", + clusterPhyId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e + ); } finally { if (jmxConnectorWrap != null) { jmxConnectorWrap.close(); @@ -45,4 +49,27 @@ public class JmxDAOImpl implements JmxDAO { return null; } + + @Override + public Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig) { + try { + Object object = this.getJmxValue( + clusterPhyId, + jmxHost, + jmxPort, + jmxConfig, + new ObjectName("java.lang:type=Runtime"), + "StartTime" + ); + + return object == null? null: (Long) object; + } catch (Exception e) { + log.error( + "class=JmxDAOImpl||method=getServerStartTime||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMsg=exception!", + clusterPhyId, jmxHost, jmxPort, jmxConfig, e + ); + } + + return null; + } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/zookeeper/ZookeeperDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/zookeeper/ZookeeperDAO.java new file mode 100644 index 00000000..73a177ae --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/zookeeper/ZookeeperDAO.java @@ -0,0 +1,9 @@ +package com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO; +import org.springframework.stereotype.Repository; + +@Repository +public interface ZookeeperDAO extends BaseMapper { +} diff --git a/km-persistence/src/main/resources/dsl/ZookeeperMetricESDAO/getAggListZookeeperMetrics b/km-persistence/src/main/resources/dsl/ZookeeperMetricESDAO/getAggListZookeeperMetrics new file mode 100644 index 00000000..c05c221d --- /dev/null +++ b/km-persistence/src/main/resources/dsl/ZookeeperMetricESDAO/getAggListZookeeperMetrics @@ -0,0 +1,44 @@ +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "clusterPhyId": { + "value": %d + } + } + }, + { + "term": { + "brokerId": { + "value": %d + } + } + }, + { + "range": { + "timestamp": { + "gte": %d, + "lte": %d + } + } + } + ] + } + }, + "aggs": { + "hist": { + "date_histogram": { + "field": "timestamp", + "fixed_interval": "%s", + "time_zone": "Asia/Shanghai", + "min_doc_count": 0 + }, + "aggs": { + %s + } + } + } +} \ No newline at end of file diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterZookeepersController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterZookeepersController.java new file mode 100644 index 00000000..99faa832 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/cluster/ClusterZookeepersController.java @@ -0,0 +1,63 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster; + +import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO; +import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + + +/** + * @author zengqiao + * @date 22/09/19 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "集群ZK-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_PREFIX) +public class ClusterZookeepersController { + @Autowired + private ClusterZookeepersManager clusterZookeepersManager; + + @Autowired + private ZnodeService znodeService; + + @ApiOperation("集群Zookeeper状态信息") + @GetMapping(value = "clusters/{clusterPhyId}/zookeepers-state") + public Result getClusterZookeepersState(@PathVariable Long clusterPhyId) { + return clusterZookeepersManager.getClusterPhyZookeepersState(clusterPhyId); + } + + @ApiOperation("集群Zookeeper信息列表") + @PostMapping(value = "clusters/{clusterPhyId}/zookeepers-overview") + public PaginationResult getClusterZookeepersOverview(@PathVariable Long clusterPhyId, + @RequestBody ClusterZookeepersOverviewDTO dto) { + return clusterZookeepersManager.getClusterPhyZookeepersOverview(clusterPhyId, dto); + } + + @ApiOperation("Zookeeper节点数据") + @GetMapping(value = "clusters/{clusterPhyId}/znode-data") + public Result getClusterZookeeperData(@PathVariable Long clusterPhyId, + @RequestParam String path) { + return clusterZookeepersManager.getZnodeVO(clusterPhyId, path); + } + + @ApiOperation("Zookeeper节点列表") + @GetMapping(value = "clusters/{clusterPhyId}/znode-children") + public Result> getClusterZookeeperChild(@PathVariable Long clusterPhyId, + @RequestParam String path, + @RequestParam(required = false) String keyword) { + return znodeService.listZnodeChildren(clusterPhyId, path, keyword); + } + +} diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/zk/ZookeeperMetricsController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/zk/ZookeeperMetricsController.java new file mode 100644 index 00000000..bb2ea098 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/zk/ZookeeperMetricsController.java @@ -0,0 +1,52 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.zk; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + + +/** + * @author zengqiao + * @date 22/09/19 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "ZKMetrics-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_PREFIX) +public class ZookeeperMetricsController { + private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperMetricsController.class); + + @Autowired + private ZookeeperMetricService zookeeperMetricService; + + @ApiOperation(value = "ZK-最近指标", notes = "") + @PostMapping(value = "clusters/{clusterPhyId}/zookeeper-latest-metrics") + @ResponseBody + public Result getLatestMetrics(@PathVariable Long clusterPhyId, @RequestBody List metricsNames) { + Result metricsResult = zookeeperMetricService.batchCollectMetricsFromZookeeper(clusterPhyId, metricsNames); + if (metricsResult.failed()) { + return Result.buildFromIgnoreData(metricsResult); + } + + return Result.buildSuc(metricsResult.getData()); + } + + @ApiOperation(value = "ZK-多指标历史信息", notes = "多条指标线") + @PostMapping(value = "clusters/{clusterPhyId}/zookeeper-metrics") + @ResponseBody + public Result> getMetricsLine(@PathVariable Long clusterPhyId, @RequestBody MetricDTO dto) { + return zookeeperMetricService.listMetricsFromES(clusterPhyId, dto); + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncZookeeperTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncZookeeperTask.java new file mode 100644 index 00000000..5af37be2 --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncZookeeperTask.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.know.streaming.km.task.metadata; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.List; + + +@Task(name = "SyncZookeeperTask", + description = "ZK信息同步到DB", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class SyncZookeeperTask extends AbstractAsyncMetadataDispatchTask { + private static final ILog log = LogFactory.getLog(SyncZookeeperTask.class); + + @Autowired + private ZookeeperService zookeeperService; + + @Override + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { + Result> infoResult = zookeeperService.listFromZookeeper( + clusterPhy.getId(), + clusterPhy.getZookeeper(), + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class) + ); + + if (infoResult.failed()) { + return new TaskResult(TaskResult.FAIL_CODE, infoResult.getMessage()); + } + + zookeeperService.batchReplaceDataInDB(clusterPhy.getId(), infoResult.getData()); + + return TaskResult.SUCCESS; + } +} diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ZookeeperMetricCollectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ZookeeperMetricCollectorTask.java new file mode 100644 index 00000000..f533a30a --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metrics/ZookeeperMetricCollectorTask.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.know.streaming.km.task.metrics; + +import com.didiglobal.logi.job.annotation.Task; +import com.didiglobal.logi.job.common.TaskResult; +import com.didiglobal.logi.job.core.consensual.ConsensualEnum; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.collector.metric.ZookeeperMetricCollector; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author didi + */ +@Task(name = "ZookeeperMetricCollectorTask", + description = "Zookeeper指标采集任务", + cron = "0 0/1 * * * ? *", + autoRegister = true, + consensual = ConsensualEnum.BROADCAST, + timeout = 2 * 60) +public class ZookeeperMetricCollectorTask extends AbstractAsyncMetricsDispatchTask { + private static final ILog log = LogFactory.getLog(ZookeeperMetricCollectorTask.class); + + @Autowired + private ZookeeperMetricCollector zookeeperMetricCollector; + + @Override + public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception { + zookeeperMetricCollector.collectMetrics(clusterPhy); + + return TaskResult.SUCCESS; + } +}