Merge pull request #651 from didi/master

合并主分支
This commit is contained in:
EricZeng
2022-10-10 19:10:48 +08:00
committed by GitHub
75 changed files with 3283 additions and 69 deletions

View File

@@ -1 +0,0 @@
## Issue 模板

46
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@@ -0,0 +1,46 @@
---
name: 报告Bug
about: 报告KnowStreaming的相关Bug
title: ''
labels: bug
assignees: ''
---
- [ ] 我已经在 [issues](https://github.com/didi/KnowStreaming/issues) 搜索过相关问题了,并没有重复的。
### 环境信息
* KnowStreaming version : <font size=4 color =red> xxx </font>
* Operating System version : <font size=4 color =red> xxx </font>
* Java version : <font size=4 color =red> xxx </font>
### 重现该问题的步骤
1. xxx
2. xxx
3. xxx
### 预期结果
<!-- 写下应该出现的预期结果?-->
### 实际结果
<!-- 实际发生了什么? -->
---
如果有异常请附上异常Trace:
```
Just put your stack trace here!
```

5
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View File

@@ -0,0 +1,5 @@
blank_issues_enabled: true
contact_links:
- name: KnowStreaming官网
url: https://knowstreaming.com/
about: KnowStreaming website

View File

@@ -0,0 +1,22 @@
---
name: 优化建议
about: 相关功能优化建议
title: ''
labels: Optimization Suggestions
assignees: ''
---
- [ ] 我已经在 [issues](https://github.com/didi/KnowStreaming/issues) 搜索过相关问题了,并没有重复的。
### 环境信息
* KnowStreaming version : <font size=4 color =red> xxx </font>
* Operating System version : <font size=4 color =red> xxx </font>
* Java version : <font size=4 color =red> xxx </font>
### 需要优化的功能点
### 建议如何优化

12
.github/ISSUE_TEMPLATE/discussion.md vendored Normal file
View File

@@ -0,0 +1,12 @@
---
name: 讨论/discussion
about: 开启一个关于KnowStreaming的讨论
title: ''
labels: discussion
assignees: ''
---
## 讨论主题
...

View File

@@ -0,0 +1,15 @@
---
name: 提议新功能/需求
about: 给KnowStreaming提一个功能需求
title: ''
labels: feature
assignees: ''
---
- [ ] 我在 [issues](https://github.com/didi/KnowStreaming/issues) 中并未搜索到与此相关的功能需求。
- [ ] 我在 [release notes] (https://github.com/didi/KnowStreaming/releases)已经发布的版本中并没有搜到相关功能.
## 这里描述需求
<!--请尽可能的描述清楚您的需求 -->

12
.github/ISSUE_TEMPLATE/question.md vendored Normal file
View File

@@ -0,0 +1,12 @@
---
name: 提个问题
about: 问KnowStreaming相关问题
title: ''
labels: question
assignees: ''
---
- [ ] 我已经在 [issues](https://github.com/didi/KnowStreaming/issues) 搜索过相关问题了,并没有重复的。
## 在这里提出你的问题

22
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@@ -0,0 +1,22 @@
请不要在没有先创建Issue的情况下创建Pull Request。
## 变更的目的是什么
XXXXX
## 简短的更新日志
XX
## 验证这一变化
XXXX
请遵循此清单,以帮助我们快速轻松地整合您的贡献:
* [ ] 确保有针对更改提交的 Github issue通常在您开始处理之前。诸如拼写错误之类的琐碎更改不需要 Github issue。您的Pull Request应该只解决这个问题而不需要进行其他更改—— 一个 PR 解决一个问题。
* [ ] 格式化 Pull Request 标题,如[ISSUE #123] support Confluent Schema Registry。 Pull Request 中的每个提交都应该有一个有意义的主题行和正文。
* [ ] 编写足够详细的Pull Request描述以了解Pull Request的作用、方式和原因。
* [ ] 编写必要的单元测试来验证您的逻辑更正。如果提交了新功能或重大更改请记住在test 模块中添加 integration-test
* [ ] 确保编译通过,集成测试通过

74
CODE_OF_CONDUCT.md Normal file
View File

@@ -0,0 +1,74 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, gender identity and expression, level of experience,
education, socio-economic status, nationality, personal appearance, race,
religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at shirenchuang@didiglobal.com . All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org

View File

@@ -1,28 +1,150 @@
# Contribution Guideline
Thanks for considering to contribute this project. All issues and pull requests are highly appreciated.
## Pull Requests
Before sending pull request to this project, please read and follow guidelines below.
# 为KnowStreaming做贡献
1. Branch: We only accept pull request on `dev` branch.
2. Coding style: Follow the coding style used in LogiKM.
3. Commit message: Use English and be aware of your spell.
4. Test: Make sure to test your code.
Add device mode, API version, related log, screenshots and other related information in your pull request if possible.
欢迎👏🏻来到KnowStreaming本文档是关于如何为KnowStreaming做出贡献的指南。
NOTE: We assume all your contribution can be licensed under the [AGPL-3.0](LICENSE).
如果您发现不正确或遗漏的内容, 请留下意见/建议。
## Issues
## 行为守则
请务必阅读并遵守我们的 [行为准则](./CODE_OF_CONDUCT.md).
We love clearly described issues. :)
Following information can help us to resolve the issue faster.
* Device mode and hardware information.
* API version.
* Logs.
* Screenshots.
* Steps to reproduce the issue.
## 贡献
**KnowStreaming** 欢迎任何角色的新参与者,包括 **User** 、**Contributor**、**Committer**、**PMC** 。
我们鼓励新人积极加入 **KnowStreaming** 项目从User到Contributor、Committer ,甚至是 PMC 角色。
为了做到这一点,新人需要积极地为 **KnowStreaming** 项目做出贡献。以下介绍如何对 **KnowStreaming** 进行贡献。
### 创建/打开 Issue
如果您在文档中发现拼写错误、在代码中**发现错误**或想要**新功能**或想要**提供建议**,您可以在 GitHub 上[创建一个Issue](https://github.com/didi/KnowStreaming/issues/new/choose) 进行报告。
如果您想直接贡献, 您可以选择下面标签的问题。
- [contribution welcome](https://github.com/didi/KnowStreaming/labels/contribution%20welcome) : 非常需要解决/新增 的Issues
- [good first issue](https://github.com/didi/KnowStreaming/labels/good%20first%20issue): 对新人比较友好, 新人可以拿这个Issue来练练手热热身。
<font color=red ><b> 请注意,任何 PR 都必须与有效issue相关联。否则PR 将被拒绝。</b></font>
### 开始你的贡献
**分支介绍**
我们将 `dev`分支作为开发分支, 说明这是一个不稳定的分支。
此外,我们的分支模型符合 [https://nvie.com/posts/a-successful-git-branching-model/](https://nvie.com/posts/a-successful-git-branching-model/). 我们强烈建议新人在创建PR之前先阅读上述文章。
**贡献流程**
为方便描述,我们这里定义一下2个名词
自己Fork出来的仓库是私人仓库, 我们这里称之为 **分叉仓库**
Fork的源项目,我们称之为:**源仓库**
现在如果您准备好创建PR, 以下是贡献者的工作流程:
1. Fork [KnowStreaming](https://github.com/didi/KnowStreaming) 项目到自己的仓库
2. 从源仓库的`dev`拉取并创建自己的本地分支,例如: `dev`
3. 在本地分支上对代码进行修改
4. Rebase 开发分支, 并解决冲突
5. commit 并 push 您的更改到您自己的**分叉仓库**
6. 创建一个 Pull Request 到**源仓库**的`dev`分支中。
7. 等待回复。如果回复的慢,请无情的催促。
更为详细的贡献流程请看:[贡献流程](./docs/contributer_guide/贡献流程.md)
创建Pull Request时
1. 请遵循 PR的 [模板](./.github/PULL_REQUEST_TEMPLATE.md)
2. 请确保 PR 有相应的issue。
3. 如果您的 PR 包含较大的更改,例如组件重构或新组件,请编写有关其设计和使用的详细文档(在对应的issue中)。
4. 注意单个 PR 不能太大。如果需要进行大量更改,最好将更改分成几个单独的 PR。
5. 在合并PR之前尽量的将最终的提交信息清晰简洁, 将多次修改的提交尽可能的合并为一次提交。
6. 创建 PR 后将为PR分配一个或多个reviewers。
<font color=red><b>如果您的 PR 包含较大的更改,例如组件重构或新组件,请编写有关其设计和使用的详细文档。</b></font>
# 代码审查指南
Commiter将轮流review代码以确保在合并前至少有一名Commiter
一些原则:
- 可读性——重要的代码应该有详细的文档。API 应该有 Javadoc。代码风格应与现有风格保持一致。
- 优雅:新的函数、类或组件应该设计得很好。
- 可测试性——单元测试用例应该覆盖 80% 的新代码。
- 可维护性 - 遵守我们的编码规范。
# 开发者
## 成为Contributor
只要成功提交并合并PR , 则为Contributor
贡献者名单请看:[贡献者名单](./docs/contributer_guide/开发者名单.md)
## 尝试成为Commiter
一般来说, 贡献8个重要的补丁并至少让三个不同的人来Review他们(您需要3个Commiter的支持)。
然后请人给你提名, 您需要展示您的
1. 至少8个重要的PR和项目的相关问题
2. 与团队合作的能力
3. 了解项目的代码库和编码风格
4. 编写好代码的能力
当前的Commiter可以通过在KnowStreaming中的Issue标签 `nomination`(提名)来提名您
1. 你的名字和姓氏
2. 指向您的Git个人资料的链接
3. 解释为什么你应该成为Commiter
4. 详细说明提名人与您合作的3个PR以及相关问题,这些问题可以证明您的能力。
另外2个Commiter需要支持您的**提名**如果5个工作日内没有人反对您就是提交者,如果有人反对或者想要更多的信息Commiter会讨论并通常达成共识(5个工作日内) 。
# 开源奖励计划
我们非常欢迎开发者们为KnowStreaming开源项目贡献一份力量相应也将给予贡献者激励以表认可与感谢。
## 参与贡献
1. 积极参与 Issue 的讨论如答疑解惑、提供想法或报告无法解决的错误Issue
2. 撰写和改进项目的文档Wiki
3. 提交补丁优化代码Coding
## 你将获得
1. 加入KnowStreaming开源项目贡献者名单并展示
2. KnowStreaming开源贡献者证书(纸质&电子版)
3. KnowStreaming贡献者精美大礼包(KnowStreamin/滴滴 周边)
## 相关规则
- Contributer和Commiter都会有对应的证书和对应的礼包
- 每季度有KnowStreaming项目团队评选出杰出贡献者,颁发相应证书。
- 年末进行年度评选
贡献者名单请看:[贡献者名单](./docs/contributer_guide/开发者名单.md)

View File

@@ -101,7 +101,7 @@
点击 [这里](CONTRIBUTING.md),了解如何成为 Know Streaming 的贡献者
获取KnowStreaming开源社区证书。
## 加入技术交流群
@@ -134,6 +134,11 @@ PS: 提问请尽量把问题一次性描述清楚,并告知环境信息情况
微信加群:添加`mike_zhangliang``PenceXie`的微信号备注KnowStreaming加群。
<br/>
加群之前有劳点一下 star一个小小的 star 是对KnowStreaming作者们努力建设社区的动力。
感谢感谢!!!
<img width="116" alt="wx" src="https://user-images.githubusercontent.com/71620349/192257217-c4ebc16c-3ad9-485d-a914-5911d3a4f46b.png">
## Star History

View File

@@ -0,0 +1 @@
TODO.

View File

@@ -0,0 +1,43 @@
开源贡献者证书发放名单(定期更新)
贡献者名单请看:[贡献者名单]()
|姓名|Github|角色|发放日期|
|--|--|--|--|
|张亮 | [@zhangliangboy](https://github.com/zhangliangboy)|||
|谢鹏|[@PenceXie](https://github.com/PenceXie)|||
|石臻臻 | [@shirenchuang](https://github.com/shirenchuang)|||
|周宇航|[@GraceWalk](https://github.com/GraceWalk)|||
|曾巧|[@ZQKC](https://github.com/ZQKC)|||
|赵寅锐|[@ZHAOYINRUI](https://github.com/ZHAOYINRUI)|||
|王东方|[@wangdongfang-aden](https://github.com/wangdongfang-aden)|||
|haoqi123|[@[haoqi123]](https://github.com/haoqi123)|||
|17hao|[@17hao](https://github.com/17hao)|||
|Huyueeer|[@Huyueeer](https://github.com/Huyueeer)|||
|杨光|[@yaangvipguang](https://github.com/yangvipguang)|
|王亚聪|[@wangyacongi](https://github.com/wangyacongi)|
|WYAOBO|[@WYAOBO](https://github.com/WYAOBO)
| Super .Wein星痕| [@superspeedone](https://github.com/superspeedone)|||
| Yang Jing| [@yangbajing](https://github.com/yangbajing)|||
| 刘新元 Liu XinYuan| [@Liu-XinYuan](https://github.com/Liu-XinYuan)|||
|Joker | [@LiubeyJokerQueue](https://github.com/JokerQueue)|||
|Eason Lau | [@Liubey](https://github.com/Liubey)|||
| hailanxin| [@hailanxin](https://github.com/hailanxin)|||
| Qi Zhang| [@zzzhangqi](https://github.com/zzzhangqi)|||
|Hongten | [@Hongten](https://github.com/Hongten)|||
|fengxsong | [@fengxsong](https://github.com/fengxsong)|||
|f1558 | [@f1558](https://github.com/f1558)|||
| 谢晓东| [@Strangevy](https://github.com/Strangevy)|||
| ZhaoXinlong| [@ZhaoXinlong](https://github.com/ZhaoXinlong)|||
|xuehaipeng | [@xuehaipeng](https://github.com/xuehaipeng)|||
|mrazkong | [@mrazkong](https://github.com/mrazkong)|||
|xuzhengxi | [@hyper-xx)](https://github.com/hyper-xx)|||
|pierre xiong | [@pierre94](https://github.com/pierre94)|||

View File

@@ -0,0 +1,121 @@
### 贡献流程
[贡献源码细则](../CONTRIBUTING.md)
#### 1. fork didi/KnowStreaming项目到您的github库
找到你要Fork的项目,例如 [KnowStreaming](https://github.com/didi/KnowStreaming) ,点击Fork按钮。
![在这里插入图片描述](https://img-blog.csdnimg.cn/ac7bfef9ccde49d587c30e702a615ef5.png)
#### 2. 克隆或下载您fork的Nacos代码仓库到您本地
```sh
git clone { your fork knowstreaming repo address }
cd KnowStreaming
```
#### 3. 添加 didi/KnowStreaming仓库为upstream仓库
```sh
### 添加源仓库
git remote add upstream https://github.com/didi/KnowStreaming
### 查看是否添加成功
git remote -v
origin ${your fork KnowStreaming repo address} (fetch)
origin ${your fork KnowStreaming repo address} (push)
upstream https://github.com/didi/KnowStreaming(fetch)
upstream https://github.com/didi/KnowStreaming (push)
### 获取源仓库的基本信息
git fetch origin
git fetch upstream
```
上面是将didi/KnowStreaming添加为远程仓库, 当前就会有2个远程仓库
1. origin 你Fork出来的分叉仓库
2. upstream 源仓库
git fetch 获取远程仓库的基本信息, 比如 **源仓库**的所有分支就获取到了
#### 4. 同步源仓库开发分支到本地分叉仓库中
一般开源项目都会有一个给贡献者提交代码的分支,例如 KnowStreaming的分支是 `dev`
首先我们要将 **源仓库**的开发分支(`dev`) 拉取到本地仓库中
```sh
git checkout -b dev upstream/dev
```
**或者IDEA的形式创建**
![在这里插入图片描述](https://img-blog.csdnimg.cn/c95f2601a9af41889a5fc20b2a9724a5.png)
#### 5. 在本地新建的开发分支上进行修改
首先请保证您阅读并正确设置KnowStreaming code style, 相关内容请阅读[KnowStreaming 代码规约 ]()。
修改时请保证该分支上的修改仅和issue相关并尽量细化做到
<font color=red><b>一个分支只修改一件事一个PR只修改一件事</b></font>
同时,您的提交记录请尽量描述清楚,主要以谓 + 宾进行描述Fix xxx problem/bug。少量简单的提交可以使用For xxx来描述For codestyle。 如果该提交和某个ISSUE相关可以添加ISSUE号作为前缀For #10000, Fix xxx problem/bug。
#### 6. Rebase 基础分支和开发分支
您修改的时候可能别人的修改已经提交并被合并此时可能会有冲突这里请使用rebase命令进行合并解决主要有2个好处
1. 您的提交记录将会非常优雅不会出现Merge xxxx branch 等字样
2. rebase后您分支的提交日志也是一条单链基本不会出现各种分支交错的情况回查时更轻松
```sh
git fetch upstream
git rebase -i upstream/dev
```
**或者在IDEA的操作如下**
![在这里插入图片描述](https://img-blog.csdnimg.cn/d75addcfa9564d3d9e1d226a2f7f4d64.png)
选择 源仓库的开发分支
![在这里插入图片描述](https://img-blog.csdnimg.cn/4e85714df13b44bcb10f1e655450cb72.png)
推荐使用IDEA的方式, 有冲突的时候更容易解决冲突问题。
#### 7. 将您开发完成rebase后的分支上传到您fork的仓库
```sh
git push origin dev
```
#### 8. 按照PR模板中的清单创建Pull Request
![在这里插入图片描述](https://img-blog.csdnimg.cn/1dab060aed314666970e3910e05f2205.png)
选择自己的分支合并到模板分支。
#### 9. 等待合并代码
提交了PR之后需要等待PMC、Commiter 来Review代码如果有问题需要配合修改重新提交。
如果没有问题会直接合并到开发分支`dev`中。
注: 如果长时间没有review, 则可以多催促社区来Review代码

View File

@@ -7,7 +7,22 @@
### 6.2.0、升级至 `master` 版本
暂无
```sql
DROP TABLE IF EXISTS `ks_km_zookeeper`;
CREATE TABLE `ks_km_zookeeper` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID',
`host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名',
`port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口',
`role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer',
`version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本',
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活0未存活11存活但是4字命令使用不了',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表';
```
### 6.2.1、升级至 `v3.0.0` 版本

View File

@@ -0,0 +1,19 @@
package com.xiaojukeji.know.streaming.km.biz.cluster;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO;
/**
* 多集群总体状态
*/
public interface ClusterZookeepersManager {
Result<ClusterZookeepersStateVO> getClusterPhyZookeepersState(Long clusterPhyId);
PaginationResult<ClusterZookeepersOverviewVO> getClusterPhyZookeepersOverview(Long clusterPhyId, ClusterZookeepersOverviewDTO dto);
Result<ZnodeVO> getZnodeVO(Long clusterPhyId, String path);
}

View File

@@ -0,0 +1,137 @@
package com.xiaojukeji.know.streaming.km.biz.cluster.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager {
private static final ILog LOGGER = LogFactory.getLog(ClusterZookeepersManagerImpl.class);
@Autowired
private ClusterPhyService clusterPhyService;
@Autowired
private ZookeeperService zookeeperService;
@Autowired
private ZookeeperMetricService zookeeperMetricService;
@Autowired
private ZnodeService znodeService;
@Override
public Result<ClusterZookeepersStateVO> getClusterPhyZookeepersState(Long clusterPhyId) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}
// // TODO
// private Integer healthState;
// private Integer healthCheckPassed;
// private Integer healthCheckTotal;
List<ZookeeperInfo> infoList = zookeeperService.listFromDBByCluster(clusterPhyId);
ClusterZookeepersStateVO vo = new ClusterZookeepersStateVO();
vo.setTotalServerCount(infoList.size());
vo.setAliveFollowerCount(0);
vo.setTotalFollowerCount(0);
vo.setAliveObserverCount(0);
vo.setTotalObserverCount(0);
vo.setAliveServerCount(0);
for (ZookeeperInfo info: infoList) {
if (info.getRole().equals(ZKRoleEnum.LEADER.getRole())) {
vo.setLeaderNode(info.getHost());
}
if (info.getRole().equals(ZKRoleEnum.FOLLOWER.getRole())) {
vo.setTotalFollowerCount(vo.getTotalFollowerCount() + 1);
vo.setAliveFollowerCount(info.alive()? vo.getAliveFollowerCount() + 1: vo.getAliveFollowerCount());
}
if (info.getRole().equals(ZKRoleEnum.OBSERVER.getRole())) {
vo.setTotalObserverCount(vo.getTotalObserverCount() + 1);
vo.setAliveObserverCount(info.alive()? vo.getAliveObserverCount() + 1: vo.getAliveObserverCount());
}
if (info.alive()) {
vo.setAliveServerCount(vo.getAliveServerCount() + 1);
}
}
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.collectMetricsFromZookeeper(new ZookeeperMetricParam(
clusterPhyId,
infoList.stream().filter(elem -> elem.alive()).map(item -> new Tuple<String, Integer>(item.getHost(), item.getPort())).collect(Collectors.toList()),
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class),
ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT
));
if (metricsResult.failed()) {
LOGGER.error(
"class=ClusterZookeepersManagerImpl||method=getClusterPhyZookeepersState||clusterPhyId={}||errMsg={}",
clusterPhyId, metricsResult.getMessage()
);
return Result.buildSuc(vo);
}
Float watchCount = metricsResult.getData().getMetric(ZookeeperMetricVersionItems.ZOOKEEPER_METRIC_WATCH_COUNT);
vo.setWatchCount(watchCount != null? watchCount.intValue(): null);
return Result.buildSuc(vo);
}
@Override
public PaginationResult<ClusterZookeepersOverviewVO> getClusterPhyZookeepersOverview(Long clusterPhyId, ClusterZookeepersOverviewDTO dto) {
//获取集群zookeeper列表
List<ClusterZookeepersOverviewVO> clusterZookeepersOverviewVOList = ConvertUtil.list2List(zookeeperService.listFromDBByCluster(clusterPhyId), ClusterZookeepersOverviewVO.class);
//搜索
clusterZookeepersOverviewVOList = PaginationUtil.pageByFuzzyFilter(clusterZookeepersOverviewVOList, dto.getSearchKeywords(), Arrays.asList("host"));
//分页
PaginationResult<ClusterZookeepersOverviewVO> paginationResult = PaginationUtil.pageBySubData(clusterZookeepersOverviewVOList, dto);
return paginationResult;
}
@Override
public Result<ZnodeVO> getZnodeVO(Long clusterPhyId, String path) {
Result<Znode> result = znodeService.getZnode(clusterPhyId, path);
if (result.failed()) {
return Result.buildFromIgnoreData(result);
}
return Result.buildSuc(ConvertUtil.obj2ObjByJSON(result.getData(), ZnodeVO.class));
}
/**************************************************** private method ****************************************************/
}

View File

@@ -0,0 +1,122 @@
package com.xiaojukeji.know.streaming.km.collector.metric;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
import com.xiaojukeji.know.streaming.km.core.service.version.VersionControlService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER;
/**
* @author didi
*/
@Component
public class ZookeeperMetricCollector extends AbstractMetricCollector<ZookeeperMetricPO> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@Autowired
private VersionControlService versionControlService;
@Autowired
private ZookeeperMetricService zookeeperMetricService;
@Autowired
private ZookeeperService zookeeperService;
@Autowired
private KafkaControllerService kafkaControllerService;
@Override
public void collectMetrics(ClusterPhy clusterPhy) {
Long startTime = System.currentTimeMillis();
Long clusterPhyId = clusterPhy.getId();
List<VersionControlItem> items = versionControlService.listVersionControlItem(clusterPhyId, collectorType().getCode());
List<ZookeeperInfo> aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId)
.stream()
.filter(elem -> Constant.ALIVE.equals(elem.getStatus()))
.collect(Collectors.toList());
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
ZookeeperMetrics metrics = ZookeeperMetrics.initWithMetric(clusterPhyId, Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (float)Constant.INVALID_CODE);
if (ValidateUtils.isEmptyList(aliveZKList)) {
// 没有存活的ZK时发布事件然后直接返回
publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics)));
return;
}
// 构造参数
ZookeeperMetricParam param = new ZookeeperMetricParam(
clusterPhyId,
aliveZKList.stream().map(elem -> new Tuple<String, Integer>(elem.getHost(), elem.getPort())).collect(Collectors.toList()),
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class),
kafkaController == null? Constant.INVALID_CODE: kafkaController.getBrokerId(),
null
);
for(VersionControlItem v : items) {
try {
if(null != metrics.getMetrics().get(v.getName())) {
continue;
}
param.setMetricName(v.getName());
Result<ZookeeperMetrics> ret = zookeeperMetricService.collectMetricsFromZookeeper(param);
if(null == ret || ret.failed() || null == ret.getData()){
continue;
}
metrics.putMetric(ret.getData().getMetrics());
if(!EnvUtil.isOnline()){
LOGGER.info(
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||metricValue={}",
clusterPhyId, v.getName(), ConvertUtil.obj2Json(ret.getData().getMetrics())
);
}
} catch (Exception e){
LOGGER.error(
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||metricName={}||errMsg=exception!",
clusterPhyId, v.getName(), e
);
}
}
metrics.putMetric(Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME, (System.currentTimeMillis() - startTime) / 1000.0f);
publishMetric(new ZookeeperMetricEvent(this, Arrays.asList(metrics)));
LOGGER.info(
"class=ZookeeperMetricCollector||method=collectMetrics||clusterPhyId={}||startTime={}||costTime={}||msg=msg=collect finished.",
clusterPhyId, startTime, System.currentTimeMillis() - startTime
);
}
@Override
public VersionItemTypeEnum collectorType() {
return METRIC_ZOOKEEPER;
}
}

View File

@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.collector.sink;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.bean.event.metric.ZookeeperMetricEvent;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX;
@Component
public class ZookeeperMetricESSender extends AbstractMetricESSender implements ApplicationListener<ZookeeperMetricEvent> {
protected static final ILog LOGGER = LogFactory.getLog("METRIC_LOGGER");
@PostConstruct
public void init(){
LOGGER.info("class=ZookeeperMetricESSender||method=init||msg=init finished");
}
@Override
public void onApplicationEvent(ZookeeperMetricEvent event) {
send2es(ZOOKEEPER_INDEX, ConvertUtil.list2List(event.getZookeeperMetrics(), ZookeeperMetricPO.class));
}
}

View File

@@ -0,0 +1,13 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import lombok.Data;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
public class ClusterZookeepersOverviewDTO extends PaginationBaseDTO {
}

View File

@@ -1,8 +1,8 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.config;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.util.Properties;
@@ -11,7 +11,6 @@ import java.util.Properties;
* @author zengqiao
* @date 22/02/24
*/
@Data
@ApiModel(description = "ZK配置")
public class ZKConfig implements Serializable {
@ApiModelProperty(value="ZK的jmx配置")
@@ -21,11 +20,51 @@ public class ZKConfig implements Serializable {
private Boolean openSecure = false;
@ApiModelProperty(value="ZK的Session超时时间", example = "15000")
private Long sessionTimeoutUnitMs = 15000L;
private Integer sessionTimeoutUnitMs = 15000;
@ApiModelProperty(value="ZK的Request超时时间", example = "5000")
private Long requestTimeoutUnitMs = 5000L;
private Integer requestTimeoutUnitMs = 5000;
@ApiModelProperty(value="ZK的Request超时时间")
private Properties otherProps = new Properties();
public JmxConfig getJmxConfig() {
return jmxConfig == null? new JmxConfig(): jmxConfig;
}
public void setJmxConfig(JmxConfig jmxConfig) {
this.jmxConfig = jmxConfig;
}
public Boolean getOpenSecure() {
return openSecure != null && openSecure;
}
public void setOpenSecure(Boolean openSecure) {
this.openSecure = openSecure;
}
public Integer getSessionTimeoutUnitMs() {
return sessionTimeoutUnitMs == null? Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS: sessionTimeoutUnitMs;
}
public void setSessionTimeoutUnitMs(Integer sessionTimeoutUnitMs) {
this.sessionTimeoutUnitMs = sessionTimeoutUnitMs;
}
public Integer getRequestTimeoutUnitMs() {
return requestTimeoutUnitMs == null? Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS: requestTimeoutUnitMs;
}
public void setRequestTimeoutUnitMs(Integer requestTimeoutUnitMs) {
this.requestTimeoutUnitMs = requestTimeoutUnitMs;
}
public Properties getOtherProps() {
return otherProps == null? new Properties() : otherProps;
}
public void setOtherProps(Properties otherProps) {
this.otherProps = otherProps;
}
}

View File

@@ -0,0 +1,28 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics;
import lombok.Data;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/6/17
*/
@Data
@ToString
public class ZookeeperMetrics extends BaseMetrics {
public ZookeeperMetrics(Long clusterPhyId) {
super(clusterPhyId);
}
public static ZookeeperMetrics initWithMetric(Long clusterPhyId, String metric, Float value) {
ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId);
metrics.setClusterPhyId( clusterPhyId );
metrics.putMetric(metric, value);
return metrics;
}
@Override
public String unique() {
return "ZK@" + clusterPhyId;
}
}

View File

@@ -0,0 +1,47 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author didi
*/
@Data
@NoArgsConstructor
public class ZookeeperMetricParam extends MetricParam {
private Long clusterPhyId;
private List<Tuple<String, Integer>> zkAddressList;
private ZKConfig zkConfig;
private String metricName;
private Integer kafkaControllerId;
public ZookeeperMetricParam(Long clusterPhyId,
List<Tuple<String, Integer>> zkAddressList,
ZKConfig zkConfig,
String metricName) {
this.clusterPhyId = clusterPhyId;
this.zkAddressList = zkAddressList;
this.zkConfig = zkConfig;
this.metricName = metricName;
}
public ZookeeperMetricParam(Long clusterPhyId,
List<Tuple<String, Integer>> zkAddressList,
ZKConfig zkConfig,
Integer kafkaControllerId,
String metricName) {
this.clusterPhyId = clusterPhyId;
this.zkAddressList = zkAddressList;
this.zkConfig = zkConfig;
this.kafkaControllerId = kafkaControllerId;
this.metricName = metricName;
}
}

View File

@@ -56,6 +56,7 @@ public enum ResultStatus {
KAFKA_OPERATE_FAILED(8010, "Kafka操作失败"),
MYSQL_OPERATE_FAILED(8020, "MySQL操作失败"),
ZK_OPERATE_FAILED(8030, "ZK操作失败"),
ZK_FOUR_LETTER_CMD_FORBIDDEN(8031, "ZK四字命令被禁止"),
ES_OPERATE_ERROR(8040, "ES操作失败"),
HTTP_REQ_ERROR(8050, "第三方http请求异常"),

View File

@@ -23,6 +23,8 @@ public class VersionMetricControlItem extends VersionControlItem{
public static final String CATEGORY_PERFORMANCE = "Performance";
public static final String CATEGORY_FLOW = "Flow";
public static final String CATEGORY_CLIENT = "Client";
/**
* 指标单位名称,非指标的没有
*/

View File

@@ -0,0 +1,19 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.zookeeper.data.Stat;
@Data
public class Znode {
@ApiModelProperty(value = "节点名称", example = "broker")
private String name;
@ApiModelProperty(value = "节点数据", example = "saassad")
private String data;
@ApiModelProperty(value = "节点属性", example = "")
private Stat stat;
}

View File

@@ -0,0 +1,42 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper;
import com.xiaojukeji.know.streaming.km.common.bean.entity.BaseEntity;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import lombok.Data;
@Data
public class ZookeeperInfo extends BaseEntity {
/**
* 集群Id
*/
private Long clusterPhyId;
/**
* 主机
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 角色
*/
private String role;
/**
* 版本
*/
private String version;
/**
* ZK状态
*/
private Integer status;
public boolean alive() {
return !(Constant.DOWN.equals(status));
}
}

View File

@@ -0,0 +1,9 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import java.io.Serializable;
/**
* 四字命令结果数据的基础类
*/
public class BaseFourLetterWordCmdData implements Serializable {
}

View File

@@ -0,0 +1,38 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import lombok.Data;
/**
* clientPort=2183
* dataDir=/data1/data/zkData2/version-2
* dataLogDir=/data1/data/zkLog2/version-2
* tickTime=2000
* maxClientCnxns=60
* minSessionTimeout=4000
* maxSessionTimeout=40000
* serverId=2
* initLimit=15
* syncLimit=10
* electionAlg=3
* electionPort=4445
* quorumPort=4444
* peerType=0
*/
@Data
public class ConfigCmdData extends BaseFourLetterWordCmdData {
private Long clientPort;
private String dataDir;
private String dataLogDir;
private Long tickTime;
private Long maxClientCnxns;
private Long minSessionTimeout;
private Long maxSessionTimeout;
private Integer serverId;
private String initLimit;
private Long syncLimit;
private Long electionAlg;
private Long electionPort;
private Long quorumPort;
private Long peerType;
}

View File

@@ -0,0 +1,39 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import lombok.Data;
/**
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
* zk_avg_latency 0
* zk_max_latency 399
* zk_min_latency 0
* zk_packets_received 234857
* zk_packets_sent 234860
* zk_num_alive_connections 4
* zk_outstanding_requests 0
* zk_server_state follower
* zk_znode_count 35566
* zk_watch_count 39
* zk_ephemerals_count 10
* zk_approximate_data_size 3356708
* zk_open_file_descriptor_count 35
* zk_max_file_descriptor_count 819200
*/
@Data
public class MonitorCmdData extends BaseFourLetterWordCmdData {
private String zkVersion;
private Long zkAvgLatency;
private Long zkMaxLatency;
private Long zkMinLatency;
private Long zkPacketsReceived;
private Long zkPacketsSent;
private Long zkNumAliveConnections;
private Long zkOutstandingRequests;
private String zkServerState;
private Long zkZnodeCount;
private Long zkWatchCount;
private Long zkEphemeralsCount;
private Long zkApproximateDataSize;
private Long zkOpenFileDescriptorCount;
private Long zkMaxFileDescriptorCount;
}

View File

@@ -0,0 +1,30 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword;
import lombok.Data;
/**
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
* Latency min/avg/max: 0/0/2209
* Received: 278202469
* Sent: 279449055
* Connections: 31
* Outstanding: 0
* Zxid: 0x20033fc12
* Mode: leader
* Node count: 10084
* Proposal sizes last/min/max: 36/32/31260 leader特有
*/
@Data
public class ServerCmdData extends BaseFourLetterWordCmdData {
private String zkVersion;
private Long zkAvgLatency;
private Long zkMaxLatency;
private Long zkMinLatency;
private Long zkPacketsReceived;
private Long zkPacketsSent;
private Long zkNumAliveConnections;
private Long zkOutstandingRequests;
private String zkServerState;
private Long zkZnodeCount;
private Long zkZxid;
}

View File

@@ -0,0 +1,116 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ConfigCmdData;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* clientPort=2183
* dataDir=/data1/data/zkData2/version-2
* dataLogDir=/data1/data/zkLog2/version-2
* tickTime=2000
* maxClientCnxns=60
* minSessionTimeout=4000
* maxSessionTimeout=40000
* serverId=2
* initLimit=15
* syncLimit=10
* electionAlg=3
* electionPort=4445
* quorumPort=4444
* peerType=0
*/
@Data
public class ConfigCmdDataParser implements FourLetterWordDataParser<ConfigCmdData> {
private static final ILog LOGGER = LogFactory.getLog(ConfigCmdDataParser.class);
private Result<ConfigCmdData> dataResult = null;
@Override
public String getCmd() {
return FourLetterWordUtil.ConfigCmd;
}
@Override
public ConfigCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
Map<String, String> dataMap = new HashMap<>();
for (String elem : cmdData.split("\n")) {
if (elem.isEmpty()) {
continue;
}
int idx = elem.indexOf('=');
if (idx >= 0) {
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
}
}
ConfigCmdData configCmdData = new ConfigCmdData();
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "clientPort":
configCmdData.setClientPort(Long.valueOf(elem.getValue()));
break;
case "dataDir":
configCmdData.setDataDir(elem.getValue());
break;
case "dataLogDir":
configCmdData.setDataLogDir(elem.getValue());
break;
case "tickTime":
configCmdData.setTickTime(Long.valueOf(elem.getValue()));
break;
case "maxClientCnxns":
configCmdData.setMaxClientCnxns(Long.valueOf(elem.getValue()));
break;
case "minSessionTimeout":
configCmdData.setMinSessionTimeout(Long.valueOf(elem.getValue()));
break;
case "maxSessionTimeout":
configCmdData.setMaxSessionTimeout(Long.valueOf(elem.getValue()));
break;
case "serverId":
configCmdData.setServerId(Integer.valueOf(elem.getValue()));
break;
case "initLimit":
configCmdData.setInitLimit(elem.getValue());
break;
case "syncLimit":
configCmdData.setSyncLimit(Long.valueOf(elem.getValue()));
break;
case "electionAlg":
configCmdData.setElectionAlg(Long.valueOf(elem.getValue()));
break;
case "electionPort":
configCmdData.setElectionPort(Long.valueOf(elem.getValue()));
break;
case "quorumPort":
configCmdData.setQuorumPort(Long.valueOf(elem.getValue()));
break;
case "peerType":
configCmdData.setPeerType(Long.valueOf(elem.getValue()));
break;
default:
LOGGER.warn(
"class=ConfigCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
elem.getKey(), elem.getValue()
);
}
} catch (Exception e) {
LOGGER.error(
"class=ConfigCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
);
}
});
return configCmdData;
}
}

View File

@@ -0,0 +1,10 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
/**
* 四字命令结果解析类
*/
public interface FourLetterWordDataParser<T> {
String getCmd();
T parseAndInitData(Long clusterPhyId, String host, int port, String cmdData);
}

View File

@@ -0,0 +1,117 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* zk_version 3.4.6-1569965, built on 02/20/2014 09:09 GMT
* zk_avg_latency 0
* zk_max_latency 399
* zk_min_latency 0
* zk_packets_received 234857
* zk_packets_sent 234860
* zk_num_alive_connections 4
* zk_outstanding_requests 0
* zk_server_state follower
* zk_znode_count 35566
* zk_watch_count 39
* zk_ephemerals_count 10
* zk_approximate_data_size 3356708
* zk_open_file_descriptor_count 35
* zk_max_file_descriptor_count 819200
*/
@Data
public class MonitorCmdDataParser implements FourLetterWordDataParser<MonitorCmdData> {
private static final ILog LOGGER = LogFactory.getLog(MonitorCmdDataParser.class);
@Override
public String getCmd() {
return FourLetterWordUtil.MonitorCmd;
}
@Override
public MonitorCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
Map<String, String> dataMap = new HashMap<>();
for (String elem : cmdData.split("\n")) {
if (elem.isEmpty()) {
continue;
}
int idx = elem.indexOf('\t');
if (idx >= 0) {
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
}
}
MonitorCmdData monitorCmdData = new MonitorCmdData();
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "zk_version":
monitorCmdData.setZkVersion(elem.getValue().split("-")[0]);
break;
case "zk_avg_latency":
monitorCmdData.setZkAvgLatency(Long.valueOf(elem.getValue()));
break;
case "zk_max_latency":
monitorCmdData.setZkMaxLatency(Long.valueOf(elem.getValue()));
break;
case "zk_min_latency":
monitorCmdData.setZkMinLatency(Long.valueOf(elem.getValue()));
break;
case "zk_packets_received":
monitorCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
break;
case "zk_packets_sent":
monitorCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
break;
case "zk_num_alive_connections":
monitorCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
break;
case "zk_outstanding_requests":
monitorCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
break;
case "zk_server_state":
monitorCmdData.setZkServerState(elem.getValue());
break;
case "zk_znode_count":
monitorCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
break;
case "zk_watch_count":
monitorCmdData.setZkWatchCount(Long.valueOf(elem.getValue()));
break;
case "zk_ephemerals_count":
monitorCmdData.setZkEphemeralsCount(Long.valueOf(elem.getValue()));
break;
case "zk_approximate_data_size":
monitorCmdData.setZkApproximateDataSize(Long.valueOf(elem.getValue()));
break;
case "zk_open_file_descriptor_count":
monitorCmdData.setZkOpenFileDescriptorCount(Long.valueOf(elem.getValue()));
break;
case "zk_max_file_descriptor_count":
monitorCmdData.setZkMaxFileDescriptorCount(Long.valueOf(elem.getValue()));
break;
default:
LOGGER.warn(
"class=MonitorCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
elem.getKey(), elem.getValue()
);
}
} catch (Exception e) {
LOGGER.error(
"class=MonitorCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
);
}
});
return monitorCmdData;
}
}

View File

@@ -0,0 +1,97 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
* Latency min/avg/max: 0/0/2209
* Received: 278202469
* Sent: 279449055
* Connections: 31
* Outstanding: 0
* Zxid: 0x20033fc12
* Mode: leader
* Node count: 10084
* Proposal sizes last/min/max: 36/32/31260 leader特有
*/
@Data
public class ServerCmdDataParser implements FourLetterWordDataParser<ServerCmdData> {
private static final ILog LOGGER = LogFactory.getLog(ServerCmdDataParser.class);
@Override
public String getCmd() {
return FourLetterWordUtil.ServerCmd;
}
@Override
public ServerCmdData parseAndInitData(Long clusterPhyId, String host, int port, String cmdData) {
Map<String, String> dataMap = new HashMap<>();
for (String elem : cmdData.split("\n")) {
if (elem.isEmpty()) {
continue;
}
int idx = elem.indexOf(':');
if (idx >= 0) {
dataMap.put(elem.substring(0, idx), elem.substring(idx + 1).trim());
}
}
ServerCmdData serverCmdData = new ServerCmdData();
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "Zookeeper version":
serverCmdData.setZkVersion(elem.getValue().split("-")[0]);
break;
case "Latency min/avg/max":
String[] data = elem.getValue().split("/");
serverCmdData.setZkMinLatency(Long.valueOf(data[0]));
serverCmdData.setZkAvgLatency(Long.valueOf(data[1]));
serverCmdData.setZkMaxLatency(Long.valueOf(data[2]));
break;
case "Received":
serverCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
break;
case "Sent":
serverCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
break;
case "Connections":
serverCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
break;
case "Outstanding":
serverCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
break;
case "Mode":
serverCmdData.setZkServerState(elem.getValue());
break;
case "Node count":
serverCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
break;
case "Zxid":
serverCmdData.setZkZxid(Long.parseUnsignedLong(elem.getValue().trim().substring(2), 16));
break;
default:
LOGGER.warn(
"class=ServerCmdDataParser||method=parseAndInitData||name={}||value={}||msg=data not parsed!",
elem.getKey(), elem.getValue()
);
}
} catch (Exception e) {
LOGGER.error(
"class=ServerCmdDataParser||method=parseAndInitData||clusterPhyId={}||host={}||port={}||name={}||value={}||errMsg=exception!",
clusterPhyId, host, port, elem.getKey(), elem.getValue(), e
);
}
});
return serverCmdData;
}
}

View File

@@ -8,8 +8,6 @@ import org.springframework.context.ApplicationEvent;
*/
@Getter
public class BaseMetricEvent extends ApplicationEvent {
public BaseMetricEvent(Object source) {
super( source );
}

View File

@@ -0,0 +1,20 @@
package com.xiaojukeji.know.streaming.km.common.bean.event.metric;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import lombok.Getter;
import java.util.List;
/**
* @author didi
*/
@Getter
public class ZookeeperMetricEvent extends BaseMetricEvent {
private List<ZookeeperMetrics> zookeeperMetrics;
public ZookeeperMetricEvent(Object source, List<ZookeeperMetrics> zookeeperMetrics) {
super( source );
this.zookeeperMetrics = zookeeperMetrics;
}
}

View File

@@ -0,0 +1,24 @@
package com.xiaojukeji.know.streaming.km.common.bean.po.metrice;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.xiaojukeji.know.streaming.km.common.utils.CommonUtils.monitorTimestamp2min;
@Data
@NoArgsConstructor
public class ZookeeperMetricPO extends BaseMetricESPO {
public ZookeeperMetricPO(Long clusterPhyId){
super(clusterPhyId);
}
@Override
public String getKey() {
return "ZK@" + clusterPhyId + "@" + monitorTimestamp2min(timestamp);
}
@Override
public String getRoutingValue() {
return String.valueOf(clusterPhyId);
}
}

View File

@@ -0,0 +1,40 @@
package com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper;
import com.baomidou.mybatisplus.annotation.TableName;
import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import lombok.Data;
@Data
@TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "zookeeper")
public class ZookeeperInfoPO extends BasePO {
/**
* 集群Id
*/
private Long clusterPhyId;
/**
* 主机
*/
private String host;
/**
* 端口
*/
private Integer port;
/**
* 角色
*/
private String role;
/**
* 版本
*/
private String version;
/**
* ZK状态
*/
private Integer status;
}

View File

@@ -1,16 +1,12 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author didi
@@ -26,19 +22,4 @@ public class MetricMultiLinesVO {
@ApiModelProperty(value = "指标名称对应的指标线")
private List<MetricLineVO> metricLines;
public List<MetricPointVO> getMetricPoints(String resName) {
if (ValidateUtils.isNull(metricLines)) {
return new ArrayList<>();
}
List<MetricLineVO> voList = metricLines.stream().filter(elem -> elem.getName().equals(resName)).collect(Collectors.toList());
if (ValidateUtils.isEmptyList(voList)) {
return new ArrayList<>();
}
// 仅获取idx=0的指标
return voList.get(0).getMetricPoints();
}
}

View File

@@ -0,0 +1,26 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
@ApiModel(description = "Zookeeper信息概览")
public class ClusterZookeepersOverviewVO {
@ApiModelProperty(value = "主机ip", example = "121.0.0.1")
private String host;
@ApiModelProperty(value = "端口号", example = "2416")
private Integer port;
@ApiModelProperty(value = "版本", example = "1.1.2")
private String version;
@ApiModelProperty(value = "角色", example = "Leader")
private String role;
}

View File

@@ -0,0 +1,47 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
@ApiModel(description = "ZK状态信息")
public class ClusterZookeepersStateVO {
@ApiModelProperty(value = "健康检查状态", example = "1")
private Integer healthState;
@ApiModelProperty(value = "健康检查通过数", example = "1")
private Integer healthCheckPassed;
@ApiModelProperty(value = "健康检查总数", example = "1")
private Integer healthCheckTotal;
@ApiModelProperty(value = "ZK的Leader机器", example = "127.0.0.1")
private String leaderNode;
@ApiModelProperty(value = "Watch数", example = "123456")
private Integer watchCount;
@ApiModelProperty(value = "节点存活数", example = "8")
private Integer aliveServerCount;
@ApiModelProperty(value = "总节点数", example = "10")
private Integer totalServerCount;
@ApiModelProperty(value = "Follower角色存活数", example = "8")
private Integer aliveFollowerCount;
@ApiModelProperty(value = "Follower角色总数", example = "10")
private Integer totalFollowerCount;
@ApiModelProperty(value = "Observer角色存活数", example = "3")
private Integer aliveObserverCount;
@ApiModelProperty(value = "Observer角色总数", example = "3")
private Integer totalObserverCount;
}

View File

@@ -0,0 +1,44 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
public class ZnodeStatVO {
@ApiModelProperty(value = "节点被创建时的事物的ID", example = "0x1f09")
private Long czxid;
@ApiModelProperty(value = "创建时间", example = "Sat Mar 16 15:38:34 CST 2019")
private Long ctime;
@ApiModelProperty(value = "节点最后一次被修改时的事物的ID", example = "0x1f09")
private Long mzxid;
@ApiModelProperty(value = "最后一次修改时间", example = "Sat Mar 16 15:38:34 CST 2019")
private Long mtime;
@ApiModelProperty(value = "子节点列表最近一次呗修改的事物ID", example = "0x31")
private Long pzxid;
@ApiModelProperty(value = "子节点版本号", example = "0")
private Integer cversion;
@ApiModelProperty(value = "数据版本号", example = "0")
private Integer version;
@ApiModelProperty(value = "ACL版本号", example = "0")
private Integer aversion;
@ApiModelProperty(value = "创建临时节点的事物ID持久节点事物为0", example = "0")
private Long ephemeralOwner;
@ApiModelProperty(value = "数据长度,每个节点都可保存数据", example = "22")
private Integer dataLength;
@ApiModelProperty(value = "子节点的个数", example = "6")
private Integer numChildren;
}

View File

@@ -0,0 +1,22 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author wyc
* @date 2022/9/23
*/
@Data
public class ZnodeVO {
@ApiModelProperty(value = "节点名称", example = "broker")
private String name;
@ApiModelProperty(value = "节点数据", example = "saassad")
private String data;
@ApiModelProperty(value = "节点属性", example = "")
private ZnodeStatVO stat;
}

View File

@@ -23,8 +23,8 @@ public class Constant {
public static final Integer YES = 1;
public static final Integer NO = 0;
public static final Integer ALIVE = 1;
public static final Integer DOWN = 0;
public static final Integer ALIVE = 1;
public static final Integer DOWN = 0;
public static final Integer ONE_HUNDRED = 100;
@@ -33,6 +33,7 @@ public class Constant {
public static final Long B_TO_MB = 1024L * 1024L;
public static final Integer DEFAULT_SESSION_TIMEOUT_UNIT_MS = 15000;
public static final Integer DEFAULT_REQUEST_TIMEOUT_UNIT_MS = 5000;
public static final Float MIN_HEALTH_SCORE = 10f;
@@ -66,4 +67,5 @@ public class Constant {
public static final Integer DEFAULT_RETRY_TIME = 3;
public static final Integer ZK_ALIVE_BUT_4_LETTER_FORBIDDEN = 11;
}

View File

@@ -34,6 +34,8 @@ public class ESConstant {
public static final String TOTAL = "total";
public static final Integer DEFAULT_RETRY_TIME = 3;
private ESConstant() {
}
}

View File

@@ -644,4 +644,89 @@ public class ESIndexConstant {
" \"aliases\" : { }\n" +
" }";
public final static String ZOOKEEPER_INDEX = "ks_kafka_zookeeper_metric";
public final static String ZOOKEEPER_TEMPLATE = "{\n" +
" \"order\" : 10,\n" +
" \"index_patterns\" : [\n" +
" \"ks_kafka_zookeeper_metric*\"\n" +
" ],\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : \"10\"\n" +
" }\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"routingValue\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"clusterPhyId\" : {\n" +
" \"type\" : \"long\"\n" +
" },\n" +
" \"metrics\" : {\n" +
" \"properties\" : {\n" +
" \"AvgRequestLatency\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MinRequestLatency\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MaxRequestLatency\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"OutstandingRequests\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"NodeCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"WatchCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"NumAliveConnections\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PacketsReceived\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"PacketsSent\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"EphemeralsCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"ApproximateDataSize\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"OpenFileDescriptorCount\" : {\n" +
" \"type\" : \"double\"\n" +
" },\n" +
" \"MaxFileDescriptorCount\" : {\n" +
" \"type\" : \"double\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"key\" : {\n" +
" \"type\" : \"text\",\n" +
" \"fields\" : {\n" +
" \"keyword\" : {\n" +
" \"ignore_above\" : 256,\n" +
" \"type\" : \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"timestamp\" : {\n" +
" \"format\" : \"yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis\",\n" +
" \"type\" : \"date\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : { }\n" +
" }";
}

View File

@@ -0,0 +1,19 @@
package com.xiaojukeji.know.streaming.km.common.converter;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import org.apache.zookeeper.data.Stat;
public class ZnodeConverter {
ZnodeConverter(){
}
public static Znode convert2Znode(Tuple<byte[], Stat> dataAndStat, String path) {
Znode znode = new Znode();
znode.setStat(dataAndStat.getV2());
znode.setData(dataAndStat.getV1() == null ? null : new String(dataAndStat.getV1()));
znode.setName(path.substring(path.lastIndexOf('/') + 1));
return znode;
}
}

View File

@@ -9,7 +9,9 @@ public enum VersionItemTypeEnum {
METRIC_GROUP(102, "group_metric"),
METRIC_BROKER(103, "broker_metric"),
METRIC_PARTITION(104, "partition_metric"),
METRIC_REPLICATION (105, "replication_metric"),
METRIC_REPLICATION(105, "replication_metric"),
METRIC_ZOOKEEPER(110, "zookeeper_metric"),
/**
* 服务端查询

View File

@@ -0,0 +1,22 @@
package com.xiaojukeji.know.streaming.km.common.enums.zookeeper;
import lombok.Getter;
@Getter
public enum ZKRoleEnum {
LEADER("leader"),
FOLLOWER("follower"),
OBSERVER("observer"),
UNKNOWN("unknown"),
;
private final String role;
ZKRoleEnum(String role) {
this.role = role;
}
}

View File

@@ -22,6 +22,12 @@ public class JmxAttribute {
public static final String PERCENTILE_99 = "99thPercentile";
public static final String MAX = "Max";
public static final String MEAN = "Mean";
public static final String MIN = "Min";
public static final String VALUE = "Value";
public static final String CONNECTION_COUNT = "connection-count";

View File

@@ -63,6 +63,12 @@ public class JmxName {
/*********************************************************** cluster ***********************************************************/
public static final String JMX_CLUSTER_PARTITION_UNDER_REPLICATED = "kafka.cluster:type=Partition,name=UnderReplicated";
/*********************************************************** zookeeper ***********************************************************/
public static final String JMX_ZK_REQUEST_LATENCY_MS = "kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs";
public static final String JMX_ZK_SYNC_CONNECTS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperSyncConnectsPerSec";
public static final String JMX_ZK_DISCONNECTORS_PER_SEC = "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec";
private JmxName() {
}
}

View File

@@ -0,0 +1,163 @@
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.FourLetterWordDataParser;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Set;
public class FourLetterWordUtil {
private static final ILog LOGGER = LogFactory.getLog(FourLetterWordUtil.class);
public static final String MonitorCmd = "mntr";
public static final String ConfigCmd = "conf";
public static final String ServerCmd = "srvr";
private static final Set<String> supportedCommands = new HashSet<>();
public static <T> Result<T> executeFourLetterCmd(Long clusterPhyId,
String host,
int port,
boolean secure,
int timeout,
FourLetterWordDataParser<T> dataParser) {
try {
if (!supportedCommands.contains(dataParser.getCmd())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, String.format("ZK %s命令暂未进行支持", dataParser.getCmd()));
}
String cmdData = send4LetterWord(host, port, dataParser.getCmd(), secure, timeout);
if (cmdData.contains("not executed because it is not in the whitelist.")) {
return Result.buildFromRSAndMsg(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN, cmdData);
}
if (ValidateUtils.isBlank(cmdData)) {
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, cmdData);
}
return Result.buildSuc(dataParser.parseAndInitData(clusterPhyId, host, port, cmdData));
} catch (Exception e) {
LOGGER.error(
"class=FourLetterWordUtil||method=executeFourLetterCmd||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
clusterPhyId, host, port, dataParser.getCmd(), secure, timeout, e
);
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
}
}
/**************************************************** private method ****************************************************/
private static String send4LetterWord(
String host,
int port,
String cmd,
boolean secure,
int timeout) throws IOException, X509Exception.SSLContextException {
long startTime = System.currentTimeMillis();
LOGGER.info("connecting to {} {}", host, port);
Socket socket = null;
OutputStream outputStream = null;
BufferedReader bufferedReader = null;
try {
InetSocketAddress hostaddress = host != null
? new InetSocketAddress(host, port)
: new InetSocketAddress(InetAddress.getByName(null), port);
if (secure) {
LOGGER.info("using secure socket");
try (X509Util x509Util = new ClientX509Util()) {
SSLContext sslContext = x509Util.getDefaultSSLContext();
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
SSLSocket sslSock = (SSLSocket) socketFactory.createSocket();
sslSock.connect(hostaddress, timeout);
sslSock.startHandshake();
socket = sslSock;
}
} else {
socket = new Socket();
socket.connect(hostaddress, timeout);
}
socket.setSoTimeout(timeout);
outputStream = socket.getOutputStream();
outputStream.write(cmd.getBytes());
outputStream.flush();
// 等待InputStream有数据
while (System.currentTimeMillis() - startTime <= timeout && socket.getInputStream().available() <= 0) {
BackoffUtils.backoff(10);
}
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
StringBuilder sb = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
sb.append(line).append("\n");
}
return sb.toString();
} catch (SocketTimeoutException e) {
throw new IOException("Exception while executing four letter word: " + cmd, e);
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
LOGGER.error(
"class=FourLetterWordUtil||method=send4LetterWord||clusterPhyId={}||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
host, port, cmd, secure, timeout, e
);
}
}
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (IOException e) {
LOGGER.error(
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
host, port, cmd, secure, timeout, e
);
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
LOGGER.error(
"class=FourLetterWordUtil||method=send4LetterWord||host={}||port={}||cmd={}||secure={}||timeout={}||errMsg=exception!",
host, port, cmd, secure, timeout, e
);
}
}
}
}
static {
supportedCommands.add(MonitorCmd);
supportedCommands.add(ConfigCmd);
supportedCommands.add(ServerCmd);
}
}

View File

@@ -0,0 +1,59 @@
package com.xiaojukeji.know.streaming.km.common.utils.zookeeper;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.common.NetUtils;
import java.util.ArrayList;
import java.util.List;
import static org.apache.zookeeper.common.StringUtils.split;
public class ZookeeperUtils {
private static final int DEFAULT_PORT = 2181;
/**
* 解析ZK地址
* @see ConnectStringParser
*/
public static List<Tuple<String, Integer>> connectStringParser(String connectString) throws Exception {
List<Tuple<String, Integer>> ipPortList = new ArrayList<>();
if (connectString == null) {
return ipPortList;
}
// parse out chroot, if any
int off = connectString.indexOf('/');
if (off >= 0) {
connectString = connectString.substring(0, off);
}
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
if (hostAndPort.length != 0) {
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} else {
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
// otherwise : is at the end of the string, ignore
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
}
ipPortList.add(new Tuple<>(host, port));
}
return ipPortList;
}
}

View File

@@ -343,17 +343,9 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
private Broker getStartTimeAndBuildBroker(Long clusterPhyId, Node newNode, JmxConfig jmxConfig) {
try {
Object object = jmxDAO.getJmxValue(
clusterPhyId,
newNode.id(),
newNode.host(),
null,
jmxConfig,
new ObjectName("java.lang:type=Runtime"),
"StartTime"
);
Long startTime = jmxDAO.getServerStartTime(clusterPhyId, newNode.host(), null, jmxConfig);
return Broker.buildFrom(clusterPhyId, newNode, object != null? (Long) object: null);
return Broker.buildFrom(clusterPhyId, newNode, startTime);
} catch (Exception e) {
log.error("class=BrokerServiceImpl||method=getStartTimeAndBuildBroker||clusterPhyId={}||brokerNode={}||jmxConfig={}||errMsg=exception!", clusterPhyId, newNode, jmxConfig, e);
}

View File

@@ -0,0 +1,141 @@
package com.xiaojukeji.know.streaming.km.core.service.version.metrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.METRIC_ZOOKEEPER;
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxAttribute.*;
import static com.xiaojukeji.know.streaming.km.common.jmx.JmxName.*;
import static com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl.ZookeeperMetricServiceImpl.*;
@Component
public class ZookeeperMetricVersionItems extends BaseMetricVersionMetric {
/**
* 性能
*/
public static final String ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY = "AvgRequestLatency";
public static final String ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY = "MinRequestLatency";
public static final String ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY = "MaxRequestLatency";
public static final String ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS = "OutstandingRequests";
public static final String ZOOKEEPER_METRIC_NODE_COUNT = "NodeCount";
public static final String ZOOKEEPER_METRIC_WATCH_COUNT = "WatchCount";
public static final String ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS = "NumAliveConnections";
public static final String ZOOKEEPER_METRIC_PACKETS_RECEIVED = "PacketsReceived";
public static final String ZOOKEEPER_METRIC_PACKETS_SENT = "PacketsSent";
public static final String ZOOKEEPER_METRIC_EPHEMERALS_COUNT = "EphemeralsCount";
public static final String ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE = "ApproximateDataSize";
public static final String ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT = "OpenFileDescriptorCount";
public static final String ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT = "MaxFileDescriptorCount";
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC = "KafkaZKDisconnectsPerSec";
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC = "KafkaZKSyncConnectsPerSec";
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH = "KafkaZKRequestLatencyMs_99thPercentile";
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX = "KafkaZKRequestLatencyMs_Max";
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN = "KafkaZKRequestLatencyMs_Mean";
public static final String ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN = "KafkaZKRequestLatencyMs_Min";
public static final String ZOOKEEPER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME;
@Override
public int versionItemType() {
return METRIC_ZOOKEEPER.getCode();
}
@Override
public List<VersionMetricControlItem> init(){
List<VersionMetricControlItem> items = new ArrayList<>();
// 性能指标
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY).unit("ms").desc("平均响应延迟").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY).unit("ms").desc("最小响应延迟").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY).unit("ms").desc("最大响应延迟").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS).unit("").desc("堆积请求数").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_NODE_COUNT).unit("").desc("ZNode数量").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_WATCH_COUNT).unit("").desc("Watch数量").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS).unit("").desc("客户端连接数量").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_PACKETS_RECEIVED).unit("").desc("接受包的数量").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_PACKETS_SENT).unit("").desc("发送包的数量").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_EPHEMERALS_COUNT).unit("").desc("临时节点数").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE).unit("byte").desc("文件大小(近似值)").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT).unit("").desc("已打开的文件描述符数").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT).unit("").desc("允许打开的最大文件描述符数").category(CATEGORY_PERFORMANCE)
.extendMethod(ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD));
// JMX指标
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_99TH).unit("ms").desc("ZK请求99分位延迟").category(CATEGORY_CLIENT)
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(PERCENTILE_99)));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MAX).unit("ms").desc("ZK请求最大延迟").category(CATEGORY_CLIENT)
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MAX)));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MIN).unit("ms").desc("ZK请求最小延迟").category(CATEGORY_CLIENT)
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MIN)));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_KAFKA_ZK_REQUEST_LATENCY_MEAN).unit("ms").desc("ZK请求平均延迟").category(CATEGORY_CLIENT)
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
.jmxObjectName( JMX_ZK_REQUEST_LATENCY_MS ).jmxAttribute(MEAN)));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_KAFKA_ZK_DISCONNECTS_PER_SEC).unit("").desc("断开连接数").category(CATEGORY_CLIENT)
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
.jmxObjectName( JMX_ZK_DISCONNECTORS_PER_SEC ).jmxAttribute(RATE_MIN_1)));
items.add(buildAllVersionsItem()
.name(ZOOKEEPER_METRIC_KAFKA_ZK_SYNC_CONNECTS_PER_SEC).unit("").desc("同步连接数").category(CATEGORY_CLIENT)
.extend( buildJMXMethodExtend( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX )
.jmxObjectName( JMX_ZK_SYNC_CONNECTS_PER_SEC ).jmxAttribute(RATE_MIN_1)));
return items;
}
}

View File

@@ -0,0 +1,13 @@
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
import java.util.List;
public interface ZnodeService {
Result<List<String>> listZnodeChildren(Long clusterPhyId, String path, String keyword);
Result<Znode> getZnode(Long clusterPhyId, String path);
}

View File

@@ -0,0 +1,21 @@
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import java.util.List;
public interface ZookeeperMetricService {
/**
* ZK指标获取
* @param param 参数因为ZK 四字命令在使用时,是短连接,所以参数内容会复杂一些,后续可以考虑优化为长连接
* @return
*/
Result<ZookeeperMetrics> collectMetricsFromZookeeper(ZookeeperMetricParam param);
Result<ZookeeperMetrics> batchCollectMetricsFromZookeeper(Long clusterPhyId, List<String> metricNameList);
Result<List<MetricLineVO>> listMetricsFromES(Long clusterPhyId, MetricDTO dto);
}

View File

@@ -0,0 +1,18 @@
package com.xiaojukeji.know.streaming.km.core.service.zookeeper;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import java.util.List;
public interface ZookeeperService {
/**
* 从ZK集群中获取ZK信息
*/
Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig);
void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList);
List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId);
}

View File

@@ -0,0 +1,81 @@
package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.Znode;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.converter.ZnodeConverter;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class ZnodeServiceImpl implements ZnodeService {
private static final ILog LOGGER = LogFactory.getLog(ZnodeServiceImpl.class);
@Autowired
private KafkaZKDAO kafkaZKDAO;
@Autowired
private ClusterPhyService clusterPhyService;
@Override
public Result<List<String>> listZnodeChildren(Long clusterPhyId, String path, String keyword) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}
List<String> children;
try {
children = kafkaZKDAO.getChildren(clusterPhyId, path, false);
} catch (NotExistException e) {
LOGGER.error("class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}", clusterPhyId, "create ZK client create failed");
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, "ZK客户端创建失败");
} catch (Exception e) {
LOGGER.error("class=ZnodeServiceImpl||method=listZnodeChildren||clusterPhyId={}||errMsg={}", clusterPhyId, "ZK operate failed");
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, "ZK操作失败");
}
//关键字搜索
if (keyword != null) {
children = children.stream().filter(elem -> elem.contains(keyword)).collect(Collectors.toList());
}
return Result.buildSuc(children);
}
@Override
public Result<Znode> getZnode(Long clusterPhyId, String path) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}
//获取zookeeper上的原始数据
Tuple<byte[], Stat> dataAndStat;
try {
dataAndStat = kafkaZKDAO.getDataAndStat(clusterPhyId, path);
} catch (NotExistException e) {
LOGGER.error("class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}", clusterPhyId, "create ZK client create failed");
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, "ZK客户端创建失败");
} catch (Exception e) {
LOGGER.error("class=ZnodeServiceImpl||method=getZnode||clusterPhyId={}||errMsg={}", clusterPhyId, "ZK operate failed");
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, "ZK操作失败");
}
return Result.buildSuc(ZnodeConverter.convert2Znode(dataAndStat, path));
}
}

View File

@@ -0,0 +1,281 @@
package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.metric.ZookeeperMetricParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionJmxInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.MonitorCmdDataParser;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.*;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.MonitorCmdData;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.ZookeeperMetricPO;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ZookeeperMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.management.ObjectName;
import java.util.*;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_JMX_CONNECT_ERROR;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems.*;
@Service
public class ZookeeperMetricServiceImpl extends BaseMetricService implements ZookeeperMetricService {
private static final ILog LOGGER = LogFactory.getLog(ZookeeperMetricServiceImpl.class);
public static final String ZOOKEEPER_METHOD_DO_NOTHING = "doNothing";
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD = "getMetricFromMonitorCmd";
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD = "getMetricFromServerCmd";
public static final String ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX = "getMetricFromKafkaByJMX";
@Autowired
private ClusterPhyService clusterPhyService;
@Autowired
private ZookeeperService zookeeperService;
@Autowired
private ZookeeperMetricESDAO zookeeperMetricESDAO;
@Autowired
private KafkaJMXClient kafkaJMXClient;
@Autowired
private KafkaControllerService kafkaControllerService;
@Override
protected VersionItemTypeEnum getVersionItemType() {
return VersionItemTypeEnum.METRIC_ZOOKEEPER;
}
@Override
protected List<String> listMetricPOFields(){
return BeanUtil.listBeanFields(ZookeeperMetricPO.class);
}
@Override
protected void initRegisterVCHandler(){
registerVCHandler( ZOOKEEPER_METHOD_DO_NOTHING, this::doNothing);
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_MONITOR_CMD, this::getMetricFromMonitorCmd);
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_SERVER_CMD, this::getMetricFromServerCmd);
registerVCHandler( ZOOKEEPER_METHOD_GET_METRIC_FROM_KAFKA_BY_JMX, this::getMetricFromKafkaByJMX);
}
@Override
public Result<ZookeeperMetrics> collectMetricsFromZookeeper(ZookeeperMetricParam param) {
try {
return (Result<ZookeeperMetrics>)doVCHandler(param.getClusterPhyId(), param.getMetricName(), param);
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}
@Override
public Result<ZookeeperMetrics> batchCollectMetricsFromZookeeper(Long clusterPhyId, List<String> metricNameList) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (null == clusterPhy) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}
List<ZookeeperInfo> aliveZKList = zookeeperService.listFromDBByCluster(clusterPhyId).stream()
.filter(elem -> Constant.ALIVE.equals(elem.getStatus()))
.collect(Collectors.toList());
if (ValidateUtils.isEmptyList(aliveZKList)) {
// 没有指标可以获取
return Result.buildSuc(new ZookeeperMetrics(clusterPhyId));
}
// 构造参数
ZookeeperMetricParam param = new ZookeeperMetricParam(
clusterPhyId,
aliveZKList.stream().map(elem -> new Tuple<String, Integer>(elem.getHost(), elem.getPort())).collect(Collectors.toList()),
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class),
null
);
ZookeeperMetrics metrics = new ZookeeperMetrics(clusterPhyId);
for(String metricName : metricNameList) {
try {
if(metrics.getMetrics().containsKey(metricName)) {
continue;
}
param.setMetricName(metricName);
Result<ZookeeperMetrics> ret = this.collectMetricsFromZookeeper(param);
if(null == ret || ret.failed() || null == ret.getData()){
continue;
}
metrics.putMetric(ret.getData().getMetrics());
} catch (Exception e){
LOGGER.error(
"class=ZookeeperMetricServiceImpl||method=collectMetricsFromZookeeper||clusterPhyId={}||metricName={}||errMsg=exception!",
clusterPhyId, metricName, e
);
}
}
return Result.buildSuc(metrics);
}
@Override
public Result<List<MetricLineVO>> listMetricsFromES(Long clusterPhyId, MetricDTO dto) {
Map<String/*metricName*/, List<MetricPointVO>> pointVOMap = zookeeperMetricESDAO.listMetricsByClusterPhyId(
clusterPhyId,
dto.getMetricsNames(),
dto.getAggType(),
dto.getStartTime(),
dto.getEndTime()
);
// 格式转化
List<MetricLineVO> voList = new ArrayList<>();
pointVOMap.entrySet().stream().forEach(entry ->
voList.add(new MetricLineVO(String.valueOf(clusterPhyId), entry.getKey(), entry.getValue()))
);
return Result.buildSuc(voList);
}
/**************************************************** private method ****************************************************/
private Result<ZookeeperMetrics> getMetricFromServerCmd(VersionItemParam metricParam) {
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
Result<ZookeeperMetrics> rz = null;
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
Result<ServerCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
param.getClusterPhyId(),
hostPort.getV1(),
hostPort.getV2(),
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
new ServerCmdDataParser()
);
if (cmdDataResult.failed()) {
rz = Result.buildFromIgnoreData(cmdDataResult);
continue;
}
ServerCmdData cmdData = cmdDataResult.getData();
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue());
return Result.buildSuc(metrics);
}
return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
}
private Result<ZookeeperMetrics> getMetricFromMonitorCmd(VersionItemParam metricParam) {
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
Result<ZookeeperMetrics> rz = null;
for (Tuple<String, Integer> hostPort: param.getZkAddressList()) {
Result<MonitorCmdData> cmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
param.getClusterPhyId(),
hostPort.getV1(),
hostPort.getV2(),
param.getZkConfig() != null ? param.getZkConfig().getOpenSecure(): false,
param.getZkConfig() != null ? param.getZkConfig().getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
new MonitorCmdDataParser()
);
if (cmdDataResult.failed()) {
rz = Result.buildFromIgnoreData(cmdDataResult);
continue;
}
MonitorCmdData cmdData = cmdDataResult.getData();
ZookeeperMetrics metrics = new ZookeeperMetrics(param.getClusterPhyId());
metrics.putMetric(ZOOKEEPER_METRIC_AVG_REQUEST_LATENCY, cmdData.getZkAvgLatency().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_MIN_REQUEST_LATENCY, cmdData.getZkMinLatency().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_MAX_REQUEST_LATENCY, cmdData.getZkMaxLatency().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_OUTSTANDING_REQUESTS, cmdData.getZkOutstandingRequests().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_NODE_COUNT, cmdData.getZkZnodeCount().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_WATCH_COUNT, cmdData.getZkWatchCount().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_NUM_ALIVE_CONNECTIONS, cmdData.getZkNumAliveConnections().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_RECEIVED, cmdData.getZkPacketsReceived().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_PACKETS_SENT, cmdData.getZkPacketsSent().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_EPHEMERALS_COUNT, cmdData.getZkEphemeralsCount().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_APPROXIMATE_DATA_SIZE, cmdData.getZkApproximateDataSize().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_OPEN_FILE_DESCRIPTOR_COUNT, cmdData.getZkOpenFileDescriptorCount().floatValue());
metrics.putMetric(ZOOKEEPER_METRIC_MAX_FILE_DESCRIPTOR_COUNT, cmdData.getZkMaxFileDescriptorCount().floatValue());
return Result.buildSuc(metrics);
}
return rz != null? rz: Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
}
private Result<ZookeeperMetrics> doNothing(VersionItemParam metricParam) {
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
return Result.buildSuc(new ZookeeperMetrics(param.getClusterPhyId()));
}
private Result<ZookeeperMetrics> getMetricFromKafkaByJMX(VersionItemParam metricParam) {
ZookeeperMetricParam param = (ZookeeperMetricParam)metricParam;
String metricName = param.getMetricName();
Long clusterPhyId = param.getClusterPhyId();
Integer kafkaControllerId = param.getKafkaControllerId();
//1、获取jmx的属性信息
VersionJmxInfo jmxInfo = getJMXInfo(clusterPhyId, metricName);
if(null == jmxInfo) {
return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);
}
//2、获取jmx连接
JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterPhyId, kafkaControllerId);
if (ValidateUtils.isNull(jmxConnectorWrap)) {
return Result.buildFailure(VC_JMX_INIT_ERROR);
}
try {
//2、获取jmx指标
String value = jmxConnectorWrap.getAttribute(new ObjectName(jmxInfo.getJmxObjectName()), jmxInfo.getJmxAttribute()).toString();
return Result.buildSuc(ZookeeperMetrics.initWithMetric(clusterPhyId, metricName, Float.valueOf(value)));
} catch (Exception e) {
return Result.buildFailure(VC_JMX_CONNECT_ERROR);
}
}
}

View File

@@ -0,0 +1,147 @@
package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.ServerCmdData;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.fourletterword.parser.ServerCmdDataParser;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.FourLetterWordUtil;
import com.xiaojukeji.know.streaming.km.common.utils.zookeeper.ZookeeperUtils;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class ZookeeperServiceImpl implements ZookeeperService {
private static final ILog LOGGER = LogFactory.getLog(ZookeeperServiceImpl.class);
@Autowired
private ZookeeperDAO zookeeperDAO;
@Override
public Result<List<ZookeeperInfo>> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig) {
List<Tuple<String, Integer>> addressList = null;
try {
addressList = ZookeeperUtils.connectStringParser(zookeeperAddress);
} catch (Exception e) {
LOGGER.error(
"class=ZookeeperServiceImpl||method=listFromZookeeperCluster||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!",
clusterPhyId, zookeeperAddress, e
);
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage());
}
List<ZookeeperInfo> aliveZKList = new ArrayList<>();
for (Tuple<String, Integer> hostPort: addressList) {
aliveZKList.add(this.getFromZookeeperCluster(
clusterPhyId,
hostPort.getV1(),
hostPort.getV2(),
zkConfig
));
}
return Result.buildSuc(aliveZKList);
}
@Override
public void batchReplaceDataInDB(Long clusterPhyId, List<ZookeeperInfo> infoList) {
// DB 中的信息
List<ZookeeperInfoPO> dbInfoList = this.listRawFromDBByCluster(clusterPhyId);
Map<String, ZookeeperInfoPO> dbMap = new HashMap<>();
dbInfoList.stream().forEach(elem -> dbMap.put(elem.getHost() + elem.getPort(), elem));
// 新获取到的信息
List<ZookeeperInfoPO> newInfoList = ConvertUtil.list2List(infoList, ZookeeperInfoPO.class);
for (ZookeeperInfoPO newInfo: newInfoList) {
try {
ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort());
if (oldInfo == null) {
zookeeperDAO.insert(newInfo);
} else if (!Constant.DOWN.equals(newInfo.getStatus())) {
// 存活时,直接使用获取到的数据
newInfo.setId(oldInfo.getId());
zookeeperDAO.updateById(newInfo);
} else {
// 如果挂了,则版本和角色信息,使用先前的信息。
// 挂掉之后如果角色是leader则需要调整一下
newInfo.setId(oldInfo.getId());
newInfo.setRole(ZKRoleEnum.LEADER.getRole().equals(oldInfo.getRole())? ZKRoleEnum.FOLLOWER.getRole(): oldInfo.getRole());
newInfo.setVersion(oldInfo.getVersion());
zookeeperDAO.updateById(newInfo);
}
} catch (Exception e) {
LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterPhyId, newInfo, e);
}
}
// 删除剩余的ZK节点
dbMap.entrySet().forEach(entry -> {
try {
zookeeperDAO.deleteById(entry.getValue().getId());
} catch (Exception e) {
LOGGER.error("class=ZookeeperServiceImpl||method=batchReplaceDataInDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterPhyId, entry.getValue(), e);
}
});
}
@Override
public List<ZookeeperInfo> listFromDBByCluster(Long clusterPhyId) {
return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class);
}
/**************************************************** private method ****************************************************/
private List<ZookeeperInfoPO> listRawFromDBByCluster(Long clusterPhyId) {
LambdaQueryWrapper<ZookeeperInfoPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(ZookeeperInfoPO::getClusterPhyId, clusterPhyId);
return zookeeperDAO.selectList(lambdaQueryWrapper);
}
private ZookeeperInfo getFromZookeeperCluster(Long clusterPhyId, String host, Integer port, ZKConfig zkConfig) {
ZookeeperInfo zookeeperInfo = new ZookeeperInfo();
zookeeperInfo.setClusterPhyId(clusterPhyId);
zookeeperInfo.setHost(host);
zookeeperInfo.setPort(port);
zookeeperInfo.setRole("");
zookeeperInfo.setVersion("");
zookeeperInfo.setStatus(Constant.DOWN);
Result<ServerCmdData> serverCmdDataResult = FourLetterWordUtil.executeFourLetterCmd(
clusterPhyId,
host,
port,
zkConfig != null ? zkConfig.getOpenSecure(): false,
zkConfig != null ? zkConfig.getRequestTimeoutUnitMs(): Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS,
new ServerCmdDataParser()
);
if (serverCmdDataResult.hasData()) {
zookeeperInfo.setRole(serverCmdDataResult.getData().getZkServerState());
zookeeperInfo.setVersion(serverCmdDataResult.getData().getZkVersion());
zookeeperInfo.setStatus(Constant.ALIVE);
} else if (serverCmdDataResult.getCode().equals(ResultStatus.ZK_FOUR_LETTER_CMD_FORBIDDEN.getCode())) {
zookeeperInfo.setStatus(Constant.ZK_ALIVE_BUT_4_LETTER_FORBIDDEN);
} else {
return zookeeperInfo;
}
return zookeeperInfo;
}
}

View File

@@ -355,3 +355,19 @@ CREATE TABLE `ks_km_app_node` (
PRIMARY KEY (`id`),
KEY `idx_app_host` (`app_name`,`host_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='km集群部署的node信息';
DROP TABLE IF EXISTS `ks_km_zookeeper`;
CREATE TABLE `ks_km_zookeeper` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '物理集群ID',
`host` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper主机名',
`port` int(16) NOT NULL DEFAULT '-1' COMMENT 'zookeeper端口',
`role` int(16) NOT NULL DEFAULT '-1' COMMENT '角色, leader follower observer',
`version` varchar(128) NOT NULL DEFAULT '' COMMENT 'zookeeper版本',
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活0未存活11存活但是4字命令使用不了',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_phy_id_host_port` (`cluster_phy_id`,`host`, `port`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Zookeeper信息表';

View File

@@ -0,0 +1,85 @@
PUT _template/ks_kafka_zookeeper_metric
{
"order" : 10,
"index_patterns" : [
"ks_kafka_zookeeper_metric*"
],
"settings" : {
"index" : {
"number_of_shards" : "10"
}
},
"mappings" : {
"properties" : {
"routingValue" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"clusterPhyId" : {
"type" : "long"
},
"metrics" : {
"properties" : {
"AvgRequestLatency" : {
"type" : "double"
},
"MinRequestLatency" : {
"type" : "double"
},
"MaxRequestLatency" : {
"type" : "double"
},
"OutstandingRequests" : {
"type" : "double"
},
"NodeCount" : {
"type" : "double"
},
"WatchCount" : {
"type" : "double"
},
"NumAliveConnections" : {
"type" : "double"
},
"PacketsReceived" : {
"type" : "double"
},
"PacketsSent" : {
"type" : "double"
},
"EphemeralsCount" : {
"type" : "double"
},
"ApproximateDataSize" : {
"type" : "double"
},
"OpenFileDescriptorCount" : {
"type" : "double"
},
"MaxFileDescriptorCount" : {
"type" : "double"
}
}
},
"key" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"timestamp" : {
"format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
"type" : "date"
}
}
},
"aliases" : { }
}

View File

@@ -40,8 +40,7 @@ public class BaseMetricESDAO extends BaseESDAO {
/**
* 不同维度 kafka 监控数据
*/
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps
.newConcurrentMap();
private static Map<String, BaseMetricESDAO> ariusStatsEsDaoMap = Maps.newConcurrentMap();
/**
* 检查 es 索引是否存在,不存在则创建索引

View File

@@ -0,0 +1,106 @@
package com.xiaojukeji.know.streaming.km.persistence.es.dao;
import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse;
import com.didiglobal.logi.elasticsearch.client.response.query.query.aggs.ESAggr;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.common.constant.ESConstant;
import com.xiaojukeji.know.streaming.km.common.utils.MetricsUtils;
import com.xiaojukeji.know.streaming.km.persistence.es.dsls.DslsConstant;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.xiaojukeji.know.streaming.km.common.constant.ESConstant.*;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_INDEX;
import static com.xiaojukeji.know.streaming.km.common.constant.ESIndexConstant.ZOOKEEPER_TEMPLATE;
@Component
public class ZookeeperMetricESDAO extends BaseMetricESDAO {
@PostConstruct
public void init() {
super.indexName = ZOOKEEPER_INDEX;
super.indexTemplate = ZOOKEEPER_TEMPLATE;
checkCurrentDayIndexExist();
BaseMetricESDAO.register(indexName, this);
}
/**
* 获取指定集群,指定指标,一段时间内的值
*/
public Map<String/*metricName*/, List<MetricPointVO>> listMetricsByClusterPhyId(Long clusterPhyId,
List<String> metricNameList,
String aggType,
Long startTime,
Long endTime) {
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
//2、根据查询的时间区间大小来确定指标点的聚合区间大小
String interval = MetricsUtils.getInterval(endTime - startTime);
//3、构造agg查询条件
String aggDsl = buildAggsDSL(metricNameList, aggType);
//4、构造dsl查询条件开始查询
try {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_ZOOKEEPER_AGG_LIST_METRICS, clusterPhyId, startTime, endTime, interval, aggDsl);
return esOpClient.performRequestWithRouting(
String.valueOf(clusterPhyId),
realIndex,
dsl,
s -> handleListESQueryResponse(s, metricNameList, aggType),
ESConstant.DEFAULT_RETRY_TIME
);
} catch (Exception e){
LOGGER.error("class=ZookeeperMetricESDAO||method=listMetricsByClusterPhyId||clusterPhyId={}||errMsg=exception!",
clusterPhyId, e
);
}
return new HashMap<>();
}
/**************************************************** private method ****************************************************/
private Map<String, List<MetricPointVO>> handleListESQueryResponse(ESQueryResponse response, List<String> metrics, String aggType){
Map<String, ESAggr> esAggrMap = checkBucketsAndHitsOfResponseAggs(response);
if(null == esAggrMap) {
return new HashMap<>();
}
Map<String, List<MetricPointVO>> metricMap = new HashMap<>();
for(String metric : metrics){
List<MetricPointVO> metricPoints = new ArrayList<>();
esAggrMap.get(HIST).getBucketList().forEach( esBucket -> {
try {
if (null != esBucket.getUnusedMap().get(KEY)) {
Long timestamp = Long.valueOf(esBucket.getUnusedMap().get(KEY).toString());
String value = esBucket.getAggrMap().get(metric).getUnusedMap().get(VALUE).toString();
MetricPointVO metricPoint = new MetricPointVO();
metricPoint.setAggType(aggType);
metricPoint.setTimeStamp(timestamp);
metricPoint.setValue(value);
metricPoint.setName(metric);
metricPoints.add(metricPoint);
}
}catch (Exception e){
LOGGER.error("method=handleESQueryResponse||metric={}||errMsg=exception!", metric, e);
}
} );
metricMap.put(metric, optimizeMetricPoints(metricPoints));
}
return metricMap;
}
}

View File

@@ -80,4 +80,6 @@ public class DslsConstant {
public static final String COUNT_GROUP_NOT_METRIC_VALUE = "GroupMetricESDAO/countGroupNotMetricValue";
/**************************************************** Zookeeper ****************************************************/
public static final String GET_ZOOKEEPER_AGG_LIST_METRICS = "ZookeeperMetricESDAO/getAggListZookeeperMetrics";
}

View File

@@ -12,5 +12,7 @@ import javax.management.ObjectName;
public interface JmxDAO {
Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute);
Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig);
}

View File

@@ -19,24 +19,28 @@ public class JmxDAOImpl implements JmxDAO {
@Override
public Object getJmxValue(String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
return this.getJmxValue(null, null, jmxHost, jmxPort, jmxConfig, objectName, attribute);
return this.getJmxValue(null, jmxHost, jmxPort, jmxConfig, objectName, attribute);
}
@Override
public Object getJmxValue(Long clusterPhyId, Integer brokerId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
public Object getJmxValue(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig, ObjectName objectName, String attribute) {
JmxConnectorWrap jmxConnectorWrap = null;
try {
jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, brokerId, null, jmxHost, jmxPort, jmxConfig);
jmxConnectorWrap = new JmxConnectorWrap(clusterPhyId, null, null, jmxHost, jmxPort, jmxConfig);
if (!jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) {
log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed",
clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig);
log.error(
"method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMgs=create jmx client failed",
clusterPhyId, jmxHost, jmxPort, jmxConfig
);
return null;
}
return jmxConnectorWrap.getAttribute(objectName, attribute);
} catch (Exception e) {
log.error("method=getJmxValue||clusterPhyId={}||brokerId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg={}",
clusterPhyId, brokerId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e);
log.error(
"method=getJmxValue||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||objectName={}||attribute={}||msg=get attribute failed||errMsg=exception!",
clusterPhyId, jmxHost, jmxPort, jmxConfig, objectName, attribute, e
);
} finally {
if (jmxConnectorWrap != null) {
jmxConnectorWrap.close();
@@ -45,4 +49,27 @@ public class JmxDAOImpl implements JmxDAO {
return null;
}
@Override
public Long getServerStartTime(Long clusterPhyId, String jmxHost, Integer jmxPort, JmxConfig jmxConfig) {
try {
Object object = this.getJmxValue(
clusterPhyId,
jmxHost,
jmxPort,
jmxConfig,
new ObjectName("java.lang:type=Runtime"),
"StartTime"
);
return object == null? null: (Long) object;
} catch (Exception e) {
log.error(
"class=JmxDAOImpl||method=getServerStartTime||clusterPhyId={}||jmxHost={}||jmxPort={}||jmxConfig={}||errMsg=exception!",
clusterPhyId, jmxHost, jmxPort, jmxConfig, e
);
}
return null;
}
}

View File

@@ -0,0 +1,9 @@
package com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xiaojukeji.know.streaming.km.common.bean.po.zookeeper.ZookeeperInfoPO;
import org.springframework.stereotype.Repository;
@Repository
public interface ZookeeperDAO extends BaseMapper<ZookeeperInfoPO> {
}

View File

@@ -0,0 +1,44 @@
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"clusterPhyId": {
"value": %d
}
}
},
{
"term": {
"brokerId": {
"value": %d
}
}
},
{
"range": {
"timestamp": {
"gte": %d,
"lte": %d
}
}
}
]
}
},
"aggs": {
"hist": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "%s",
"time_zone": "Asia/Shanghai",
"min_doc_count": 0
},
"aggs": {
%s
}
}
}
}

View File

@@ -0,0 +1,63 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.cluster;
import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterZookeepersManager;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersOverviewVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterZookeepersOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ClusterZookeepersStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.zookeeper.ZnodeVO;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 22/09/19
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "集群ZK-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_PREFIX)
public class ClusterZookeepersController {
@Autowired
private ClusterZookeepersManager clusterZookeepersManager;
@Autowired
private ZnodeService znodeService;
@ApiOperation("集群Zookeeper状态信息")
@GetMapping(value = "clusters/{clusterPhyId}/zookeepers-state")
public Result<ClusterZookeepersStateVO> getClusterZookeepersState(@PathVariable Long clusterPhyId) {
return clusterZookeepersManager.getClusterPhyZookeepersState(clusterPhyId);
}
@ApiOperation("集群Zookeeper信息列表")
@PostMapping(value = "clusters/{clusterPhyId}/zookeepers-overview")
public PaginationResult<ClusterZookeepersOverviewVO> getClusterZookeepersOverview(@PathVariable Long clusterPhyId,
@RequestBody ClusterZookeepersOverviewDTO dto) {
return clusterZookeepersManager.getClusterPhyZookeepersOverview(clusterPhyId, dto);
}
@ApiOperation("Zookeeper节点数据")
@GetMapping(value = "clusters/{clusterPhyId}/znode-data")
public Result<ZnodeVO> getClusterZookeeperData(@PathVariable Long clusterPhyId,
@RequestParam String path) {
return clusterZookeepersManager.getZnodeVO(clusterPhyId, path);
}
@ApiOperation("Zookeeper节点列表")
@GetMapping(value = "clusters/{clusterPhyId}/znode-children")
public Result<List<String>> getClusterZookeeperChild(@PathVariable Long clusterPhyId,
@RequestParam String path,
@RequestParam(required = false) String keyword) {
return znodeService.listZnodeChildren(clusterPhyId, path, keyword);
}
}

View File

@@ -0,0 +1,52 @@
package com.xiaojukeji.know.streaming.km.rest.api.v3.zk;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ZookeeperMetrics;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 22/09/19
*/
@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "ZKMetrics-相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V3_PREFIX)
public class ZookeeperMetricsController {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperMetricsController.class);
@Autowired
private ZookeeperMetricService zookeeperMetricService;
@ApiOperation(value = "ZK-最近指标", notes = "")
@PostMapping(value = "clusters/{clusterPhyId}/zookeeper-latest-metrics")
@ResponseBody
public Result<BaseMetrics> getLatestMetrics(@PathVariable Long clusterPhyId, @RequestBody List<String> metricsNames) {
Result<ZookeeperMetrics> metricsResult = zookeeperMetricService.batchCollectMetricsFromZookeeper(clusterPhyId, metricsNames);
if (metricsResult.failed()) {
return Result.buildFromIgnoreData(metricsResult);
}
return Result.buildSuc(metricsResult.getData());
}
@ApiOperation(value = "ZK-多指标历史信息", notes = "多条指标线")
@PostMapping(value = "clusters/{clusterPhyId}/zookeeper-metrics")
@ResponseBody
public Result<List<MetricLineVO>> getMetricsLine(@PathVariable Long clusterPhyId, @RequestBody MetricDTO dto) {
return zookeeperMetricService.listMetricsFromES(clusterPhyId, dto);
}
}

View File

@@ -0,0 +1,47 @@
package com.xiaojukeji.know.streaming.km.task.metadata;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
@Task(name = "SyncZookeeperTask",
description = "ZK信息同步到DB",
cron = "0 0/1 * * * ? *",
autoRegister = true,
consensual = ConsensualEnum.BROADCAST,
timeout = 2 * 60)
public class SyncZookeeperTask extends AbstractAsyncMetadataDispatchTask {
private static final ILog log = LogFactory.getLog(SyncZookeeperTask.class);
@Autowired
private ZookeeperService zookeeperService;
@Override
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) {
Result<List<ZookeeperInfo>> infoResult = zookeeperService.listFromZookeeper(
clusterPhy.getId(),
clusterPhy.getZookeeper(),
ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class)
);
if (infoResult.failed()) {
return new TaskResult(TaskResult.FAIL_CODE, infoResult.getMessage());
}
zookeeperService.batchReplaceDataInDB(clusterPhy.getId(), infoResult.getData());
return TaskResult.SUCCESS;
}
}

View File

@@ -0,0 +1,33 @@
package com.xiaojukeji.know.streaming.km.task.metrics;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.collector.metric.ZookeeperMetricCollector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author didi
*/
@Task(name = "ZookeeperMetricCollectorTask",
description = "Zookeeper指标采集任务",
cron = "0 0/1 * * * ? *",
autoRegister = true,
consensual = ConsensualEnum.BROADCAST,
timeout = 2 * 60)
public class ZookeeperMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
private static final ILog log = LogFactory.getLog(ZookeeperMetricCollectorTask.class);
@Autowired
private ZookeeperMetricCollector zookeeperMetricCollector;
@Override
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
zookeeperMetricCollector.collectMetrics(clusterPhy);
return TaskResult.SUCCESS;
}
}