Compare commits

..

4 Commits

Author SHA1 Message Date
zengqiao
77b87f1dbe 升级至企业版3.3.0 2023-02-24 17:52:27 +08:00
zengqiao
a82d7f594e 合并3.3.0企业版改动 2023-02-24 17:49:26 +08:00
zengqiao
cca7246281 合并3.3.0分支 2023-02-24 17:13:50 +08:00
zengqiao
c56d8cfb0f 增加rebalance / testing / license能力 2023-02-23 11:56:46 +08:00
307 changed files with 12350 additions and 4570 deletions

View File

@@ -1,43 +0,0 @@
name: KnowStreaming Build
on:
push:
branches: [ "*" ]
pull_request:
branches: [ "*" ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Setup Node
uses: actions/setup-node@v1
with:
node-version: '12.22.12'
- name: Build With Maven
run: mvn -Prelease-package -Dmaven.test.skip=true clean install -U
- name: Get KnowStreaming Version
if: ${{ success() }}
run: |
version=`mvn -Dexec.executable='echo' -Dexec.args='${project.version}' --non-recursive exec:exec -q`
echo "VERSION=${version}" >> $GITHUB_ENV
- name: Upload Binary Package
if: ${{ success() }}
uses: actions/upload-artifact@v3
with:
name: KnowStreaming-${{ env.VERSION }}.tar.gz
path: km-dist/target/KnowStreaming-${{ env.VERSION }}.tar.gz

View File

@@ -90,7 +90,6 @@
- [单机部署手册](docs/install_guide/单机部署手册.md)
- [版本升级手册](docs/install_guide/版本升级手册.md)
- [本地源码启动手册](docs/dev_guide/本地源码启动手册.md)
- [页面无数据排查手册](docs/dev_guide/页面无数据排查手册.md)
**`产品相关手册`**
@@ -101,9 +100,7 @@
**点击 [这里](https://doc.knowstreaming.com/product),也可以从官网获取到更多文档**
**`产品网址`**
- [产品官网https://knowstreaming.com](https://knowstreaming.com)
- [体验环境https://demo.knowstreaming.com](https://demo.knowstreaming.com),登陆账号admin/admin
@@ -146,7 +143,7 @@ PS: 提问请尽量把问题一次性描述清楚,并告知环境信息情况
**`2、微信群`**
微信加群:添加`PenceXie` 的微信号备注KnowStreaming加群。
微信加群:添加`mike_zhangliang``PenceXie``szzdzhp001`的微信号备注KnowStreaming加群。
<br/>
加群之前有劳点一下 star一个小小的 star 是对KnowStreaming作者们努力建设社区的动力。
@@ -158,4 +155,3 @@ PS: 提问请尽量把问题一次性描述清楚,并告知环境信息情况
## Star History
[![Star History Chart](https://api.star-history.com/svg?repos=didi/KnowStreaming&type=Date)](https://star-history.com/#didi/KnowStreaming&Date)

View File

@@ -1,78 +1,4 @@
## v3.4.0
**问题修复**
- [Bugfix]修复 Overview 指标文案错误的错误 ([#1190](https://github.com/didi/KnowStreaming/issues/1190))
- [Bugfix]修复删除 Kafka 集群后Connect 集群任务出现 NPE 问题 ([#1129](https://github.com/didi/KnowStreaming/issues/1129))
- [Bugfix]修复在 Ldap 登录时,设置 auth-user-registration: false 会导致空指针的问题 ([#1117](https://github.com/didi/KnowStreaming/issues/1117))
- [Bugfix]修复 Ldap 登录,调用 user.getId() 出现 NPE 的问题 ([#1108](https://github.com/didi/KnowStreaming/issues/1108))
- [Bugfix]修复前端新增角色失败等问题 ([#1107](https://github.com/didi/KnowStreaming/issues/1107))
- [Bugfix]修复 ZK 四字命令解析错误的问题
- [Bugfix]修复 zk standalone 模式下,状态获取错误的问题
- [Bugfix]修复 Broker 元信息解析方法未调用导致接入集群失败的问题 ([#993](https://github.com/didi/KnowStreaming/issues/993))
- [Bugfix]修复 ConsumerAssignment 类型转换错误的问题
- [Bugfix]修复对 Connect 集群的 clusterUrl 的动态更新导致配置不生效的问题 ([#1079](https://github.com/didi/KnowStreaming/issues/1079))
- [Bugfix]修复消费组不支持重置到最旧 Offset 的问题 ([#1059](https://github.com/didi/KnowStreaming/issues/1059))
- [Bugfix]后端增加查看 User 密码的权限点 ([#1095](https://github.com/didi/KnowStreaming/issues/1095))
- [Bugfix]修复 Connect-JMX 端口维护信息错误的问题 ([#1146](https://github.com/didi/KnowStreaming/issues/1146))
- [Bugfix]修复系统管理子应用无法正常启动的问题 ([#1167](https://github.com/didi/KnowStreaming/issues/1167))
- [Bugfix]修复 Security 模块,权限点缺失问题 ([#1069](https://github.com/didi/KnowStreaming/issues/1069)), ([#1154](https://github.com/didi/KnowStreaming/issues/1154))
- [Bugfix]修复 Connect-Worker Jmx 不生效的问题 ([#1067](https://github.com/didi/KnowStreaming/issues/1067))
- [Bugfix]修复权限 ACL 管理中,消费组列表展示错误的问题 ([#1037](https://github.com/didi/KnowStreaming/issues/1037))
- [Bugfix]修复 Connect 模块没有默认勾选指标的问题([#1022](https://github.com/didi/KnowStreaming/issues/1022)
- [Bugfix]修复 es 索引 create/delete 死循环的问题 ([#1021](https://github.com/didi/KnowStreaming/issues/1021))
- [Bugfix]修复 Connect-GroupDescription 解析失败的问题 ([#1015](https://github.com/didi/KnowStreaming/issues/1015))
- [Bugfix]修复 Prometheus 开放接口中Partition 指标 tag 缺失的问题 ([#1014](https://github.com/didi/KnowStreaming/issues/1014))
- [Bugfix]修复 Topic 消息展示offset 为 0 不显示的问题 ([#1192](https://github.com/didi/KnowStreaming/issues/1192))
- [Bugfix]修复重置offset接口调用过多问题
- [Bugfix]Connect 提交任务变更为只保存用户修改的配置,并修复 JSON 模式下配置展示不全的问题 ([#1158](https://github.com/didi/KnowStreaming/issues/1158))
- [Bugfix]修复消费组 Offset 重置后提示重置成功但是前端不刷新数据Offset 无变化的问题 ([#1090](https://github.com/didi/KnowStreaming/issues/1090))
- [Bugfix]修复未勾选系统管理查看权限,但是依然可以查看系统管理的问题 ([#1105](https://github.com/didi/KnowStreaming/issues/1105))
**产品优化**
- [Optimize]补充接入集群时,可选的 Kafka 版本列表 ([#1204](https://github.com/didi/KnowStreaming/issues/1204))
- [Optimize]GroupTopic 信息修改为实时获取 ([#1196](https://github.com/didi/KnowStreaming/issues/1196))
- [Optimize]增加 AdminClient 观测信息 ([#1111](https://github.com/didi/KnowStreaming/issues/1111))
- [Optimize]增加 Connector 运行状态指标 ([#1110](https://github.com/didi/KnowStreaming/issues/1110))
- [Optimize]统一 DB 元信息更新格式 ([#1127](https://github.com/didi/KnowStreaming/issues/1127)), ([#1125](https://github.com/didi/KnowStreaming/issues/1125)), ([#1006](https://github.com/didi/KnowStreaming/issues/1006))
- [Optimize]日志输出增加支持 MDC方便用户在 logback.xml 中 json 格式化日志 ([#1032](https://github.com/didi/KnowStreaming/issues/1032))
- [Optimize]Jmx 相关日志优化 ([#1082](https://github.com/didi/KnowStreaming/issues/1082))
- [Optimize]Topic-Partitions增加主动超时功能 ([#1076](https://github.com/didi/KnowStreaming/issues/1076))
- [Optimize]Topic-Messages页面后端增加按照Partition和Offset纬度的排序 ([#1075](https://github.com/didi/KnowStreaming/issues/1075))
- [Optimize]Connect-JSON模式下的JSON格式和官方API的格式不一致 ([#1080](https://github.com/didi/KnowStreaming/issues/1080)), ([#1153](https://github.com/didi/KnowStreaming/issues/1153)), ([#1192](https://github.com/didi/KnowStreaming/issues/1192))
- [Optimize]登录页面展示的 star 数量修改为最新的数量
- [Optimize]Group 列表的 maxLag 指标调整为实时获取 ([#1074](https://github.com/didi/KnowStreaming/issues/1074))
- [Optimize]Connector增加重启、编辑、删除等权限点 ([#1066](https://github.com/didi/KnowStreaming/issues/1066)), ([#1147](https://github.com/didi/KnowStreaming/issues/1147))
- [Optimize]优化 pom.xml 中KS版本的标签名
- [Optimize]优化集群Brokers中, Controller显示存在延迟的问题 ([#1162](https://github.com/didi/KnowStreaming/issues/1162))
- [Optimize]bump jackson version to 2.13.5
- [Optimize]权限新增 ACL自定义权限配置资源 TransactionalId 优化 ([#1192](https://github.com/didi/KnowStreaming/issues/1192))
- [Optimize]Connect 样式优化
- [Optimize]消费组详情控制数据实时刷新
**功能新增**
- [Feature]新增删除 Group 或 GroupOffset 功能 ([#1064](https://github.com/didi/KnowStreaming/issues/1064)), ([#1084](https://github.com/didi/KnowStreaming/issues/1084)), ([#1040](https://github.com/didi/KnowStreaming/issues/1040)), ([#1144](https://github.com/didi/KnowStreaming/issues/1144))
- [Feature]增加 Truncate 数据功能 ([#1062](https://github.com/didi/KnowStreaming/issues/1062)), ([#1043](https://github.com/didi/KnowStreaming/issues/1043)), ([#1145](https://github.com/didi/KnowStreaming/issues/1145))
- [Feature]支持指定 Server 的具体 Jmx 端口 ([#965](https://github.com/didi/KnowStreaming/issues/965))
**文档更新**
- [Doc]FAQ 补充 ES 8.x 版本使用说明 ([#1189](https://github.com/didi/KnowStreaming/issues/1189))
- [Doc]补充启动失败的说明 ([#1126](https://github.com/didi/KnowStreaming/issues/1126))
- [Doc]补充 ZK 无数据排查说明 ([#1004](https://github.com/didi/KnowStreaming/issues/1004))
- [Doc]无数据排查文档,补充 ES 集群 Shard 满的异常日志
- [Doc]README 补充页面无数据排查手册链接
- [Doc]补充连接特定 Jmx 端口的说明 ([#965](https://github.com/didi/KnowStreaming/issues/965))
- [Doc]补充 zk_properties 字段的使用说明 ([#1003](https://github.com/didi/KnowStreaming/issues/1003))
---
## v3.3.0
**问题修复**

View File

@@ -47,13 +47,14 @@
**1、`Header` 规范**
`Header` 格式为 `[Type]Message` 主要有三部分组成,分别是`Type``Message`
`Header` 格式为 `[Type]Message(#IssueID)` 主要有三部分组成,分别是`Type``Message``IssueID`
- `Type`:说明这个提交是哪一个类型的,比如有 Bugfix、Feature、Optimize等
- `Message`说明提交的信息比如修复xx问题
- `IssueID`该提交关联的Issue的编号
实际例子:[`[Bugfix]修复新接入的集群Controller-Host不显示的问题`](https://github.com/didi/KnowStreaming/pull/933/commits)
实际例子:[`[Bugfix]修复新接入的集群Controller-Host不显示的问题(#927)`](https://github.com/didi/KnowStreaming/pull/933/commits)
@@ -66,7 +67,7 @@
**3、实际例子**
```
[Optimize]优化 MySQL & ES 测试容器的初始化
[Optimize]优化 MySQL & ES 测试容器的初始化(#906)
主要的变更
1、knowstreaming/knowstreaming-manager 容器;
@@ -137,7 +138,7 @@
1. 切换到主分支:`git checkout github_master`
2. 主分支拉最新代码:`git pull`
3. 基于主分支拉新分支:`git checkout -b fix_928`
4. 提交代码安装commit的规范进行提交例如`git commit -m "[Optimize]优化xxx问题"`
4. 提交代码安装commit的规范进行提交例如`git commit -m "[Optimize]优化xxx问题(#928)"`
5. 提交到自己远端仓库:`git push --set-upstream origin fix_928`
6. `GitHub` 页面发起 `Pull Request` 请求,管理员合入主仓库。这部分详细见下一节;
@@ -161,8 +162,6 @@
### 4.1、如何将多个 Commit-Log 合并为一个?
可以不需要将多个commit合并为一个如果要合并可以使用 `git rebase -i` 命令进行解决。
可以使用 `git rebase -i` 命令进行解决。

View File

@@ -1,115 +0,0 @@
## YML文件MYSQL密码加密存储手册
### 1、本地部署加密
**第一步:生成密文**
在本地仓库中找到jasypt-1.9.3.jar默认在org/jasypt/jasypt/1.9.3中,使用`java -cp`生成密文。
```bash
java -cp jasypt-1.9.3.jar org.jasypt.intf.cli.JasyptPBEStringEncryptionCLI input=mysql密码 password=加密的salt algorithm=PBEWithMD5AndDES
```
```bash
## 得到密文
DYbVDLg5D0WRcJSCUGWjiw==
```
**第二步配置jasypt**
在YML文件中配置jasypt例如
```yaml
jasypt:
encryptor:
algorithm: PBEWithMD5AndDES
iv-generator-classname: org.jasypt.iv.NoIvGenerator
```
**第三步:配置密文**
使用密文替换YML文件中的明文密码为ENC(密文),例如[application.yml](https://github.com/didi/KnowStreaming/blob/master/km-rest/src/main/resources/application.yml)中MYSQL密码。
```yaml
know-streaming:
username: root
password: ENC(DYbVDLg5D0WRcJSCUGWjiw==)
```
**第四步配置加密的salt选择其一**
- 配置在YML文件中不推荐
```yaml
jasypt:
encryptor:
password: salt
```
- 配置程序启动时的命令行参数
```bash
java -jar xxx.jar --jasypt.encryptor.password=salt
```
- 配置程序启动时的环境变量
```bash
export JASYPT_PASSWORD=salt
java -jar xxx.jar --jasypt.encryptor.password=${JASYPT_PASSWORD}
```
## 2、容器部署加密
利用docker swarm 提供的 secret 机制加密存储密码使用docker swarm来管理密码。
### 2.1、secret加密存储
**第一步初始化docker swarm**
```bash
docker swarm init
```
**第二步:创建密钥**
```bash
echo "admin2022_" | docker secret create mysql_password -
# 输出密钥
f964wi4gg946hu78quxsh2ge9
```
**第三步:使用密钥**
```yaml
# mysql用户密码
SERVER_MYSQL_USER: root
SERVER_MYSQL_PASSWORD: mysql_password
knowstreaming-mysql:
# root 用户密码
MYSQL_ROOT_PASSWORD: mysql_password
secrets:
mysql_password:
external: true
```
### 2.2、使用密钥文件加密
**第一步:创建密钥**
```bash
echo "admin2022_" > password
```
**第二步:使用密钥**
```yaml
# mysql用户密码
SERVER_MYSQL_USER: root
SERVER_MYSQL_PASSWORD: mysql_password
secrets:
mysql_password:
file: ./password
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 382 KiB

View File

@@ -6,72 +6,72 @@
### 3.3.1、Cluster 指标
| 指标名称 | 指标单位 | 指标含义 | kafka 版本 | 企业/开源版指标 |
| ------------------------- | -------- |--------------------------------| ---------------- | --------------- |
| HealthScore | 分 | 集群总体的健康分 | 全部版本 | 开源版 |
| HealthCheckPassed | 个 | 集群总体健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal | 个 | 集群总体健康检查总数 | 全部版本 | 开源版 |
| 指标名称 | 指标单位 | 指标含义 | kafka 版本 | 企业/开源版指标 |
| ------------------------- | -------- | ------------------------------------ | ---------------- | --------------- |
| HealthScore | 分 | 集群总体的健康分 | 全部版本 | 开源版 |
| HealthCheckPassed | 个 | 集群总体健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal | 个 | 集群总体健康检查总数 | 全部版本 | 开源版 |
| HealthScore_Topics | 分 | 集群 Topics 的健康分 | 全部版本 | 开源版 |
| HealthCheckPassed_Topics | 个 | 集群 Topics 健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal_Topics | 个 | 集群 Topics 健康检查总数 | 全部版本 | 开源版 |
| HealthCheckPassed_Topics | 个 | 集群 Topics 健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal_Topics | 个 | 集群 Topics 健康检查总数 | 全部版本 | 开源版 |
| HealthScore_Brokers | 分 | 集群 Brokers 的健康分 | 全部版本 | 开源版 |
| HealthCheckPassed_Brokers | 个 | 集群 Brokers 健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal_Brokers | 个 | 集群 Brokers 健康检查总数 | 全部版本 | 开源版 |
| HealthCheckPassed_Brokers | 个 | 集群 Brokers 健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal_Brokers | 个 | 集群 Brokers 健康检查总数 | 全部版本 | 开源版 |
| HealthScore_Groups | 分 | 集群 Groups 的健康分 | 全部版本 | 开源版 |
| HealthCheckPassed_Groups | 个 | 集群 Groups 健康检查总数 | 全部版本 | 开源版 |
| HealthCheckTotal_Groups | 个 | 集群 Groups 健康检查总数 | 全部版本 | 开源版 |
| HealthScore_Cluster | 分 | 集群自身的健康分 | 全部版本 | 开源版 |
| HealthCheckPassed_Cluster | 个 | 集群自身健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal_Cluster | 个 | 集群自身健康检查总数 | 全部版本 | 开源版 |
| TotalRequestQueueSize | 个 | 集群中总的请求队列数 | 全部版本 | 开源版 |
| TotalResponseQueueSize | 个 | 集群中总的响应队列数 | 全部版本 | 开源版 |
| HealthCheckPassed_Groups | 个 | 集群 Groups 健康检查总数 | 全部版本 | 开源版 |
| HealthCheckTotal_Groups | 个 | 集群 Groups 健康检查总数 | 全部版本 | 开源版 |
| HealthScore_Cluster | 分 | 集群自身的健康分 | 全部版本 | 开源版 |
| HealthCheckPassed_Cluster | 个 | 集群自身健康检查通过数 | 全部版本 | 开源版 |
| HealthCheckTotal_Cluster | 个 | 集群自身健康检查总数 | 全部版本 | 开源版 |
| TotalRequestQueueSize | 个 | 集群中总的请求队列数 | 全部版本 | 开源版 |
| TotalResponseQueueSize | 个 | 集群中总的响应队列数 | 全部版本 | 开源版 |
| EventQueueSize | 个 | 集群中 Controller 的 EventQueue 大小 | 2.0.0 及以上版本 | 开源版 |
| ActiveControllerCount | 个 | 集群中存活的 Controller 数 | 全部版本 | 开源版 |
| TotalProduceRequests | 个 | 集群中的 Produce 每秒请求数 | 全部版本 | 开源版 |
| TotalLogSize | byte | 集群总的已使用的磁盘大小 | 全部版本 | 开源版 |
| ConnectionsCount | 个 | 集群的连接(Connections)个数 | 全部版本 | 开源版 |
| Zookeepers | 个 | 集群中存活的 zk 节点个数 | 全部版本 | 开源版 |
| ActiveControllerCount | 个 | 集群中存活的 Controller 数 | 全部版本 | 开源版 |
| TotalProduceRequests | 个 | 集群中的 Produce 每秒请求数 | 全部版本 | 开源版 |
| TotalLogSize | byte | 集群总的已使用的磁盘大小 | 全部版本 | 开源版 |
| ConnectionsCount | 个 | 集群的连接(Connections)个数 | 全部版本 | 开源版 |
| Zookeepers | 个 | 集群中存活的 zk 节点个数 | 全部版本 | 开源版 |
| ZookeepersAvailable | 是/否 | ZK 地址是否合法 | 全部版本 | 开源版 |
| Brokers | 个 | 集群的 broker 的总数 | 全部版本 | 开源版 |
| BrokersAlive | 个 | 集群的 broker 的存活数 | 全部版本 | 开源版 |
| BrokersNotAlive | 个 | 集群的 broker 的未存活数 | 全部版本 | 开源版 |
| BrokersAlive | 个 | 集群的 broker 的存活数 | 全部版本 | 开源版 |
| BrokersNotAlive | 个 | 集群的 broker 的未存活数 | 全部版本 | 开源版 |
| Replicas | 个 | 集群中 Replica 的总数 | 全部版本 | 开源版 |
| Topics | 个 | 集群中 Topic 的总数 | 全部版本 | 开源版 |
| Partitions | 个 | 集群的 Partitions 总数 | 全部版本 | 开源版 |
| Partitions | 个 | 集群的 Partitions 总数 | 全部版本 | 开源版 |
| PartitionNoLeader | 个 | 集群中的 PartitionNoLeader 总数 | 全部版本 | 开源版 |
| PartitionMinISR_S | 个 | 集群中的小于 PartitionMinISR 总数 | 全部版本 | 开源版 |
| PartitionMinISR_E | 个 | 集群中的等于 PartitionMinISR 总数 | 全部版本 | 开源版 |
| PartitionURP | 个 | 集群中的未同步的 Partition 总数 | 全部版本 | 开源版 |
| MessagesIn | 条/s | 集群每消息写入条数 | 全部版本 | 开源版 |
| Messages | 条 | 集群总的消息条数 | 全部版本 | 开源版 |
| LeaderMessages | 条 | 集群中 leader 总的消息条数 | 全部版本 | 开源版 |
| BytesIn | byte/s | 集群的每秒写入字节数 | 全部版本 | 开源版 |
| BytesIn_min_5 | byte/s | 集群的每秒写入字节数5 分钟均值 | 全部版本 | 开源版 |
| BytesIn_min_15 | byte/s | 集群的每秒写入字节数15 分钟均值 | 全部版本 | 开源版 |
| BytesOut | byte/s | 集群的每秒流出字节数 | 全部版本 | 开源版 |
| BytesOut_min_5 | byte/s | 集群的每秒流出字节数5 分钟均值 | 全部版本 | 开源版 |
| BytesOut_min_15 | byte/s | 集群的每秒流出字节数15 分钟均值 | 全部版本 | 开源版 |
| PartitionMinISR_S | 个 | 集群中的小于 PartitionMinISR 总数 | 全部版本 | 开源版 |
| PartitionMinISR_E | 个 | 集群中的等于 PartitionMinISR 总数 | 全部版本 | 开源版 |
| PartitionURP | 个 | 集群中的未同步的 Partition 总数 | 全部版本 | 开源版 |
| MessagesIn | 条/s | 集群每消息写入条数 | 全部版本 | 开源版 |
| Messages | 条 | 集群总的消息条数 | 全部版本 | 开源版 |
| LeaderMessages | 条 | 集群中 leader 总的消息条数 | 全部版本 | 开源版 |
| BytesIn | byte/s | 集群的每秒写入字节数 | 全部版本 | 开源版 |
| BytesIn_min_5 | byte/s | 集群的每秒写入字节数5 分钟均值 | 全部版本 | 开源版 |
| BytesIn_min_15 | byte/s | 集群的每秒写入字节数15 分钟均值 | 全部版本 | 开源版 |
| BytesOut | byte/s | 集群的每秒流出字节数 | 全部版本 | 开源版 |
| BytesOut_min_5 | byte/s | 集群的每秒流出字节数5 分钟均值 | 全部版本 | 开源版 |
| BytesOut_min_15 | byte/s | 集群的每秒流出字节数15 分钟均值 | 全部版本 | 开源版 |
| Groups | 个 | 集群中 Group 的总数 | 全部版本 | 开源版 |
| GroupActives | 个 | 集群中 ActiveGroup 的总数 | 全部版本 | 开源版 |
| GroupEmptys | 个 | 集群中 EmptyGroup 的总数 | 全部版本 | 开源版 |
| GroupRebalances | 个 | 集群中 RebalanceGroup 的总数 | 全部版本 | 开源版 |
| GroupDeads | 个 | 集群中 DeadGroup 的总数 | 全部版本 | 开源版 |
| Alive | 是/否 | 集群是否存活1存活0没有存活 | 全部版本 | 开源版 |
| AclEnable | 是/否 | 集群是否开启 Acl10 | 全部版本 | 开源版 |
| Acls | 个 | ACL 数 | 全部版本 | 开源版 |
| AclUsers | 个 | ACL-KafkaUser 数 | 全部版本 | 开源版 |
| AclTopics | 个 | ACL-Topic 数 | 全部版本 | 开源版 |
| AclGroups | 个 | ACL-Group 数 | 全部版本 | 开源版 |
| Alive | 是/否 | 集群是否存活1存活0没有存活 | 全部版本 | 开源版 |
| AclEnable | 是/否 | 集群是否开启 Acl10否 | 全部版本 | 开源版 |
| Acls | 个 | ACL 数 | 全部版本 | 开源版 |
| AclUsers | 个 | ACL-KafkaUser 数 | 全部版本 | 开源版 |
| AclTopics | 个 | ACL-Topic 数 | 全部版本 | 开源版 |
| AclGroups | 个 | ACL-Group 数 | 全部版本 | 开源版 |
| Jobs | 个 | 集群任务总数 | 全部版本 | 开源版 |
| JobsRunning | 个 | 集群 running 任务总数 | 全部版本 | 开源版 |
| JobsWaiting | 个 | 集群 waiting 任务总数 | 全部版本 | 开源版 |
| JobsSuccess | 个 | 集群 success 任务总数 | 全部版本 | 开源版 |
| JobsFailed | 个 | 集群 failed 任务总数 | 全部版本 | 开源版 |
| LoadReBalanceEnable | 是/否 | 是否开启均衡, 10 | 全部版本 | 企业版 |
| LoadReBalanceCpu | 是/否 | CPU 是否均衡, 10 | 全部版本 | 企业版 |
| LoadReBalanceNwIn | 是/否 | BytesIn 是否均衡, 10 | 全部版本 | 企业版 |
| LoadReBalanceNwOut | 是/否 | BytesOut 是否均衡, 10 | 全部版本 | 企业版 |
| LoadReBalanceDisk | 是/否 | Disk 是否均衡, 10 | 全部版本 | 企业版 |
| LoadReBalanceEnable | 是/否 | 是否开启均衡, 10否 | 全部版本 | 企业版 |
| LoadReBalanceCpu | 是/否 | CPU 是否均衡, 10否 | 全部版本 | 企业版 |
| LoadReBalanceNwIn | 是/否 | BytesIn 是否均衡, 10否 | 全部版本 | 企业版 |
| LoadReBalanceNwOut | 是/否 | BytesOut 是否均衡, 10否 | 全部版本 | 企业版 |
| LoadReBalanceDisk | 是/否 | Disk 是否均衡, 10否 | 全部版本 | 企业版 |
### 3.3.2、Broker 指标

View File

@@ -1,180 +0,0 @@
![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
---
# 接入 ZK 带认证的 Kafka 集群
- [接入 ZK 带认证的 Kafka 集群](#接入-zk-带认证的-kafka-集群)
- [1、简要说明](#1简要说明)
- [2、支持 Digest-MD5 认证](#2支持-digest-md5-认证)
- [3、支持 Kerberos 认证](#3支持-kerberos-认证)
## 1、简要说明
- 1、当前 KnowStreaming 暂无页面可以直接配置 ZK 的认证信息,但是 KnowStreaming 的后端预留了 MySQL 的字段用于存储 ZK 的认证信息,用户可通过将认证信息存储至该字段,从而达到支持接入 ZK 带认证的 Kafka 集群。
&nbsp;
- 2、该字段位于 MySQL 库 ks_km_physical_cluster 表中的 zk_properties 字段,该字段的格式是:
```json
{
"openSecure": false, # 是否开启认证开启时配置为true
"sessionTimeoutUnitMs": 15000, # session超时时间
"requestTimeoutUnitMs": 5000, # request超时时间
"otherProps": { # 其他配置,认证信息主要配置在该位置
"zookeeper.sasl.clientconfig": "kafkaClusterZK1" # 例子,
}
}
```
- 3、实际生效的代码位置
```java
// 代码位置https://github.com/didi/KnowStreaming/blob/master/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminZKClient.java
kafkaZkClient = KafkaZkClient.apply(
clusterPhy.getZookeeper(),
zkConfig.getOpenSecure(), // 是否开启认证开启时配置为true
zkConfig.getSessionTimeoutUnitMs(), // session超时时间
zkConfig.getRequestTimeoutUnitMs(), // request超时时间
5,
Time.SYSTEM,
"KS-ZK-ClusterPhyId-" + clusterPhyId,
"KS-ZK-SessionExpireListener-clusterPhyId-" + clusterPhyId,
Option.apply("KS-ZK-ClusterPhyId-" + clusterPhyId),
Option.apply(this.getZKConfig(clusterPhyId, zkConfig.getOtherProps())) // 其他配置,认证信息主要配置在该位置
);
```
- 4、SQL例子
```sql
update ks_km_physical_cluster set zk_properties='{ "openSecure": true, "otherProps": { "zookeeper.sasl.clientconfig": "kafkaClusterZK1" } }' where id=集群1ID;
```
- 5、zk_properties 字段不能覆盖所有的场景,所以实际使用过程中还可能需要在此基础之上,进行其他的调整。比如,`Digest-MD5 认证``Kerberos 认证` 都还需要修改启动脚本等。后续看能否通过修改 ZK 客户端的源码,使得 ZK 认证的相关配置能和 Kafka 认证的配置一样方便。
---
## 2、支持 Digest-MD5 认证
1. 假设你有两个 Kafka 集群, 对应两个 ZK 集群;
2. 两个 ZK 集群的认证信息如下所示
```bash
# ZK1集群的认证信息这里的 kafkaClusterZK1 可以是随意的名称,只需要和后续数据库的配置对应上即可。
kafkaClusterZK1 {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zk1"
password="zk1-passwd";
};
# ZK2集群的认证信息这里的 kafkaClusterZK2 可以是随意的名称,只需要和后续数据库的配置对应上即可。
kafkaClusterZK2 {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zk2"
password="zk2-passwd";
};
```
3. 将这两个ZK集群的认证信息存储到 `/xxx/zk_client_jaas.conf` 文件中,文件中的内容如下所示:
```bash
kafkaClusterZK1 {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zk1"
password="zk1-passwd";
};
kafkaClusterZK2 {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zk2"
password="zk2-passwd";
};
```
4. 修改 KnowStreaming 的启动脚本
```bash
# `KnowStreaming/bin/startup.sh` 中的 47 行的 JAVA_OPT 中追加如下设置
-Djava.security.auth.login.config=/xxx/zk_client_jaas.conf
```
5. 修改 KnowStreaming 的表数据
```sql
# 这里的 kafkaClusterZK1 要和 /xxx/zk_client_jaas.conf 中的对应上
update ks_km_physical_cluster set zk_properties='{ "openSecure": true, "otherProps": { "zookeeper.sasl.clientconfig": "kafkaClusterZK1" } }' where id=集群1ID;
update ks_km_physical_cluster set zk_properties='{ "openSecure": true, "otherProps": { "zookeeper.sasl.clientconfig": "kafkaClusterZK2" } }' where id=集群2ID;
```
6. 重启 KnowStreaming
---
## 3、支持 Kerberos 认证
**第一步查看用户在ZK的ACL**
假设我们使用的用户是 `kafka` 这个用户。
- 1、查看 server.properties 的配置的 zookeeper.connect 的地址;
- 2、使用 `zkCli.sh -serve zookeeper.connect的地址` 登录到ZK页面
- 3、ZK页面上执行命令 `getAcl /kafka` 查看 `kafka` 用户的权限;
此时,我们可以看到如下信息:
![watch_user_acl.png](assets/support_kerberos_zk/watch_user_acl.png)
`kafka` 用户需要的权限是 `cdrwa`。如果用户没有 `cdrwa` 权限的话,需要创建用户并授权,授权命令为:`setAcl`
**第二步创建Kerberos的keytab并修改 KnowStreaming 主机**
- 1、在 Kerberos 的域中创建 `kafka/_HOST``keytab`,并导出。例如:`kafka/dbs-kafka-test-8-53`
- 2、导出 keytab 后上传到安装 KS 的机器的 `/etc/keytab` 下;
- 3、在 KS 机器上,执行 `kinit -kt zookeepe.keytab kafka/dbs-kafka-test-8-53` 看是否能进行 `Kerberos` 登录;
- 4、可以登录后配置 `/opt/zookeeper.jaas` 文件,例子如下:
```bash
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=false
serviceName="zookeeper"
keyTab="/etc/keytab/zookeeper.keytab"
principal="kafka/dbs-kafka-test-8-53@XXX.XXX.XXX";
};
```
- 5、需要配置 `KDC-Server``KnowStreaming` 的机器开通防火墙并在KS的机器 `/etc/host/` 配置 `kdc-server``hostname`。并将 `krb5.conf` 导入到 `/etc` 下;
**第三步:修改 KnowStreaming 的配置**
- 1、修改数据库开启ZK的认证
```sql
update ks_km_physical_cluster set zk_properties='{ "openSecure": true }' where id=集群1ID;
```
- 2、在 `KnowStreaming/bin/startup.sh` 中的47行的JAVA_OPT中追加如下设置
```bash
-Dsun.security.krb5.debug=true -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/zookeeper.jaas
```
- 3、重启KS集群后再 start.out 中看到如下信息则证明Kerberos配置成功
![success_1.png](assets/support_kerberos_zk/success_1.png)
![success_2.png](assets/support_kerberos_zk/success_2.png)
**第四步:补充说明**
- 1、多Kafka集群如果用的是一样的Kerberos域的话只需在每个`ZK`中给`kafka`用户配置`crdwa`权限即可,这样集群初始化的时候`zkclient`是都可以认证;
- 2、多个Kerberos域暂时未适配

View File

@@ -0,0 +1,69 @@
## 支持Kerberos认证的ZK
### 1、修改 KnowStreaming 代码
代码位置:`src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminZKClient.java`
`createZKClient``135行 的 false 改为 true
![need_modify_code.png](assets/support_kerberos_zk/need_modify_code.png)
修改完后重新进行打包编译,打包编译见:[打包编译](https://github.com/didi/KnowStreaming/blob/master/docs/install_guide/%E6%BA%90%E7%A0%81%E7%BC%96%E8%AF%91%E6%89%93%E5%8C%85%E6%89%8B%E5%86%8C.md
)
### 2、查看用户在ZK的ACL
假设我们使用的用户是 `kafka` 这个用户。
- 1、查看 server.properties 的配置的 zookeeper.connect 的地址;
- 2、使用 `zkCli.sh -serve zookeeper.connect的地址` 登录到ZK页面
- 3、ZK页面上执行命令 `getAcl /kafka` 查看 `kafka` 用户的权限;
此时,我们可以看到如下信息:
![watch_user_acl.png](assets/support_kerberos_zk/watch_user_acl.png)
`kafka` 用户需要的权限是 `cdrwa`。如果用户没有 `cdrwa` 权限的话,需要创建用户并授权,授权命令为:`setAcl`
### 3、创建Kerberos的keytab并修改 KnowStreaming 主机
- 1、在 Kerberos 的域中创建 `kafka/_HOST` 的 `keytab`,并导出。例如:`kafka/dbs-kafka-test-8-53`
- 2、导出 keytab 后上传到安装 KS 的机器的 `/etc/keytab` 下;
- 3、在 KS 机器上,执行 `kinit -kt zookeepe.keytab kafka/dbs-kafka-test-8-53` 看是否能进行 `Kerberos` 登录;
- 4、可以登录后配置 `/opt/zookeeper.jaas` 文件,例子如下:
```sql
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=false
serviceName="zookeeper"
keyTab="/etc/keytab/zookeeper.keytab"
principal="kafka/dbs-kafka-test-8-53@XXX.XXX.XXX";
};
```
- 5、需要配置 `KDC-Server` 对 `KnowStreaming` 的机器开通防火墙并在KS的机器 `/etc/host/` 配置 `kdc-server` 的 `hostname`。并将 `krb5.conf` 导入到 `/etc` 下;
### 4、修改 KnowStreaming 的配置
- 1、在 `/usr/local/KnowStreaming/KnowStreaming/bin/startup.sh` 中的47行的JAVA_OPT中追加如下设置
```bash
-Dsun.security.krb5.debug=true -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/zookeeper.jaas
```
- 2、重启KS集群后再 start.out 中看到如下信息则证明Kerberos配置成功
![success_1.png](assets/support_kerberos_zk/success_1.png)
![success_2.png](assets/support_kerberos_zk/success_2.png)
### 5、补充说明
- 1、多Kafka集群如果用的是一样的Kerberos域的话只需在每个`ZK`中给`kafka`用户配置`crdwa`权限即可,这样集群初始化的时候`zkclient`是都可以认证;
- 2、当前需要修改代码重新打包才可以支持后续考虑通过页面支持Kerberos认证的ZK接入
- 3、多个Kerberos域暂时未适配

View File

@@ -0,0 +1,285 @@
## 1、集群接入错误
### 1.1、异常现象
如下图所示,集群非空时,大概率为地址配置错误导致。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_BRiXBvqYFK2dxSF1aqgZ width="80%">
### 1.2、解决方案
接入集群时,依据提示的错误,进行相应的解决。例如:
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_Yn4LhV8aeSEKX1zrrkUi width="50%">
### 1.3、正常情况
接入集群时,页面信息都自动正常出现,没有提示错误。
## 2、JMX连接失败需使用3.0.1及以上版本)
### 2.1异常现象
Broker列表的JMX Port列出现红色感叹号则该Broker的JMX连接异常。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_MLlLCfAktne4X6MBtBUd width="90%">
#### 2.1.1、原因一JMX未开启
##### 2.1.1.1、异常现象
broker列表的JMX Port值为-1对应Broker的JMX未开启。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_E1PD8tPsMeR2zYLFBFAu width="90%">
##### 2.1.1.2、解决方案
开启JMX开启流程如下
1、修改kafka的bin目录下面的`kafka-server-start.sh`文件
```
# 在这个下面增加JMX端口的配置
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT=9999 # 增加这个配置, 这里的数值并不一定是要9999
fi
```
2、修改kafka的bin目录下面对的`kafka-run-class.sh`文件
```
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${当前机器的IP}"
fi
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT - Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
fi
```
3、重启Kafka-Broker。
#### 2.1.2、原因二JMX配置错误
##### 2.1.2.1、异常现象
错误日志:
```
# 错误一: 错误提示的是真实的IP这样的话基本就是JMX配置的有问题了。
2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap : JMX connect exception, host:192.168.0.1 port:9999. java.rmi.ConnectException: Connection refused to host: 192.168.0.1; nested exception is:
# 错误二错误提示的是127.0.0.1这个IP这个是机器的hostname配置的可能有问题。
2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap : JMX connect exception, host:127.0.0.1 port:9999. java.rmi.ConnectException: Connection refused to host: 127.0.0.1;; nested exception is:
```
##### 2.1.2.2、解决方案
开启JMX开启流程如下
1、修改kafka的bin目录下面的`kafka-server-start.sh`文件
```
# 在这个下面增加JMX端口的配置
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT=9999 # 增加这个配置, 这里的数值并不一定是要9999
fi
```
2、修改kafka的bin目录下面对的`kafka-run-class.sh`文件
```
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${当前机器的IP}"
fi
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT - Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
fi
```
3、重启Kafka-Broker。
#### 2.1.3、原因三JMX开启SSL
##### 2.1.3.1、解决方案
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_kNyCi8H9wtHSRkWurB6S width="50%">
#### 2.1.4、原因四连接了错误IP
##### 2.1.4.1、异常现象
Broker 配置了内外网而JMX在配置时可能配置了内网IP或者外网IP此时`KnowStreaming` 需要连接到特定网络的IP才可以进行访问。
比如Broker在ZK的存储结构如下所示我们期望连接到 `endpoints` 中标记为 `INTERNAL` 的地址,但是 `KnowStreaming` 却连接了 `EXTERNAL` 的地址。
```json
{
"listener_security_protocol_map": {
"EXTERNAL": "SASL_PLAINTEXT",
"INTERNAL": "SASL_PLAINTEXT"
},
"endpoints": [
"EXTERNAL://192.168.0.1:7092",
"INTERNAL://192.168.0.2:7093"
],
"jmx_port": 8099,
"host": "192.168.0.1",
"timestamp": "1627289710439",
"port": -1,
"version": 4
}
```
##### 2.1.4.2、解决方案
可以手动往`ks_km_physical_cluster`表的`jmx_properties`字段增加一个`useWhichEndpoint`字段,从而控制 `KnowStreaming` 连接到特定的JMX IP及PORT。
`jmx_properties`格式:
```json
{
"maxConn": 100, // KM对单台Broker的最大JMX连接数
"username": "xxxxx", //用户名,可以不填写
"password": "xxxx", // 密码,可以不填写
"openSSL": true, //开启SSL, true表示开启ssl, false表示关闭
"useWhichEndpoint": "EXTERNAL" //指定要连接的网络名称填写EXTERNAL就是连接endpoints里面的EXTERNAL地址
}
```
SQL例子
```sql
UPDATE ks_km_physical_cluster SET jmx_properties='{ "maxConn": 10, "username": "xxxxx", "password": "xxxx", "openSSL": false , "useWhichEndpoint": "xxx"}' where id={xxx};
```
### 2.2、正常情况
修改完成后,如果看到 JMX PORT这一列全部为绿色则表示JMX已正常。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_ymtDTCiDlzfrmSCez2lx width="90%">
## 3、Elasticsearch问题
注意mac系统在执行curl指令时可能报zsh错误。可参考以下操作。
```
1 进入.zshrc 文件 vim ~/.zshrc
2.在.zshrc中加入 setopt no_nomatch
3.更新配置 source ~/.zshrc
```
### 3.1、原因一:缺少索引
#### 3.1.1、异常现象
报错信息
```
com.didiglobal.logi.elasticsearch.client.model.exception.ESIndexNotFoundException: method [GET], host[http://127.0.0.1:9200], URI [/ks_kafka_broker_metric_2022-10-21,ks_kafka_broker_metric_2022-10-22/_search], status line [HTTP/1.1 404 Not Found]
```
curl http://{ES的IP地址}:{ES的端口号}/_cat/indices/ks_kafka* 查看KS索引列表发现没有索引。
#### 3.1.2、解决方案
执行 [ES索引及模版初始化](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) 脚本,来创建索引及模版。
### 3.2、原因二:索引模板错误
#### 3.2.1、异常现象
多集群列表有数据集群详情页图标无数据。查询KS索引模板列表发现不存在。
```
curl {ES的IP地址}:{ES的端口号}/_cat/templates/ks_kafka*?v&h=name
```
正常KS模板如下图所示。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_l79bPYSci9wr6KFwZDA6 width="90%">
#### 3.2.2、解决方案
删除KS索引模板和索引
```
curl -XDELETE {ES的IP地址}:{ES的端口号}/ks_kafka*
curl -XDELETE {ES的IP地址}:{ES的端口号}/_template/ks_kafka*
```
执行 [ES索引及模版初始化](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) 脚本,来创建索引及模版。
### 3.3、原因三集群Shard满
#### 3.3.1、异常现象
报错信息
```
com.didiglobal.logi.elasticsearch.client.model.exception.ESIndexNotFoundException: method [GET], host[http://127.0.0.1:9200], URI [/ks_kafka_broker_metric_2022-10-21,ks_kafka_broker_metric_2022-10-22/_search], status line [HTTP/1.1 404 Not Found]
```
尝试手动创建索引失败。
```
#创建ks_kafka_cluster_metric_test索引的指令
curl -s -XPUT http://{ES的IP地址}:{ES的端口号}/ks_kafka_cluster_metric_test
```
#### 3.3.2、解决方案
ES索引的默认分片数量为1000达到数量以后索引创建失败。
+ 扩大ES索引数量上限执行指令
```
curl -XPUT -H"content-type:application/json" http://{ES的IP地址}:{ES的端口号}/_cluster/settings -d '
{
"persistent": {
"cluster": {
"max_shards_per_node":{索引上限默认为1000}
}
}
}'
```
执行 [ES索引及模版初始化](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) 脚本,来补全索引。

View File

@@ -2,275 +2,125 @@
![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
## JMX-连接失败问题解决
## 2、解决连接 JMX 失败
集群正常接入`KnowStreaming`之后即可以看到集群的Broker列表此时如果查看不了Topic的实时流量或者是Broker的实时流量信息时那么大概率就是`JMX`连接的问题了。
- [2、解决连接 JMX 失败](#2解决连接-jmx-失败)
- [2.1、正异常现象](#21正异常现象)
- [2.2、异因一JMX未开启](#22异因一jmx未开启)
- [2.2.1、异常现象](#221异常现象)
- [2.2.2、解决方案](#222解决方案)
- [2.3、异原二JMX配置错误](#23异原二jmx配置错误)
- [2.3.1、异常现象](#231异常现象)
- [2.3.2、解决方案](#232解决方案)
- [2.4、异因三JMX开启SSL](#24异因三jmx开启ssl)
- [2.4.1、异常现象](#241异常现象)
- [2.4.2、解决方案](#242解决方案)
- [2.5、异因四连接了错误IP](#25异因四连接了错误ip)
- [2.5.1、异常现象](#251异常现象)
- [2.5.2、解决方案](#252解决方案)
- [2.6、异因五:连接了错误端口](#26异因五连接了错误端口)
- [2.6.1、异常现象](#261异常现象)
- [2.6.2、解决方案](#262解决方案)
下面我们按照步骤来一步一步的检查。
### 1、问题说明
**类型一JMX配置未开启**
未开启时,直接到`2、解决方法`查看如何开启即可。
![check_jmx_opened](http://img-ys011.didistatic.com/static/dc2img/do1_dRX6UHE2IUSHqsN95DGb)
背景Kafka 通过 JMX 服务进行运行指标的暴露,因此 `KnowStreaming` 会主动连接 Kafka 的 JMX 服务进行指标采集。如果我们发现页面缺少指标,那么可能原因之一是 Kafka 的 JMX 端口配置的有问题导致指标获取失败,进而页面没有数据。
**类型二:配置错误**
`JMX`端口已经开启的情况下,有的时候开启的配置不正确,此时也会导致出现连接失败的问题。这里大概列举几种原因:
- `JMX`配置错误:见`2、解决方法`
- 存在防火墙或者网络限制:网络通的另外一台机器`telnet`试一下看是否可以连接上。
- 需要进行用户名及密码的认证:见`3、解决方法 —— 认证的JMX`
### 2.1、正异常现象
**1、异常现象**
Broker 列表的 JMX PORT 列出现红色感叹号,则表示 JMX 连接存在异常。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_MLlLCfAktne4X6MBtBUd width="90%">
错误日志例子:
```
# 错误一: 错误提示的是真实的IP这样的话基本就是JMX配置的有问题了。
2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap : JMX connect exception, host:192.168.0.1 port:9999.
java.rmi.ConnectException: Connection refused to host: 192.168.0.1; nested exception is:
**2、正常现象**
Broker 列表的 JMX PORT 列出现绿色,则表示 JMX 连接正常。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_ymtDTCiDlzfrmSCez2lx width="90%">
---
### 2.2、异因一JMX未开启
#### 2.2.1、异常现象
broker列表的JMX Port值为-1对应Broker的JMX未开启。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_E1PD8tPsMeR2zYLFBFAu width="90%">
#### 2.2.2、解决方案
开启JMX开启流程如下
1、修改kafka的bin目录下面的`kafka-server-start.sh`文件
```bash
# 在这个下面增加JMX端口的配置
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT=9999 # 增加这个配置, 这里的数值并不一定是要9999
fi
# 错误二错误提示的是127.0.0.1这个IP这个是机器的hostname配置的可能有问题。
2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap : JMX connect exception, host:127.0.0.1 port:9999.
java.rmi.ConnectException: Connection refused to host: 127.0.0.1;; nested exception is:
```
**类型三连接特定IP**
2、修改kafka的bin目录下面对的`kafka-run-class.sh`文件
Broker 配置了内外网而JMX在配置时可能配置了内网IP或者外网IP此时 `KnowStreaming` 需要连接到特定网络的IP才可以进行访问。
```bash
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=当前机器的IP"
fi
比如:
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
fi
```
3、重启Kafka-Broker。
---
### 2.3、异原二JMX配置错误
#### 2.3.1、异常现象
错误日志:
```log
# 错误一: 错误提示的是真实的IP这样的话基本就是JMX配置的有问题了。
2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap : JMX connect exception, host:192.168.0.1 port:9999. java.rmi.ConnectException: Connection refused to host: 192.168.0.1; nested exception is:
# 错误二错误提示的是127.0.0.1这个IP这个是机器的hostname配置的可能有问题。
2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap : JMX connect exception, host:127.0.0.1 port:9999. java.rmi.ConnectException: Connection refused to host: 127.0.0.1;; nested exception is:
```
#### 2.3.2、解决方案
开启JMX开启流程如下
1、修改kafka的bin目录下面的`kafka-server-start.sh`文件
```bash
# 在这个下面增加JMX端口的配置
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT=9999 # 增加这个配置, 这里的数值并不一定是要9999
fi
```
2、修改kafka的bin目录下面对的`kafka-run-class.sh`文件
```bash
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=当前机器的IP"
fi
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
fi
```
3、重启Kafka-Broker。
---
### 2.4、异因三JMX开启SSL
#### 2.4.1、异常现象
```log
# 连接JMX的日志中出现SSL认证失败的相关日志。TODO欢迎补充具体日志案例。
```
#### 2.4.2、解决方案
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_kNyCi8H9wtHSRkWurB6S width="50%">
---
### 2.5、异因四连接了错误IP
#### 2.5.1、异常现象
Broker 配置了内外网而JMX在配置时可能配置了内网IP或者外网IP此时`KnowStreaming` 需要连接到特定网络的IP才可以进行访问。
比如Broker在ZK的存储结构如下所示我们期望连接到 `endpoints` 中标记为 `INTERNAL` 的地址,但是 `KnowStreaming` 却连接了 `EXTERNAL` 的地址。
Broker在ZK的存储结构如下所示我们期望连接到 `endpoints` 中标记为 `INTERNAL` 的地址,但是 `KnowStreaming` 却连接了 `EXTERNAL` 的地址,此时可以看 `4、解决方法 —— JMX连接特定网络` 进行解决。
```json
{
"listener_security_protocol_map": {
"EXTERNAL": "SASL_PLAINTEXT",
"INTERNAL": "SASL_PLAINTEXT"
},
"endpoints": [
"EXTERNAL://192.168.0.1:7092",
"INTERNAL://192.168.0.2:7093"
],
"jmx_port": 8099,
"host": "192.168.0.1",
"timestamp": "1627289710439",
"port": -1,
{
"listener_security_protocol_map": {"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"},
"endpoints": ["EXTERNAL://192.168.0.1:7092","INTERNAL://192.168.0.2:7093"],
"jmx_port": 8099,
"host": "192.168.0.1",
"timestamp": "1627289710439",
"port": -1,
"version": 4
}
}
```
#### 2.5.2、解决方
### 2、解决方
这里仅介绍一下比较通用的解决方式,如若有更好的方式,欢迎大家指导告知一下。
修改`kafka-server-start.sh`文件:
```
# 在这个下面增加JMX端口的配置
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT=9999 # 增加这个配置, 这里的数值并不一定是要9999
fi
```
&nbsp;
修改`kafka-run-class.sh`文件
```
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${当前机器的IP}"
fi
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
fi
```
### 3、解决方法 —— 认证的JMX
如果您是直接看的这个部分,建议先看一下上一节:`2、解决方法`以确保`JMX`的配置没有问题了。
`JMX`的配置等都没有问题的情况下,如果是因为认证的原因导致连接不了的,可以在集群接入界面配置你的`JMX`认证信息。
<img src='http://img-ys011.didistatic.com/static/dc2img/do1_EUU352qMEX1Jdp7pxizp' width=350>
### 4、解决方法 —— JMX连接特定网络
可以手动往`ks_km_physical_cluster`表的`jmx_properties`字段增加一个`useWhichEndpoint`字段,从而控制 `KnowStreaming` 连接到特定的JMX IP及PORT。
`jmx_properties`格式:
```json
{
"maxConn": 100, // KM对单台Broker的最大JMX连接数
"username": "xxxxx", //用户名,可以不填写
"password": "xxxx", // 密码,可以不填写
"openSSL": true, //开启SSL, true表示开启ssl, false表示关闭
"useWhichEndpoint": "EXTERNAL" //指定要连接的网络名称填写EXTERNAL就是连接endpoints里面的EXTERNAL地址
"maxConn": 100, # KM对单台Broker的最大JMX连接数
"username": "xxxxx", # 用户名,可以不填写
"password": "xxxx", # 密码,可以不填写
"openSSL": true, # 开启SSL, true表示开启ssl, false表示关闭
"useWhichEndpoint": "EXTERNAL" #指定要连接的网络名称填写EXTERNAL就是连接endpoints里面的EXTERNAL地址
}
```
&nbsp;
SQL例子
```sql
UPDATE ks_km_physical_cluster SET jmx_properties='{ "maxConn": 10, "username": "xxxxx", "password": "xxxx", "openSSL": false , "useWhichEndpoint": "xxx"}' where id={xxx};
```
注意:
---
+ 目前此功能只支持采用 `ZK` 做分布式协调的kafka集群。
### 2.6、异因五:连接了错误端口
3.3.0 以上版本,或者是 master 分支最新代码,才具备该能力。
#### 2.6.1、异常现象
在 AWS 或者是容器上的 Kafka-Broker使用同一个IP但是外部服务想要去连接 JMX 端口时,需要进行映射。因此 KnowStreaming 如果直接连接 ZK 上获取到的 JMX 端口,会连接失败,因此需要具备连接端口可配置的能力。
TODO补充具体的日志。
#### 2.6.2、解决方案
可以手动往`ks_km_physical_cluster`表的`jmx_properties`字段增加一个`specifiedJmxPortList`字段,从而控制 `KnowStreaming` 连接到特定的JMX PORT。
`jmx_properties`格式:
```json
{
"jmxPort": 2445, // 最低优先级使用的jmx端口
"maxConn": 100, // KM对单台Broker的最大JMX连接数
"username": "xxxxx", //用户名,可以不填写
"password": "xxxx", // 密码,可以不填写
"openSSL": true, //开启SSL, true表示开启ssl, false表示关闭
"useWhichEndpoint": "EXTERNAL", //指定要连接的网络名称填写EXTERNAL就是连接endpoints里面的EXTERNAL地址
"specifiedJmxPortList": [ // 配置最高优先使用的jmx端口
{
"serverId": "1", // kafka-broker的brokerId, 注意这个是字符串类型字符串类型的原因是要兼容connect的jmx端口的连接
"jmxPort": 1234 // 该 broker 所连接的jmx端口
},
{
"serverId": "2",
"jmxPort": 1234
},
]
}
```
SQL例子
```sql
UPDATE ks_km_physical_cluster SET jmx_properties='{ "maxConn": 10, "username": "xxxxx", "password": "xxxx", "openSSL": false , "specifiedJmxPortList": [{"serverId": "1", "jmxPort": 1234}] }' where id={xxx};
```
---

View File

@@ -1,183 +0,0 @@
![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
# 页面无数据排查手册
- [页面无数据排查手册](#页面无数据排查手册)
- [1、集群接入错误](#1集群接入错误)
- [1.1、异常现象](#11异常现象)
- [1.2、解决方案](#12解决方案)
- [1.3、正常情况](#13正常情况)
- [2、JMX连接失败](#2jmx连接失败)
- [3、ElasticSearch问题](#3elasticsearch问题)
- [3.1、异因一:缺少索引](#31异因一缺少索引)
- [3.1.1、异常现象](#311异常现象)
- [3.1.2、解决方案](#312解决方案)
- [3.2、异因二:索引模板错误](#32异因二索引模板错误)
- [3.2.1、异常现象](#321异常现象)
- [3.2.2、解决方案](#322解决方案)
- [3.3、异因三集群Shard满](#33异因三集群shard满)
- [3.3.1、异常现象](#331异常现象)
- [3.3.2、解决方案](#332解决方案)
---
## 1、集群接入错误
### 1.1、异常现象
如下图所示,集群非空时,大概率为地址配置错误导致。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_BRiXBvqYFK2dxSF1aqgZ width="80%">
### 1.2、解决方案
接入集群时,依据提示的错误,进行相应的解决。例如:
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_Yn4LhV8aeSEKX1zrrkUi width="50%">
### 1.3、正常情况
接入集群时,页面信息都自动正常出现,没有提示错误。
---
## 2、JMX连接失败
背景Kafka 通过 JMX 服务进行运行指标的暴露,因此 `KnowStreaming` 会主动连接 Kafka 的 JMX 服务进行指标采集。如果我们发现页面缺少指标,那么可能原因之一是 Kafka 的 JMX 端口配置的有问题导致指标获取失败,进而页面没有数据。
具体见同目录下的文档:[解决连接JMX失败](./%E8%A7%A3%E5%86%B3%E8%BF%9E%E6%8E%A5JMX%E5%A4%B1%E8%B4%A5.md)
---
## 3、ElasticSearch问题
**背景:**
`KnowStreaming` 将从 Kafka 中采集到的指标存储到 ES 中,如果 ES 存在问题,则也可能会导致页面出现无数据的情况。
**日志:**
`KnowStreaming` 读写 ES 相关日志,在 `logs/es/es.log` 中!
**注意:**
mac系统在执行curl指令时可能报zsh错误。可参考以下操作。
```bash
1 进入.zshrc 文件 vim ~/.zshrc
2.在.zshrc中加入 setopt no_nomatch
3.更新配置 source ~/.zshrc
```
---
### 3.1、异因一:缺少索引
#### 3.1.1、异常现象
报错信息
```log
# 日志位置 logs/es/es.log
com.didiglobal.logi.elasticsearch.client.model.exception.ESIndexNotFoundException: method [GET], host[http://127.0.0.1:9200], URI [/ks_kafka_broker_metric_2022-10-21,ks_kafka_broker_metric_2022-10-22/_search], status line [HTTP/1.1 404 Not Found]
```
`curl http://{ES的IP地址}:{ES的端口号}/_cat/indices/ks_kafka*` 查看KS索引列表发现没有索引。
#### 3.1.2、解决方案
执行 [ES索引及模版初始化](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) 脚本,来创建索引及模版。
---
### 3.2、异因二:索引模板错误
#### 3.2.1、异常现象
多集群列表有数据集群详情页图标无数据。查询KS索引模板列表发现不存在。
```bash
curl {ES的IP地址}:{ES的端口号}/_cat/templates/ks_kafka*?v&h=name
```
正常KS模板如下图所示。
<img src=http://img-ys011.didistatic.com/static/dc2img/do1_l79bPYSci9wr6KFwZDA6 width="90%">
#### 3.2.2、解决方案
删除KS索引模板和索引
```bash
curl -XDELETE {ES的IP地址}:{ES的端口号}/ks_kafka*
curl -XDELETE {ES的IP地址}:{ES的端口号}/_template/ks_kafka*
```
执行 [ES索引及模版初始化](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) 脚本,来创建索引及模版。
---
### 3.3、异因三集群Shard满
#### 3.3.1、异常现象
报错信息
```log
# 日志位置 logs/es/es.log
{"error":{"root_cause":[{"type":"validation_exception","reason":"Validation Failed: 1: this action would add [4] total shards, but this cluster currently has [1000]/[1000] maximum shards open;"}],"type":"validation_exception","reason":"Validation Failed: 1: this action would add [4] total shards, but this cluster currently has [1000]/[1000] maximum shards open;"},"status":400}
```
尝试手动创建索引失败。
```bash
#创建ks_kafka_cluster_metric_test索引的指令
curl -s -XPUT http://{ES的IP地址}:{ES的端口号}/ks_kafka_cluster_metric_test
```
#### 3.3.2、解决方案
ES索引的默认分片数量为1000达到数量以后索引创建失败。
+ 扩大ES索引数量上限执行指令
```
curl -XPUT -H"content-type:application/json" http://{ES的IP地址}:{ES的端口号}/_cluster/settings -d '
{
"persistent": {
"cluster": {
"max_shards_per_node":{索引上限默认为1000, 测试时可以将其调整为10000}
}
}
}'
```
执行 [ES索引及模版初始化](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) 脚本,来补全索引。

View File

@@ -6,67 +6,6 @@
### 升级至 `master` 版本
暂无
---
### 升级至 `3.4.0` 版本
**配置变更**
```yaml
# 新增的配置
request: # 请求相关的配置
api-call: # api调用
timeout-unit-ms: 8000 # 超时时间默认8000毫秒
```
**SQL 变更**
```sql
-- 多集群管理权限2023-06-27新增
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2026', 'Connector-新增', '1593', '1', '2', 'Connector-新增', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2028', 'Connector-编辑', '1593', '1', '2', 'Connector-编辑', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2030', 'Connector-删除', '1593', '1', '2', 'Connector-删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2032', 'Connector-重启', '1593', '1', '2', 'Connector-重启', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2034', 'Connector-暂停&恢复', '1593', '1', '2', 'Connector-暂停&恢复', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2026', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2028', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2030', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2032', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2034', '0', 'know-streaming');
-- 多集群管理权限2023-06-29新增
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2036', 'Security-ACL新增', '1593', '1', '2', 'Security-ACL新增', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2038', 'Security-ACL删除', '1593', '1', '2', 'Security-ACL删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2040', 'Security-User新增', '1593', '1', '2', 'Security-User新增', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2042', 'Security-User删除', '1593', '1', '2', 'Security-User删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2044', 'Security-User修改密码', '1593', '1', '2', 'Security-User修改密码', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2036', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2038', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2040', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2042', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2044', '0', 'know-streaming');
-- 多集群管理权限2023-07-06新增
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2046', 'Group-删除', '1593', '1', '2', 'Group-删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2048', 'GroupOffset-Topic纬度删除', '1593', '1', '2', 'GroupOffset-Topic纬度删除', '0', 'know-streaming');
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2050', 'GroupOffset-Partition纬度删除', '1593', '1', '2', 'GroupOffset-Partition纬度删除', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming');
-- 多集群管理权限2023-07-18新增
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2052', 'Security-User查看密码', '1593', '1', '2', 'Security-User查看密码', '0', 'know-streaming');
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2052', '0', 'know-streaming');
```
---
### 升级至 `3.3.0` 版本
@@ -127,8 +66,6 @@ ALTER TABLE `ks_km_kafka_change_record` DROP INDEX `idx_cluster_phy_id` ,
ADD INDEX `idx_cluster_update_time` (`cluster_phy_id` ASC, `update_time` ASC);
```
---
### 升级至 `3.2.0` 版本
**配置变更**

View File

@@ -1,37 +1,13 @@
![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
# FAQ
- [FAQ](#faq)
- [1、支持哪些 Kafka 版本?](#1支持哪些-kafka-版本)
- [1、2.x 版本和 3.0 版本有什么差异?](#12x-版本和-30-版本有什么差异)
- [3、页面流量信息等无数据](#3页面流量信息等无数据)
- [4、`Jmx`连接失败如何解决?](#4jmx连接失败如何解决)
- [5、有没有 API 文档?](#5有没有-api-文档)
- [6、删除 Topic 成功后,为何过段时间又出现了?](#6删除-topic-成功后为何过段时间又出现了)
- [7、如何在不登录的情况下调用接口](#7如何在不登录的情况下调用接口)
- [8、Specified key was too long; max key length is 767 bytes](#8specified-key-was-too-long-max-key-length-is-767-bytes)
- [9、出现 ESIndexNotFoundEXception 报错](#9出现-esindexnotfoundexception-报错)
- [10、km-console 打包构建失败](#10km-console-打包构建失败)
- [11、在 `km-console` 目录下执行 `npm run start` 时看不到应用构建和热加载过程?如何启动单个应用?](#11在-km-console-目录下执行-npm-run-start-时看不到应用构建和热加载过程如何启动单个应用)
- [12、权限识别失败问题](#12权限识别失败问题)
- [13、接入开启kerberos认证的kafka集群](#13接入开启kerberos认证的kafka集群)
- [14、对接Ldap的配置](#14对接ldap的配置)
- [15、测试时使用Testcontainers的说明](#15测试时使用testcontainers的说明)
- [16、JMX连接失败怎么办](#16jmx连接失败怎么办)
- [17、zk监控无数据问题](#17zk监控无数据问题)
- [18、启动失败报NoClassDefFoundError如何解决](#18启动失败报noclassdeffounderror如何解决)
- [19、依赖ElasticSearch 8.0以上版本部署后指标信息无法正常显示如何解决]
## 1、支持哪些 Kafka 版本?
## 8.1、支持哪些 Kafka 版本?
- 支持 0.10+ 的 Kafka 版本;
- 支持 ZK 及 Raft 运行模式的 Kafka 版本;
&nbsp;
## 1、2.x 版本和 3.0 版本有什么差异?
## 8.1、2.x 版本和 3.0 版本有什么差异?
**全新设计理念**
@@ -47,7 +23,7 @@
&nbsp;
## 3、页面流量信息等无数据
## 8.3、页面流量信息等无数据
- 1、`Broker JMX`未正确开启
@@ -59,13 +35,13 @@
&nbsp;
## 4、`Jmx`连接失败如何解决?
## 8.4、`Jmx`连接失败如何解决?
- 参看 [Jmx 连接配置&问题解决](https://doc.knowstreaming.com/product/9-attachment#91jmx-%E8%BF%9E%E6%8E%A5%E5%A4%B1%E8%B4%A5%E9%97%AE%E9%A2%98%E8%A7%A3%E5%86%B3) 说明。
&nbsp;
## 5、有没有 API 文档?
## 8.5、有没有 API 文档?
`KnowStreaming` 采用 Swagger 进行 API 说明,在启动 KnowStreaming 服务之后,就可以从下面地址看到。
@@ -73,7 +49,7 @@ Swagger-API 地址: [http://IP:PORT/swagger-ui.html#/](http://IP:PORT/swagger-
&nbsp;
## 6、删除 Topic 成功后,为何过段时间又出现了?
## 8.6、删除 Topic 成功后,为何过段时间又出现了?
**原因说明:**
@@ -98,7 +74,7 @@ for (int i= 0; i < 100000; ++i) {
&nbsp;
## 7、如何在不登录的情况下调用接口
## 8.7、如何在不登录的情况下调用接口
步骤一:接口调用时,在 header 中,增加如下信息:
@@ -133,7 +109,7 @@ SECURITY.TRICK_USERS
但是还有一点需要注意,绕过的用户仅能调用他有权限的接口,比如一个普通用户,那么他就只能调用普通的接口,不能去调用运维人员的接口。
## 8、Specified key was too long; max key length is 767 bytes
## 8.8、Specified key was too long; max key length is 767 bytes
**原因:** 不同版本的 InoDB 引擎参数innodb_large_prefix默认值不同即在 5.6 默认值为 OFF5.7 默认值为 ON。
@@ -145,13 +121,13 @@ SECURITY.TRICK_USERS
- 将字符集改为 latin1一个字符=一个字节)。
- 开启innodb_large_prefix修改默认行格式innodb_file_format为 Barracuda并设置 row_format=dynamic。
## 9、出现 ESIndexNotFoundEXception 报错
## 8.9、出现 ESIndexNotFoundEXception 报错
**原因 **没有创建 ES 索引模版
**解决方案:**执行 init_es_template.sh 脚本,创建 ES 索引模版即可。
## 10、km-console 打包构建失败
## 8.10、km-console 打包构建失败
首先,**请确保您正在使用最新版本**,版本列表见 [Tags](https://github.com/didi/KnowStreaming/tags)。如果不是最新版本,请升级后再尝试有无问题。
@@ -185,14 +161,14 @@ Node 版本: v12.22.12
错误截图:
```
## 11、在 `km-console` 目录下执行 `npm run start` 时看不到应用构建和热加载过程?如何启动单个应用?
## 8.11、在 `km-console` 目录下执行 `npm run start` 时看不到应用构建和热加载过程?如何启动单个应用?
需要到具体的应用中执行 `npm run start`,例如 `cd packages/layout-clusters-fe` 后,执行 `npm run start`
应用启动后需要到基座应用中查看(需要启动基座应用,即 layout-clusters-fe
## 12、权限识别失败问题
## 8.12、权限识别失败问题
1、使用admin账号登陆KnowStreaming时点击系统管理-用户管理-角色管理-新增角色,查看页面是否正常。
<img src="http://img-ys011.didistatic.com/static/dc2img/do1_gwGfjN9N92UxzHU8dfzr" width = "400" >
@@ -208,7 +184,7 @@ Node 版本: v12.22.12
+ 解决方案清空数据库数据将数据库字符集调整为utf8最后重新执行[dml-logi.sql](https://github.com/didi/KnowStreaming/blob/master/km-dist/init/sql/dml-logi.sql)脚本导入数据即可。
## 13、接入开启kerberos认证的kafka集群
## 8.13、接入开启kerberos认证的kafka集群
1. 部署KnowStreaming的机器上安装krb客户端
2. 替换/etc/krb5.conf配置文件
@@ -224,7 +200,7 @@ Node 版本: v12.22.12
```
## 14、对接Ldap的配置
## 8.14、对接Ldap的配置
```yaml
# 需要在application.yml中增加如下配置。相关配置的信息按实际情况进行调整
@@ -247,75 +223,6 @@ spring:
login-extend-bean-name: ksLdapLoginService # 表示使用ldap的service
```
## 15、测试时使用Testcontainers的说明
## 8.15、测试时使用Testcontainers的说明
1. 需要docker运行环境 [Testcontainers运行环境说明](https://www.testcontainers.org/supported_docker_environment/)
2. 如果本机没有docker可以使用[远程访问docker](https://docs.docker.com/config/daemon/remote-access/) [Testcontainers配置说明](https://www.testcontainers.org/features/configuration/#customizing-docker-host-detection)
## 16、JMX连接失败怎么办
详细见:[解决连接JMX失败](../dev_guide/%E8%A7%A3%E5%86%B3%E8%BF%9E%E6%8E%A5JMX%E5%A4%B1%E8%B4%A5.md)
## 17、zk监控无数据问题
**现象:**
zookeeper集群正常但Ks上zk页面所有监控指标无数据`KnowStreaming` log_error.log日志提示
```vim
[MetricCollect-Shard-0-8-thread-1] ERROR class=c.x.k.s.k.c.s.h.c.z.HealthCheckZookeeperService||method=checkWatchCount||param=ZookeeperParam(zkAddressList=[Tuple{v1=192.168.xxx.xx, v2=2181}, Tuple{v1=192.168.xxx.xx, v2=2181}, Tuple{v1=192.168.xxx.xx, v2=2181}], zkConfig=null)||config=HealthAmountRatioConfig(amount=100000, ratio=0.8)||result=Result{message='mntr is not executed because it is not in the whitelist.
', code=8031, data=null}||errMsg=get metrics failed, may be collect failed or zk mntr command not in whitelist.
2023-04-23 14:39:07.234 [MetricCollect-Shard-0-8-thread-1] ERROR class=c.x.k.s.k.c.s.h.checker.AbstractHeal
```
原因就很明确了。需要开放zk的四字命令`zoo.cfg`配置文件中添加
```
4lw.commands.whitelist=mntr,stat,ruok,envi,srvr,envi,cons,conf,wchs,wchp
```
建议至少开放上述几个四字命令,当然,您也可以全部开放
```
4lw.commands.whitelist=*
```
## 18、启动失败报NoClassDefFoundError如何解决
**错误现象:**
```log
# 启动失败报nested exception is java.lang.NoClassDefFoundError: Could not initialize class com.didiglobal.logi.job.core.WorkerSingleton$Singleton
2023-08-11 22:54:29.842 [main] ERROR class=org.springframework.boot.SpringApplication||Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'quartzScheduler' defined in class path resource [com/didiglobal/logi/job/LogIJobAutoConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.didiglobal.logi.job.core.Scheduler]: Factory method 'quartzScheduler' threw exception; nested exception is java.lang.NoClassDefFoundError: Could not initialize class com.didiglobal.logi.job.core.WorkerSingleton$Singleton
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:657)
```
**问题原因:**
1. `KnowStreaming` 依赖的 `Logi-Job` 初始化 `WorkerSingleton$Singleton` 失败。
2. `WorkerSingleton$Singleton` 初始化的过程中,会去获取一些操作系统的信息,如果获取时出现了异常,则会导致 `WorkerSingleton$Singleton` 初始化失败。
**临时建议:**
`Logi-Job` 问题的修复时间不好控制,之前我们测试验证了一下,在 `Windows``Mac``CentOS` 这几个操作系统下基本上都是可以正常运行的。
所以,如果有条件的话,可以暂时先使用这几个系统部署 `KnowStreaming`
如果在在 `Windows``Mac``CentOS` 这几个操作系统下也出现了启动失败的问题可以重试2-3次看是否还是启动失败或者换一台机器试试。
## 依赖ElasticSearch 8.0以上版本部署后指标信息无法正常显示如何解决
**错误现象**
```log
Warnings: [299 Elasticsearch-8.9.1-a813d015ef1826148d9d389bd1c0d781c6e349f0 "Legacy index templates are deprecated in favor of composable templates."]
```
**问题原因**
1. ES8.0和ES7.0版本存在Template模式的差异建议使用 /_index_template 端点来管理模板;
2. ES java client在此版本的行为很奇怪表现为读取数据为空
**解决方法**
修改`es_template_create.sh`脚本中所有的`/_template``/_index_template`后执行即可。
2. 如果本机没有docker可以使用[远程访问docker](https://docs.docker.com/config/daemon/remote-access/) [Testcontainers配置说明](https://www.testcontainers.org/features/configuration/#customizing-docker-host-detection)

View File

@@ -5,13 +5,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>km-biz</artifactId>
<version>${revision}</version>
<version>${km.revision}</version>
<packaging>jar</packaging>
<parent>
<artifactId>km</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>${revision}</version>
<version>${km.revision}</version>
</parent>
<properties>
@@ -29,6 +29,11 @@
<artifactId>km-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>km-rebalance</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- spring -->
<dependency>

View File

@@ -202,7 +202,7 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
//补充非zk模式的JMXPort信息
if (!clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) {
JmxConfig jmxConfig = ConvertUtil.str2ObjByJson(clusterPhy.getJmxProperties(), JmxConfig.class);
voList.forEach(elem -> elem.setJmxPort(jmxConfig.getFinallyJmxPort(String.valueOf(elem.getBrokerId()))));
voList.forEach(elem -> elem.setJmxPort(jmxConfig.getJmxPort() == null ? -1 : jmxConfig.getJmxPort()));
}
return voList;

View File

@@ -62,8 +62,7 @@ public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager {
vo.setTotalObserverCount(0);
vo.setAliveServerCount(0);
for (ZookeeperInfo info: infoList) {
if (info.getRole().equals(ZKRoleEnum.LEADER.getRole()) || info.getRole().equals(ZKRoleEnum.STANDALONE.getRole())) {
// leader 或者 standalone
if (info.getRole().equals(ZKRoleEnum.LEADER.getRole())) {
vo.setLeaderNode(info.getHost());
}

View File

@@ -15,6 +15,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.converter.ClusterVOConverter;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
@@ -24,6 +25,10 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems;
import com.xiaojukeji.know.streaming.km.rebalance.algorithm.model.Resource;
import com.xiaojukeji.know.streaming.km.rebalance.common.BalanceMetricConstant;
import com.xiaojukeji.know.streaming.km.rebalance.common.bean.entity.ClusterBalanceItemState;
import com.xiaojukeji.know.streaming.km.rebalance.core.service.ClusterBalanceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -40,6 +45,9 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
@Autowired
private ClusterMetricService clusterMetricService;
@Autowired
private ClusterBalanceService clusterBalanceService;
@Override
public ClusterPhysState getClusterPhysState() {
List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
@@ -153,6 +161,11 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
ClusterMetrics clusterMetrics = clusterMetricService.getLatestMetricsFromCache(vo.getId());
clusterMetrics.getMetrics().putIfAbsent(ClusterMetricVersionItems.CLUSTER_METRIC_HEALTH_STATE, (float) HealthStateEnum.UNKNOWN.getDimension());
Result<ClusterMetrics> balanceMetricsResult = this.getClusterLoadReBalanceInfo(vo.getId());
if (balanceMetricsResult.hasData()) {
clusterMetrics.putMetric(balanceMetricsResult.getData().getMetrics());
}
metricsList.add(clusterMetrics);
}
@@ -174,4 +187,21 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
dto.setClusterPhyIds(clusterIdList);
return dto;
}
private Result<ClusterMetrics> getClusterLoadReBalanceInfo(Long clusterPhyId) {
Result<ClusterBalanceItemState> stateResult = clusterBalanceService.getItemStateFromCacheFirst(clusterPhyId);
if (stateResult.failed()) {
return Result.buildFromIgnoreData(stateResult);
}
ClusterBalanceItemState state = stateResult.getData();
ClusterMetrics metric = ClusterMetrics.initWithMetrics(clusterPhyId, BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_ENABLE, state.getEnable()? Constant.YES: Constant.NO);
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_CPU, state.getResItemState(Resource.CPU).floatValue());
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_NW_IN, state.getResItemState(Resource.NW_IN).floatValue());
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_NW_OUT, state.getResItemState(Resource.NW_OUT).floatValue());
metric.putMetric(BalanceMetricConstant.CLUSTER_METRIC_LOAD_RE_BALANCE_DISK, state.getResItemState(Resource.DISK).floatValue());
return Result.buildSuc(metric);
}
}

View File

@@ -12,7 +12,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import org.apache.kafka.connect.runtime.AbstractStatus;
@@ -31,9 +30,6 @@ public class ConnectorManagerImpl implements ConnectorManager {
@Autowired
private ConnectorService connectorService;
@Autowired
private OpConnectorService opConnectorService;
@Autowired
private WorkerConnectorService workerConnectorService;
@@ -48,37 +44,37 @@ public class ConnectorManagerImpl implements ConnectorManager {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误");
}
return opConnectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
}
@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功但是获取元信息失败页面元信息会存在1分钟延迟");
}
opConnectorService.addNewToDB(ksConnectorResult.getData());
connectorService.addNewToDB(ksConnectorResult.getData());
return Result.buildSuc();
}
@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功但是获取元信息失败页面元信息会存在1分钟延迟");
}
@@ -87,7 +83,7 @@ public class ConnectorManagerImpl implements ConnectorManager {
connector.setCheckpointConnectorName(checkpointName);
connector.setHeartbeatConnectorName(heartbeatName);
opConnectorService.addNewToDB(connector);
connectorService.addNewToDB(connector);
return Result.buildSuc();
}

View File

@@ -37,7 +37,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
@@ -49,7 +48,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -68,9 +66,6 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
@Autowired
private ConnectorService connectorService;
@Autowired
private OpConnectorService opConnectorService;
@Autowired
private WorkerConnectorService workerConnectorService;
@@ -136,17 +131,17 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
} else if (checkpointResult.failed() && checkpointResult.failed()) {
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 checkpoint & heartbeat 失败.%n失败信息分别为%s%n%n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
String.format("创建 checkpoint & heartbeat 失败.\n失败信息分别为%s\n\n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
);
} else if (checkpointResult.failed()) {
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 checkpoint 失败.%n失败信息分别为%s", checkpointResult.getMessage())
String.format("创建 checkpoint 失败.\n失败信息分别为%s", checkpointResult.getMessage())
);
} else{
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 heartbeat 失败.%n失败信息分别为%s", heartbeatResult.getMessage())
String.format("创建 heartbeat 失败.\n失败信息分别为%s", heartbeatResult.getMessage())
);
}
}
@@ -160,20 +155,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
return opConnectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
return connectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
}
@Override
@@ -185,20 +180,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName()) && dto.getCheckpointConnectorConfigs() != null) {
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && dto.getHeartbeatConnectorConfigs() != null) {
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}
return opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
}
@Override
@@ -210,20 +205,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = connectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = connectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
return opConnectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
return connectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
}
@Override
@@ -235,20 +230,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = connectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = connectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
return opConnectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
return connectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
}
@Override
@@ -260,20 +255,20 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}
return opConnectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
return connectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
}
@Override
@@ -430,7 +425,7 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
public Result<List<ConnectConfigInfosVO>> validateConnectors(MirrorMakerCreateDTO dto) {
List<ConnectConfigInfosVO> voList = new ArrayList<>();
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig());
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
if (infoResult.failed()) {
return Result.buildFromIgnoreData(infoResult);
}
@@ -484,11 +479,11 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(connectCluster.getKafkaClusterPhyId()));
}
if (!dto.getSuitableConfig().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
if (!dto.getConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector缺少connector.class");
}
if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getSuitableConfig().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector的connector.class类型错误");
}
@@ -593,14 +588,16 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
}
}
voList.forEach(elem -> elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName())));
voList.forEach(elem -> {
elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName()));
});
return voList;
}
private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
Map<String, KSConnectorInfo> connectorInfoMap = new ConcurrentHashMap<>();
Map<String, KSConnectorInfo> connectorInfoMap = new HashMap<>();
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()),
@@ -610,10 +607,12 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
if (connectorInfoRet.hasData()) {
connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData());
}
return connectorInfoRet.getData();
});
}
ApiCallThreadPoolService.waitResult();
ApiCallThreadPoolService.waitResult(1000);
List<ClusterMirrorMakerOverviewVO> newMirrorMakerVOList = new ArrayList<>();
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {

View File

@@ -1,7 +1,6 @@
package com.xiaojukeji.know.streaming.km.biz.group;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
@@ -26,7 +25,7 @@ public interface GroupManager {
String searchGroupKeyword,
PaginationBaseDTO dto);
PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) throws Exception;
PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto);
PaginationResult<GroupOverviewVO> pagingClusterGroupsOverview(Long clusterPhyId, ClusterGroupSummaryDTO dto);
@@ -40,9 +39,5 @@ public interface GroupManager {
Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception;
Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception;
@Deprecated
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList);
List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList, Integer timeoutUnitMs);
}

View File

@@ -4,7 +4,6 @@ import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.biz.group.GroupManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
@@ -18,9 +17,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConsume
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
@@ -36,7 +32,6 @@ import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter;
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
@@ -45,14 +40,11 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems;
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.GroupMetricESDAO;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
@@ -60,14 +52,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum.CONNECT_CLUSTER_PROTOCOL_TYPE;
@Component
public class GroupManagerImpl implements GroupManager {
private static final ILog LOGGER = LogFactory.getLog(GroupManagerImpl.class);
private static final ILog log = LogFactory.getLog(GroupManagerImpl.class);
@Autowired
private TopicService topicService;
@@ -75,9 +66,6 @@ public class GroupManagerImpl implements GroupManager {
@Autowired
private GroupService groupService;
@Autowired
private OpGroupService opGroupService;
@Autowired
private PartitionService partitionService;
@@ -90,9 +78,6 @@ public class GroupManagerImpl implements GroupManager {
@Autowired
private ClusterPhyService clusterPhyService;
@Autowired
private KSConfigUtils ksConfigUtils;
@Override
public PaginationResult<GroupTopicOverviewVO> pagingGroupMembers(Long clusterPhyId,
String topicName,
@@ -100,33 +85,20 @@ public class GroupManagerImpl implements GroupManager {
String searchTopicKeyword,
String searchGroupKeyword,
PaginationBaseDTO dto) {
long startTimeUnitMs = System.currentTimeMillis();
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, groupName, searchTopicKeyword, searchGroupKeyword, dto);
if (!paginationResult.hasData()) {
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
}
List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(
clusterPhyId,
paginationResult.getData().getBizData(),
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
);
List<GroupTopicOverviewVO> groupTopicVOList = this.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
}
@Override
public PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) throws Exception {
long startTimeUnitMs = System.currentTimeMillis();
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return PaginationResult.buildFailure(MsgConstant.getClusterPhyNotExist(clusterPhyId), dto);
}
Group group = groupService.getGroupFromKafka(clusterPhy, groupName);
public PaginationResult<GroupTopicOverviewVO> pagingGroupTopicMembers(Long clusterPhyId, String groupName, PaginationBaseDTO dto) {
Group group = groupService.getGroupFromDB(clusterPhyId, groupName);
//没有topicMember则直接返回
if (group == null || ValidateUtils.isEmptyList(group.getTopicMembers())) {
@@ -141,14 +113,7 @@ public class GroupManagerImpl implements GroupManager {
List<GroupMemberPO> groupMemberPOList = paginationResult.getData().getBizData().stream().map(elem -> new GroupMemberPO(clusterPhyId, elem.getTopicName(), groupName, group.getState().getState(), elem.getMemberCount())).collect(Collectors.toList());
return PaginationResult.buildSuc(
this.getGroupTopicOverviewVOList(
clusterPhyId,
groupMemberPOList,
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
),
paginationResult
);
return PaginationResult.buildSuc(this.getGroupTopicOverviewVOList(clusterPhyId, groupMemberPOList), paginationResult);
}
@Override
@@ -156,7 +121,7 @@ public class GroupManagerImpl implements GroupManager {
List<Group> groupList = groupService.listClusterGroups(clusterPhyId);
// 类型转化
List<GroupOverviewVO> voList = groupList.stream().map(GroupConverter::convert2GroupOverviewVO).collect(Collectors.toList());
List<GroupOverviewVO> voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList());
// 搜索groupName
voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name"));
@@ -203,10 +168,9 @@ public class GroupManagerImpl implements GroupManager {
// 转换存储格式
Map<TopicPartition, KSMemberDescription> tpMemberMap = new HashMap<>();
// 如果不是connect集群
//如果不是connect集群
if (!groupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {
for (KSMemberDescription description : groupDescription.members()) {
// 如果是 Consumer 的 Description ,则 Assignment 的类型为 KSMemberConsumerAssignment 的
KSMemberConsumerAssignment assignment = (KSMemberConsumerAssignment) description.assignment();
for (TopicPartition tp : assignment.topicPartitions()) {
tpMemberMap.put(tp, description);
@@ -281,52 +245,6 @@ public class GroupManagerImpl implements GroupManager {
return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator);
}
@Override
public Result<Void> deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(dto.getClusterPhyId());
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(dto.getClusterPhyId()));
}
// 按照group纬度进行删除
if (ValidateUtils.isBlank(dto.getGroupName())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "groupName不允许为空");
}
if (DeleteGroupTypeEnum.GROUP.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupOffset(
new DeleteGroupParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP),
operator
);
}
// 按照topic纬度进行删除
if (ValidateUtils.isBlank(dto.getTopicName())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupTopicOffset(
new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()),
operator
);
}
// 按照partition纬度进行删除
if (ValidateUtils.isNullOrLessThanZero(dto.getPartitionId())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0");
}
if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) {
return opGroupService.deleteGroupTopicPartitionOffset(
new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()),
operator
);
}
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "deleteType类型错误");
}
@Override
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> groupMemberPOList) {
// 获取指标
@@ -338,54 +256,11 @@ public class GroupManagerImpl implements GroupManager {
);
if (metricsListResult.failed()) {
// 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回
LOGGER.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult);
}
return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData());
}
@Override
public List<GroupTopicOverviewVO> getGroupTopicOverviewVOList(Long clusterPhyId, List<GroupMemberPO> poList, Integer timeoutUnitMs) {
Set<String> requestedGroupSet = new HashSet<>();
// 获取指标
Map<String, Map<String, Float>> groupTopicLagMap = new ConcurrentHashMap<>();
poList.forEach(elem -> {
if (requestedGroupSet.contains(elem.getGroupName())) {
// 该Group已经处理过
return;
}
requestedGroupSet.add(elem.getGroupName());
ApiCallThreadPoolService.runnableTask(
String.format("clusterPhyId=%d||groupName=%s||msg=getGroupTopicLag", clusterPhyId, elem.getGroupName()),
timeoutUnitMs,
() -> {
Result<List<GroupMetrics>> listResult = groupMetricService.collectGroupMetricsFromKafka(clusterPhyId, elem.getGroupName(), GroupMetricVersionItems.GROUP_METRIC_LAG);
if (listResult == null || !listResult.hasData()) {
return;
}
Map<String, Float> lagMetricMap = new HashMap<>();
listResult.getData().forEach(item -> {
Float newLag = item.getMetric(GroupMetricVersionItems.GROUP_METRIC_LAG);
if (newLag == null) {
return;
}
Float oldLag = lagMetricMap.getOrDefault(item.getTopic(), newLag);
lagMetricMap.put(item.getTopic(), Math.max(oldLag, newLag));
});
groupTopicLagMap.put(elem.getGroupName(), lagMetricMap);
}
);
});
ApiCallThreadPoolService.waitResult();
return this.convert2GroupTopicOverviewVOList(poList, groupTopicLagMap);
}
/**************************************************** private method ****************************************************/
@@ -439,22 +314,13 @@ public class GroupManagerImpl implements GroupManager {
metricsList = new ArrayList<>();
}
// <GroupName, <TopicName, lag>>
Map<String, Map<String, Float>> metricsMap = new HashMap<>();
// <GroupName, <TopicName, GroupMetrics>>
Map<String, Map<String, GroupMetrics>> metricsMap = new HashMap<>();
metricsList.stream().forEach(elem -> {
Float metricValue = elem.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG);
if (metricValue == null) {
return;
}
metricsMap.putIfAbsent(elem.getGroup(), new HashMap<>());
metricsMap.get(elem.getGroup()).put(elem.getTopic(), metricValue);
metricsMap.get(elem.getGroup()).put(elem.getTopic(), elem);
});
return this.convert2GroupTopicOverviewVOList(poList, metricsMap);
}
private List<GroupTopicOverviewVO> convert2GroupTopicOverviewVOList(List<GroupMemberPO> poList, Map<String, Map<String, Float>> metricsMap) {
List<GroupTopicOverviewVO> voList = new ArrayList<>();
for (GroupMemberPO po: poList) {
GroupTopicOverviewVO vo = ConvertUtil.obj2Obj(po, GroupTopicOverviewVO.class);
@@ -462,9 +328,9 @@ public class GroupManagerImpl implements GroupManager {
continue;
}
Float metricValue = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName());
if (metricValue != null) {
vo.setMaxLag(ConvertUtil.Float2Long(metricValue));
GroupMetrics metrics = metricsMap.getOrDefault(po.getGroupName(), new HashMap<>()).get(po.getTopicName());
if (metrics != null) {
vo.setMaxLag(ConvertUtil.Float2Long(metrics.getMetrics().get(GroupMetricVersionItems.GROUP_METRIC_LAG)));
}
voList.add(vo);

View File

@@ -19,9 +19,4 @@ public interface OpTopicManager {
* 扩分区
*/
Result<Void> expandTopic(TopicExpansionDTO dto, String operator);
/**
* 清空Topic
*/
Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator);
}

View File

@@ -10,12 +10,10 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
@@ -158,16 +156,6 @@ public class OpTopicManagerImpl implements OpTopicManager {
return rv;
}
@Override
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
// 清空Topic
Result<Void> rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator);
if (rv.failed()) {
return rv;
}
return Result.buildSuc();
}
/**************************************************** private method ****************************************************/

View File

@@ -28,7 +28,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.constant.PaginationConstant;
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
@@ -39,7 +38,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.config.KSConfigUtils;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
@@ -47,7 +45,8 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems;
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
@@ -61,7 +60,7 @@ import java.util.stream.Collectors;
@Component
public class TopicStateManagerImpl implements TopicStateManager {
private static final ILog LOGGER = LogFactory.getLog(TopicStateManagerImpl.class);
private static final ILog log = LogFactory.getLog(TopicStateManagerImpl.class);
@Autowired
private TopicService topicService;
@@ -90,9 +89,6 @@ public class TopicStateManagerImpl implements TopicStateManager {
@Autowired
private GroupManager groupManager;
@Autowired
private KSConfigUtils ksConfigUtils;
@Override
public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException {
Topic topic = topicService.getTopic(clusterPhyId, topicName);
@@ -105,7 +101,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
TopicBrokerAllVO allVO = new TopicBrokerAllVO();
allVO.setTotal(topic.getBrokerIdSet().size());
allVO.setLive((int)brokerMap.values().stream().filter(Broker::alive).count());
allVO.setLive((int)brokerMap.values().stream().filter(elem -> elem.alive()).count());
allVO.setDead(allVO.getTotal() - allVO.getLive());
allVO.setPartitionCount(topic.getPartitionNum());
@@ -157,28 +153,97 @@ public class TopicStateManagerImpl implements TopicStateManager {
return Result.buildFromIgnoreData(endOffsetsMapResult);
}
// 数据采集
List<TopicRecordVO> voList = this.getTopicMessages(clusterPhy, topicName, beginOffsetsMapResult.getData(), endOffsetsMapResult.getData(), startTime, dto);
List<TopicRecordVO> voList = new ArrayList<>();
// 排序
if (ValidateUtils.isBlank(dto.getSortType())) {
// 默认按时间倒序排序
dto.setSortType(SortTypeEnum.DESC.getSortType());
}
if (ValidateUtils.isBlank(dto.getSortField())) {
// 默认按照timestampUnitMs字段排序
dto.setSortField(PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD);
}
KafkaConsumer<String, String> kafkaConsumer = null;
try {
// 创建kafka-consumer
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
if (PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD.equals(dto.getSortField())) {
// 如果是时间类型则第二排序规则是offset
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_OFFSET_SORTED_FIELD, dto.getSortType());
} else {
// 如果是非时间类型,则第二排序规则是时间
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD, dto.getSortType());
}
List<TopicPartition> partitionList = new ArrayList<>();
long maxMessage = 0;
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
long begin = beginOffsetsMapResult.getData().get(entry.getKey());
long end = entry.getValue();
if (begin == end){
continue;
}
maxMessage += end - begin;
partitionList.add(entry.getKey());
}
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
kafkaConsumer.assign(partitionList);
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
// 获取指定时间每个分区的offset按指定开始时间查询消息时
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
partitionList.forEach(topicPartition -> {
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
});
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
for (TopicPartition partition : partitionList) {
if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
// 重置到最旧
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定时间
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定位置
} else {
// 默认,重置到最新
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
}
}
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时如果这里不减去则可能会导致poll之后超过要求的时间
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
continue;
}
voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord));
if (voList.size() >= dto.getMaxRecords()) {
break;
}
}
// 超时则返回
if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs()
|| voList.size() > dto.getMaxRecords()) {
break;
}
}
// 排序
if (ObjectUtils.isNotEmpty(voList)) {
// 默认按时间倒序排序
if (StringUtils.isBlank(dto.getSortType())) {
dto.setSortType(SortTypeEnum.DESC.getSortType());
}
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
}
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
} catch (Exception e) {
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);
throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
} finally {
if (kafkaConsumer != null) {
try {
kafkaConsumer.close(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
} catch (Exception e) {
// ignore
}
}
}
}
@Override
@@ -233,37 +298,26 @@ public class TopicStateManagerImpl implements TopicStateManager {
@Override
public Result<List<TopicPartitionVO>> getTopicPartitions(Long clusterPhyId, String topicName, List<String> metricsNames) {
long startTime = System.currentTimeMillis();
List<Partition> partitionList = partitionService.listPartitionByTopic(clusterPhyId, topicName);
if (ValidateUtils.isEmptyList(partitionList)) {
return Result.buildSuc();
}
Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
if (metricsResult.failed()) {
// 仅打印错误日志,但是不直接返回错误
log.error(
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from es failed",
clusterPhyId, topicName, metricsResult
);
}
// 转map
Map<Integer, PartitionMetrics> metricsMap = new HashMap<>();
ApiCallThreadPoolService.runnableTask(
String.format("clusterPhyId=%d||topicName=%s||method=getTopicPartitions", clusterPhyId, topicName),
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTime),
() -> {
Result<List<PartitionMetrics>> metricsResult = partitionMetricService.collectPartitionsMetricsFromKafka(clusterPhyId, topicName, metricsNames);
if (metricsResult.failed()) {
// 仅打印错误日志,但是不直接返回错误
LOGGER.error(
"method=getTopicPartitions||clusterPhyId={}||topicName={}||result={}||msg=get metrics from kafka failed",
clusterPhyId, topicName, metricsResult
);
}
for (PartitionMetrics metrics: metricsResult.getData()) {
metricsMap.put(metrics.getPartitionId(), metrics);
}
}
);
boolean finished = ApiCallThreadPoolService.waitResultAndReturnFinished(1);
if (!finished && metricsMap.isEmpty()) {
// 未完成 -> 打印日志
LOGGER.error("method=getTopicPartitions||clusterPhyId={}||topicName={}||msg=get metrics from kafka failed", clusterPhyId, topicName);
if (metricsResult.hasData()) {
for (PartitionMetrics metrics: metricsResult.getData()) {
metricsMap.put(metrics.getPartitionId(), metrics);
}
}
List<TopicPartitionVO> voList = new ArrayList<>();
@@ -282,7 +336,7 @@ public class TopicStateManagerImpl implements TopicStateManager {
// Broker统计信息
vo.setBrokerCount(brokerMap.size());
vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(Broker::alive).count());
vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(elem -> elem.alive()).count());
vo.setDeadBrokerCount(vo.getBrokerCount() - vo.getLiveBrokerCount());
// Partition统计信息
@@ -306,19 +360,13 @@ public class TopicStateManagerImpl implements TopicStateManager {
@Override
public PaginationResult<GroupTopicOverviewVO> pagingTopicGroupsOverview(Long clusterPhyId, String topicName, String searchGroupName, PaginationBaseDTO dto) {
long startTimeUnitMs = System.currentTimeMillis();
PaginationResult<GroupMemberPO> paginationResult = groupService.pagingGroupMembers(clusterPhyId, topicName, "", "", searchGroupName, dto);
if (!paginationResult.hasData()) {
return PaginationResult.buildSuc(new ArrayList<>(), paginationResult);
}
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(
clusterPhyId,
paginationResult.getData().getBizData(),
ksConfigUtils.getApiCallLeftTimeUnitMs(System.currentTimeMillis() - startTimeUnitMs) // 超时时间
);
List<GroupTopicOverviewVO> groupTopicVOList = groupManager.getGroupTopicOverviewVOList(clusterPhyId, paginationResult.getData().getBizData());
return PaginationResult.buildSuc(groupTopicVOList, paginationResult);
}
@@ -338,8 +386,11 @@ public class TopicStateManagerImpl implements TopicStateManager {
// ignore
return true;
}
if (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue)) {
return true;
}
return (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue));
return false;
}
private TopicBrokerSingleVO getTopicBrokerSingle(Long clusterPhyId,
@@ -399,90 +450,4 @@ public class TopicStateManagerImpl implements TopicStateManager {
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Math.max(2, Math.min(5, maxPollRecords)));
return props;
}
private List<TopicRecordVO> getTopicMessages(ClusterPhy clusterPhy,
String topicName,
Map<TopicPartition, Long> beginOffsetsMap,
Map<TopicPartition, Long> endOffsetsMap,
long startTime,
TopicRecordDTO dto) throws AdminOperateException {
List<TopicRecordVO> voList = new ArrayList<>();
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()))) {
// 移动到指定位置
long maxMessage = this.assignAndSeekToSpecifiedOffset(kafkaConsumer, beginOffsetsMap, endOffsetsMap, dto);
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时如果这里不减去则可能会导致poll之后超过要求的时间
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
continue;
}
voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord));
if (voList.size() >= dto.getMaxRecords()) {
break;
}
}
// 超时则返回
if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs()
|| voList.size() > dto.getMaxRecords()) {
break;
}
}
return voList;
} catch (Exception e) {
LOGGER.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e);
throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED);
}
}
private long assignAndSeekToSpecifiedOffset(KafkaConsumer<String, String> kafkaConsumer,
Map<TopicPartition, Long> beginOffsetsMap,
Map<TopicPartition, Long> endOffsetsMap,
TopicRecordDTO dto) {
List<TopicPartition> partitionList = new ArrayList<>();
long maxMessage = 0;
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMap.entrySet()) {
long begin = beginOffsetsMap.get(entry.getKey());
long end = entry.getValue();
if (begin == end){
continue;
}
maxMessage += end - begin;
partitionList.add(entry.getKey());
}
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
kafkaConsumer.assign(partitionList);
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
// 获取指定时间每个分区的offset按指定开始时间查询消息时
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs()));
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
for (TopicPartition partition : partitionList) {
if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
// 重置到最旧
kafkaConsumer.seek(partition, beginOffsetsMap.get(partition));
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定时间
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定位置
} else {
// 默认,重置到最新
kafkaConsumer.seek(partition, Math.max(beginOffsetsMap.get(partition), endOffsetsMap.get(partition) - dto.getMaxRecords()));
}
}
return maxMessage;
}
}

View File

@@ -35,8 +35,6 @@ import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafk
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectClusterMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ZookeeperMetricVersionItems.*;
@Service
@@ -125,42 +123,6 @@ public class VersionControlManagerImpl implements VersionControlManager {
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_RECORD_RATE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_MIRROR_MAKER.getCode(), MIRROR_MAKER_METRIC_REPLICATION_LATENCY_MS_MAX, true));
// Connect Cluster
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_STARTUP_ATTEMPTS_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_STARTUP_FAILURE_PERCENTAGE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_CONNECTOR_STARTUP_FAILURE_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_STARTUP_ATTEMPTS_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_STARTUP_FAILURE_PERCENTAGE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_TASK_STARTUP_FAILURE_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CLUSTER.getCode(), CONNECT_CLUSTER_METRIC_COLLECT_COST_TIME, true));
// Connect Connector
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_HEALTH_STATE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_HEALTH_CHECK_PASSED, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_HEALTH_CHECK_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_COLLECT_COST_TIME, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_CONNECTOR_TOTAL_TASK_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_CONNECTOR_RUNNING_TASK_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_CONNECTOR_FAILED_TASK_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_ACTIVE_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_POLL_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_WRITE_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_ACTIVE_COUNT, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_READ_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_SEND_TOTAL, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_DEADLETTERQUEUE_PRODUCE_FAILURES, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_DEADLETTERQUEUE_PRODUCE_REQUESTS, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_TOTAL_ERRORS_LOGGED, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_POLL_RATE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SOURCE_RECORD_WRITE_RATE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_READ_RATE, true));
defaultMetrics.add(new UserMetricConfig(METRIC_CONNECT_CONNECTOR.getCode(), CONNECTOR_METRIC_SINK_RECORD_SEND_RATE, true));
}
@Autowired

View File

@@ -5,13 +5,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>km-collector</artifactId>
<version>${revision}</version>
<version>${km.revision}</version>
<packaging>jar</packaging>
<parent>
<artifactId>km</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>${revision}</version>
<version>${km.revision}</version>
</parent>
<dependencies>

View File

@@ -44,7 +44,7 @@ public class ConnectConnectorMetricCollector extends AbstractConnectMetricCollec
Long connectClusterId = connectCluster.getId();
List<VersionControlItem> items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode());
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectCluster);
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectClusterId);
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(connectClusterId);

View File

@@ -5,13 +5,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>km-common</artifactId>
<version>${revision}</version>
<version>${km.revision}</version>
<packaging>jar</packaging>
<parent>
<artifactId>km</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>${revision}</version>
<version>${km.revision}</version>
</parent>
<properties>

View File

@@ -1,12 +1,12 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotNull;
import java.util.Properties;
/**
@@ -14,23 +14,15 @@ import java.util.Properties;
* @date 2022-10-17
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@NoArgsConstructor
@ApiModel(description = "创建Connector")
public class ConnectorCreateDTO extends ClusterConnectorDTO {
@Deprecated
@ApiModelProperty(value = "配置, 优先使用config字段3.5.0版本将删除该字段", example = "")
@NotNull(message = "configs不允许为空")
@ApiModelProperty(value = "配置", example = "")
protected Properties configs;
@ApiModelProperty(value = "配置", example = "")
protected Properties config;
public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties config) {
public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties configs) {
super(connectClusterId, connectorName);
this.config = config;
}
public Properties getSuitableConfig() {
return config != null? config: configs;
this.configs = configs;
}
}

View File

@@ -40,7 +40,7 @@ public class MirrorMakerCreateDTO extends ConnectorCreateDTO {
targetKafkaProps = new Properties();
}
this.unifyData(this.getSuitableConfig(), sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
this.unifyData(this.configs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
if (heartbeatConnectorConfigs != null) {
this.unifyData(this.heartbeatConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);

View File

@@ -1,40 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.group;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
/**
* 删除offset
* @author zengqiao
* @date 19/4/8
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class GroupOffsetDeleteDTO extends BaseDTO {
@Min(value = 0, message = "clusterPhyId不允许为null或者小于0")
@ApiModelProperty(value = "集群ID", example = "6")
private Long clusterPhyId;
@NotBlank(message = "groupName不允许为空")
@ApiModelProperty(value = "消费组名称", example = "g-know-streaming")
private String groupName;
@ApiModelProperty(value = "Topic名称按照Topic纬度进行删除时需要传", example = "know-streaming")
protected String topicName;
@ApiModelProperty(value = "分区ID按照分区纬度进行删除时需要传")
private Integer partitionId;
/**
* @see com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum
*/
@NotNull(message = "deleteType不允许为空")
@ApiModelProperty(value = "删除类型", example = "0:group纬度1Topic纬度2Partition纬度")
private Integer deleteType;
}

View File

@@ -4,8 +4,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.broker;
import com.alibaba.fastjson.TypeReference;
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -73,10 +71,10 @@ public class Broker implements Serializable {
metadata.setBrokerId(node.id());
metadata.setHost(node.host());
metadata.setPort(node.port());
metadata.setJmxPort(JmxEnum.UNKNOWN.getPort());
metadata.setJmxPort(-1);
metadata.setStartTimestamp(startTimestamp);
metadata.setRack(node.rack());
metadata.setStatus(Constant.ALIVE);
metadata.setStatus(1);
return metadata;
}

View File

@@ -1,29 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.config;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
/**
* @author zengqiao
* @date 23/05/19
*/
@Data
@ApiModel(description = "Jmx配置")
public class JmxAuthConfig implements Serializable {
@ApiModelProperty(value="最大连接", example = "100")
protected Integer maxConn;
@ApiModelProperty(value="是否开启SSL如果开始则username 与 token 必须非空", example = "false")
protected Boolean openSSL;
@ApiModelProperty(value="SSL情况下的username", example = "Ks-Km")
protected String username;
@ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19")
protected String token;
}

View File

@@ -1,12 +1,10 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.config;
import com.xiaojukeji.know.streaming.km.common.bean.entity.jmx.ServerIdJmxPort;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
import java.io.Serializable;
/**
* @author zengqiao
@@ -14,69 +12,24 @@ import java.util.List;
*/
@Data
@ApiModel(description = "Jmx配置")
public class JmxConfig extends JmxAuthConfig {
@ApiModelProperty(value="jmx端口,最低优先使用的端口", example = "8099")
public class JmxConfig implements Serializable {
@ApiModelProperty(value="jmx端口", example = "8099")
private Integer jmxPort;
@ApiModelProperty(value="最大连接", example = "100")
private Integer maxConn;
@ApiModelProperty(value="是否开启SSL如果开始则username 与 token 必须非空", example = "false")
private Boolean openSSL;
@ApiModelProperty(value="SSL情况下的username", example = "Ks-Km")
private String username;
@ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19")
private String token;
@ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL")
private String useWhichEndpoint;
@ApiModelProperty(value="指定server的JMX端口, 最高优先使用的端口", example = "")
private List<ServerIdJmxPort> specifiedJmxPortList;
/**
* 选取最终的jmx端口
* @param serverId 服务ID
* @param metadataJmxPort ks从元信息中获取到的jmx端口
*/
public Integer getFinallyJmxPort(String serverId, Integer metadataJmxPort) {
if (specifiedJmxPortList == null || specifiedJmxPortList.isEmpty()) {
// 未进行特殊指定时zkJMX端口存在则优先使用zkJmxPort否则使用配置的jmxPort
return this.selectJmxPort(jmxPort, metadataJmxPort);
}
// 进行特殊配置时
for (ServerIdJmxPort serverIdJmxPort: specifiedJmxPortList) {
if (serverId.equals(serverIdJmxPort.getServerId()) && serverIdJmxPort.getJmxPort() != null) {
// 当前server有指定具体的jmx端口时则使用具体的端口
return serverIdJmxPort.getJmxPort();
}
}
return this.selectJmxPort(jmxPort, metadataJmxPort);
}
/**
* 选取最终的jmx端口
* @param serverId serverId
*/
public Integer getFinallyJmxPort(String serverId) {
return this.getFinallyJmxPort(serverId, null);
}
/**
* 选取jmx端口
* @param feJmxPort 前端页面配置的jmx端口
* @param metadataJmxPort ks从元信息中获取到的jmx端口
*/
private Integer selectJmxPort(Integer feJmxPort, Integer metadataJmxPort) {
if (metadataJmxPort == null) {
return feJmxPort != null? feJmxPort: JmxEnum.NOT_OPEN.getPort();
}
if (JmxEnum.NOT_OPEN.getPort().equals(metadataJmxPort)) {
// 如果元信息提示未开启,则直接返回未开启
return JmxEnum.NOT_OPEN.getPort();
}
if (JmxEnum.UNKNOWN.getPort().equals(metadataJmxPort)) {
// 如果元信息提示未知则直接返回feJmxPort 或者 未开启
return feJmxPort != null? feJmxPort: JmxEnum.NOT_OPEN.getPort();
}
// 其他情况,返回 metadataJmxPort
return metadataJmxPort;
}
}

View File

@@ -1,7 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.EntityIdInterface;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import lombok.Data;
import java.io.Serializable;
@@ -55,22 +54,6 @@ public class ConnectCluster implements Serializable, Comparable<ConnectCluster>,
*/
private String clusterUrl;
public String getSuitableRequestUrl() {
// 优先使用用户填写的url
String suitableRequestUrl = this.clusterUrl;
if (ValidateUtils.isBlank(suitableRequestUrl)) {
// 用户如果没有填写则使用元信息中的url
suitableRequestUrl = this.memberLeaderUrl;
}
//url去斜杠
if (suitableRequestUrl.length() > 0 && suitableRequestUrl.charAt(suitableRequestUrl.length() - 1) == '/') {
return suitableRequestUrl.substring(0, suitableRequestUrl.length() - 1);
}
return suitableRequestUrl;
}
@Override
public int compareTo(ConnectCluster connectCluster) {
return this.id.compareTo(connectCluster.getId());

View File

@@ -1,25 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.jmx;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author didi
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerIdJmxPort implements Serializable {
/**
* serverID
*/
private String serverId;
/**
* JMX端口
*/
private Integer jmxPort;
}

View File

@@ -27,10 +27,6 @@ public abstract class BaseMetrics implements Serializable {
protected Map<String, Float> metrics = new ConcurrentHashMap<>();
public void putMetric(String key, Float value){
if (value == null || key == null) {
return;
}
metrics.put(key, value);
}

View File

@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -11,18 +12,20 @@ import lombok.ToString;
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConnectClusterMetrics extends BaseMetrics {
protected Long connectClusterId;
private Long connectClusterId;
public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId ){
public ConnectClusterMetrics(Long clusterPhyId, Long connectClusterId){
super(clusterPhyId);
this.connectClusterId = connectClusterId;
}
public ConnectClusterMetrics(Long connectClusterId, String metricName, Float metricValue) {
this(null, connectClusterId);
this.putMetric(metricName, metricValue);
public static ConnectClusterMetrics initWithMetric(Long connectClusterId, String metric, Float value) {
ConnectClusterMetrics brokerMetrics = new ConnectClusterMetrics(connectClusterId, connectClusterId);
brokerMetrics.putMetric(metric, value);
return brokerMetrics;
}
@Override

View File

@@ -1,5 +1,7 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -9,19 +11,25 @@ import lombok.ToString;
* @date 2022/11/2
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class ConnectWorkerMetrics extends ConnectClusterMetrics {
public class ConnectWorkerMetrics extends BaseMetrics {
private Long connectClusterId;
private String workerId;
public ConnectWorkerMetrics(Long connectClusterId, String workerId, String metricName, Float metricValue) {
super(null, connectClusterId);
this.workerId = workerId;
this.putMetric(metricName, metricValue);
public static ConnectWorkerMetrics initWithMetric(Long connectClusterId, String workerId, String metric, Float value) {
ConnectWorkerMetrics connectWorkerMetrics = new ConnectWorkerMetrics();
connectWorkerMetrics.setConnectClusterId(connectClusterId);
connectWorkerMetrics.setWorkerId(workerId);
connectWorkerMetrics.putMetric(metric, value);
return connectWorkerMetrics;
}
@Override
public String unique() {
return "KCW@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
return "KCC@" + clusterPhyId + "@" + connectClusterId + "@" + workerId;
}
}

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -11,21 +12,24 @@ import lombok.ToString;
@Data
@NoArgsConstructor
@ToString
public class ConnectorMetrics extends ConnectClusterMetrics {
protected String connectorName;
public class ConnectorMetrics extends BaseMetrics {
private Long connectClusterId;
protected String connectorNameAndClusterId;
private String connectorName;
private String connectorNameAndClusterId;
public ConnectorMetrics(Long connectClusterId, String connectorName) {
super(null, connectClusterId);
super(null);
this.connectClusterId = connectClusterId;
this.connectorName = connectorName;
this.connectorNameAndClusterId = connectorName + "#" + connectClusterId;
}
public ConnectorMetrics(Long connectClusterId, String connectorName, String metricName, Float metricValue) {
this(connectClusterId, connectorName);
this.putMetric(metricName, metricValue);
public static ConnectorMetrics initWithMetric(Long connectClusterId, String connectorName, String metricName, Float value) {
ConnectorMetrics metrics = new ConnectorMetrics(connectClusterId, connectorName);
metrics.putMetric(metricName, value);
return metrics;
}
@Override

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@@ -11,7 +12,11 @@ import lombok.ToString;
@Data
@NoArgsConstructor
@ToString
public class ConnectorTaskMetrics extends ConnectorMetrics {
public class ConnectorTaskMetrics extends BaseMetrics {
private Long connectClusterId;
private String connectorName;
private Integer taskId;
public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId) {
@@ -20,13 +25,14 @@ public class ConnectorTaskMetrics extends ConnectorMetrics {
this.taskId = taskId;
}
public ConnectorTaskMetrics(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float metricValue) {
this(connectClusterId, connectorName, taskId);
this.putMetric(metricName, metricValue);
public static ConnectorTaskMetrics initWithMetric(Long connectClusterId, String connectorName, Integer taskId, String metricName, Float value) {
ConnectorTaskMetrics metrics = new ConnectorTaskMetrics(connectClusterId, connectorName, taskId);
metrics.putMetric(metricName,value);
return metrics;
}
@Override
public String unique() {
return "KCORT@" + connectClusterId + "@" + connectorName + "@" + taskId;
return "KCOR@" + connectClusterId + "@" + connectorName + "@" + taskId;
}
}

View File

@@ -1,16 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class DeleteGroupParam extends GroupParam {
protected DeleteGroupTypeEnum deleteGroupTypeEnum;
public DeleteGroupParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum) {
super(clusterPhyId, groupName);
this.deleteGroupTypeEnum = deleteGroupTypeEnum;
}
}

View File

@@ -1,16 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class DeleteGroupTopicParam extends DeleteGroupParam {
protected String topicName;
public DeleteGroupTopicParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName) {
super(clusterPhyId, groupName, deleteGroupTypeEnum);
this.topicName = topicName;
}
}

View File

@@ -1,16 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class DeleteGroupTopicPartitionParam extends DeleteGroupTopicParam {
protected Integer partitionId;
public DeleteGroupTopicPartitionParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName, Integer partitionId) {
super(clusterPhyId, groupName, deleteGroupTypeEnum, topicName);
this.partitionId = partitionId;
}
}

View File

@@ -1,11 +1,13 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class GroupParam extends ClusterPhyParam {
protected String groupName;

View File

@@ -1,29 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TopicTruncateParam extends ClusterPhyParam {
protected String topicName;
protected long offset;
public TopicTruncateParam(Long clusterPhyId, String topicName, long offset) {
super(clusterPhyId);
this.topicName = topicName;
this.offset = offset;
}
@Override
public String toString() {
return "TopicParam{" +
"clusterPhyId=" + clusterPhyId +
", topicName='" + topicName + '\'' +
", offset='" + offset + '\'' +
'}';
}
}

View File

@@ -25,15 +25,15 @@ public class MonitorCmdData extends BaseFourLetterWordCmdData {
private Float zkAvgLatency;
private Float zkMaxLatency;
private Float zkMinLatency;
private Float zkPacketsReceived;
private Float zkPacketsSent;
private Float zkNumAliveConnections;
private Float zkOutstandingRequests;
private Long zkPacketsReceived;
private Long zkPacketsSent;
private Long zkNumAliveConnections;
private Long zkOutstandingRequests;
private String zkServerState;
private Float zkZnodeCount;
private Float zkWatchCount;
private Float zkEphemeralsCount;
private Float zkApproximateDataSize;
private Float zkOpenFileDescriptorCount;
private Float zkMaxFileDescriptorCount;
private Long zkZnodeCount;
private Long zkWatchCount;
private Long zkEphemeralsCount;
private Long zkApproximateDataSize;
private Long zkOpenFileDescriptorCount;
private Long zkMaxFileDescriptorCount;
}

View File

@@ -20,11 +20,11 @@ public class ServerCmdData extends BaseFourLetterWordCmdData {
private Float zkAvgLatency;
private Float zkMaxLatency;
private Float zkMinLatency;
private Float zkPacketsReceived;
private Float zkPacketsSent;
private Float zkNumAliveConnections;
private Float zkOutstandingRequests;
private Long zkPacketsReceived;
private Long zkPacketsSent;
private Long zkNumAliveConnections;
private Long zkOutstandingRequests;
private String zkServerState;
private Float zkZnodeCount;
private Long zkZnodeCount;
private Long zkZxid;
}

View File

@@ -51,7 +51,7 @@ public class MonitorCmdDataParser implements FourLetterWordDataParser<MonitorCmd
}
MonitorCmdData monitorCmdData = new MonitorCmdData();
dataMap.entrySet().forEach(elem -> {
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "zk_version":
@@ -67,37 +67,37 @@ public class MonitorCmdDataParser implements FourLetterWordDataParser<MonitorCmd
monitorCmdData.setZkMinLatency(ConvertUtil.string2Float(elem.getValue()));
break;
case "zk_packets_received":
monitorCmdData.setZkPacketsReceived(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
break;
case "zk_packets_sent":
monitorCmdData.setZkPacketsSent(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
break;
case "zk_num_alive_connections":
monitorCmdData.setZkNumAliveConnections(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
break;
case "zk_outstanding_requests":
monitorCmdData.setZkOutstandingRequests(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
break;
case "zk_server_state":
monitorCmdData.setZkServerState(elem.getValue());
break;
case "zk_znode_count":
monitorCmdData.setZkZnodeCount(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
break;
case "zk_watch_count":
monitorCmdData.setZkWatchCount(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkWatchCount(Long.valueOf(elem.getValue()));
break;
case "zk_ephemerals_count":
monitorCmdData.setZkEphemeralsCount(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkEphemeralsCount(Long.valueOf(elem.getValue()));
break;
case "zk_approximate_data_size":
monitorCmdData.setZkApproximateDataSize(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkApproximateDataSize(Long.valueOf(elem.getValue()));
break;
case "zk_open_file_descriptor_count":
monitorCmdData.setZkOpenFileDescriptorCount(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkOpenFileDescriptorCount(Long.valueOf(elem.getValue()));
break;
case "zk_max_file_descriptor_count":
monitorCmdData.setZkMaxFileDescriptorCount(ConvertUtil.string2Float(elem.getValue()));
monitorCmdData.setZkMaxFileDescriptorCount(Long.valueOf(elem.getValue()));
break;
case "Proposal sizes last/min/max":
case "zk_fsync_threshold_exceed_count":

View File

@@ -46,7 +46,7 @@ public class ServerCmdDataParser implements FourLetterWordDataParser<ServerCmdDa
}
ServerCmdData serverCmdData = new ServerCmdData();
dataMap.entrySet().forEach(elem -> {
dataMap.entrySet().stream().forEach(elem -> {
try {
switch (elem.getKey()) {
case "Zookeeper version":
@@ -59,22 +59,22 @@ public class ServerCmdDataParser implements FourLetterWordDataParser<ServerCmdDa
serverCmdData.setZkMaxLatency(ConvertUtil.string2Float(data[2]));
break;
case "Received":
serverCmdData.setZkPacketsReceived(ConvertUtil.string2Float(elem.getValue()));
serverCmdData.setZkPacketsReceived(Long.valueOf(elem.getValue()));
break;
case "Sent":
serverCmdData.setZkPacketsSent(ConvertUtil.string2Float(elem.getValue()));
serverCmdData.setZkPacketsSent(Long.valueOf(elem.getValue()));
break;
case "Connections":
serverCmdData.setZkNumAliveConnections(ConvertUtil.string2Float(elem.getValue()));
serverCmdData.setZkNumAliveConnections(Long.valueOf(elem.getValue()));
break;
case "Outstanding":
serverCmdData.setZkOutstandingRequests(ConvertUtil.string2Float(elem.getValue()));
serverCmdData.setZkOutstandingRequests(Long.valueOf(elem.getValue()));
break;
case "Mode":
serverCmdData.setZkServerState(elem.getValue());
break;
case "Node count":
serverCmdData.setZkZnodeCount(ConvertUtil.string2Float(elem.getValue()));
serverCmdData.setZkZnodeCount(Long.valueOf(elem.getValue()));
break;
case "Zxid":
serverCmdData.setZkZxid(Long.parseUnsignedLong(elem.getValue().trim().substring(2), 16));

View File

@@ -1,16 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect;
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyBaseEvent;
import lombok.Getter;
/**
* 集群删除事件
* @author zengqiao
* @date 23/08/15
*/
@Getter
public class ClusterPhyDeletedEvent extends ClusterPhyBaseEvent {
public ClusterPhyDeletedEvent(Object source, Long clusterPhyId) {
super(source, clusterPhyId);
}
}

View File

@@ -29,7 +29,7 @@ public class ConnectClusterPO extends BasePO {
private Integer state;
/**
* 用户填写的集群地址
* 集群地址
*/
private String clusterUrl;

View File

@@ -7,7 +7,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
import java.util.Objects;
@Data
@NoArgsConstructor
@@ -38,16 +37,4 @@ public class GroupMemberPO extends BasePO {
this.memberCount = memberCount;
this.updateTime = updateTime;
}
public boolean equal2GroupMemberPO(GroupMemberPO that) {
if (that == null) {
return false;
}
return Objects.equals(clusterPhyId, that.clusterPhyId)
&& Objects.equals(topicName, that.topicName)
&& Objects.equals(groupName, that.groupName)
&& Objects.equals(state, that.state)
&& Objects.equals(memberCount, that.memberCount);
}
}

View File

@@ -9,8 +9,6 @@ import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Objects;
@Data
@NoArgsConstructor
@@ -60,18 +58,4 @@ public class GroupPO extends BasePO {
*/
private int coordinatorId;
public boolean equal2GroupPO(GroupPO groupPO) {
if (groupPO == null) {
return false;
}
return coordinatorId == groupPO.coordinatorId
&& Objects.equals(clusterPhyId, groupPO.clusterPhyId)
&& Objects.equals(type, groupPO.type)
&& Objects.equals(name, groupPO.name)
&& Objects.equals(state, groupPO.state)
&& Objects.equals(memberCount, groupPO.memberCount)
&& Objects.equals(topicMembers, groupPO.topicMembers)
&& Objects.equals(partitionAssignor, groupPO.partitionAssignor);
}
}

View File

@@ -13,6 +13,7 @@ import lombok.Data;
@Data
@ApiModel(description = "集群MM2状态信息")
public class MirrorMakerBaseStateVO extends BaseVO {
@ApiModelProperty(value = "worker数", example = "1")
private Integer workerCount;

View File

@@ -49,8 +49,6 @@ public class KafkaConstant {
public static final Map<String, ConfigDef.ConfigKey> KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>();
public static final Integer TOPICK_TRUNCATE_DEFAULT_OFFSET = -1;
static {
try {
KAFKA_ALL_CONFIG_DEF_MAP.putAll(CollectionConverters.asJava(LogConfig$.MODULE$.configKeys()));

View File

@@ -27,8 +27,5 @@ public class PaginationConstant {
/**
* groupTopic列表的默认排序规则
*/
public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName";
public static final String TOPIC_RECORDS_TIME_SORTED_FIELD = "timestampUnitMs";
public static final String TOPIC_RECORDS_OFFSET_SORTED_FIELD = "offset";
public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName";
}

View File

@@ -16,8 +16,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiL
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,9 +24,6 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
public class ConnectConverter {
public static ConnectorBasicCombineExistVO convert2BasicVO(ConnectCluster connectCluster, ConnectorPO connectorPO) {
ConnectorBasicCombineExistVO vo = new ConnectorBasicCombineExistVO();
@@ -158,66 +153,6 @@ public class ConnectConverter {
return ksConnector;
}
public static List<KSConnector> convertAndSupplyMirrorMakerInfo(ConnectCluster connectCluster, List<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> connectorFullInfoList) {
// <connectorName, targetBootstrapServers + "@" + sourceBootstrapServers>
Map<String, String> sourceMap = new HashMap<>();
// <targetBootstrapServers + "@" + sourceBootstrapServers, connectorName>
Map<String, String> heartbeatMap = new HashMap<>();
Map<String, String> checkpointMap = new HashMap<>();
// 获取每个类型的connector的map信息
connectorFullInfoList.forEach(connector -> {
Map<String, String> mm2Map = null;
if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = sourceMap;
} else if (KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = heartbeatMap;
} else if (KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = checkpointMap;
}
String targetBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
String sourceBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
if (ValidateUtils.anyBlank(targetBootstrapServers, sourceBootstrapServers) || mm2Map == null) {
return;
}
if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
// source 类型的格式和 heartbeat & checkpoint 的不一样
mm2Map.put(connector.v1().getName(), targetBootstrapServers + "@" + sourceBootstrapServers);
} else {
mm2Map.put(targetBootstrapServers + "@" + sourceBootstrapServers, connector.v1().getName());
}
});
List<KSConnector> connectorList = new ArrayList<>();
connectorFullInfoList.forEach(connector -> {
// 转化并添加到list中
KSConnector ksConnector = ConnectConverter.convert2KSConnector(
connectCluster.getKafkaClusterPhyId(),
connectCluster.getId(),
connector.v1(),
connector.v3(),
connector.v2()
);
connectorList.add(ksConnector);
// 补充mm2信息
String targetAndSource = sourceMap.get(ksConnector.getConnectorName());
if (ValidateUtils.isBlank(targetAndSource)) {
return;
}
ksConnector.setHeartbeatConnectorName(heartbeatMap.getOrDefault(targetAndSource, ""));
ksConnector.setCheckpointConnectorName(checkpointMap.getOrDefault(targetAndSource, ""));
});
return connectorList;
}
private static String genConnectorKey(Long connectorId, String connectorName){
return connectorId + "#" + connectorName;
}

View File

@@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.stream.Collectors;
/**
@@ -58,7 +57,6 @@ public class GroupConverter {
po.setTopicMembers(ConvertUtil.obj2Json(group.getTopicMembers()));
po.setType(group.getType().getCode());
po.setState(group.getState().getState());
po.setUpdateTime(new Date());
return po;
}
}

View File

@@ -1,50 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.enums.connect;
import org.apache.kafka.connect.runtime.AbstractStatus;
/**
* connector运行状态
* @see AbstractStatus
*/
public enum ConnectStatusEnum {
UNASSIGNED(0, "UNASSIGNED"),
RUNNING(1,"RUNNING"),
PAUSED(2,"PAUSED"),
FAILED(3, "FAILED"),
DESTROYED(4, "DESTROYED"),
UNKNOWN(-1, "UNKNOWN")
;
ConnectStatusEnum(int status, String value) {
this.status = status;
this.value = value;
}
private final int status;
private final String value;
public static ConnectStatusEnum getByValue(String value) {
for (ConnectStatusEnum statusEnum: ConnectStatusEnum.values()) {
if (statusEnum.value.equals(value)) {
return statusEnum;
}
}
return ConnectStatusEnum.UNKNOWN;
}
public int getStatus() {
return status;
}
public String getValue() {
return value;
}
}

View File

@@ -1,28 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.enums.group;
import lombok.Getter;
/**
* @author wyb
* @date 2022/10/11
*/
@Getter
public enum DeleteGroupTypeEnum {
UNKNOWN(-1, "Unknown"),
GROUP(0, "Group纬度"),
GROUP_TOPIC(1, "GroupTopic纬度"),
GROUP_TOPIC_PARTITION(2, "GroupTopicPartition纬度");
private final Integer code;
private final String msg;
DeleteGroupTypeEnum(Integer code, String msg) {
this.code = code;
this.msg = msg;
}
}

View File

@@ -1,20 +0,0 @@
package com.xiaojukeji.know.streaming.km.common.enums.jmx;
import lombok.Getter;
@Getter
public enum JmxEnum {
NOT_OPEN(-1, "未开启JMX端口"),
UNKNOWN(-2, "JMX端口未知"),
;
private final Integer port;
private final String message;
JmxEnum(Integer port, String message) {
this.port = port;
this.message = message;
}
}

View File

@@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.common.enums.operaterecord;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import java.util.List;
@@ -40,6 +41,9 @@ public enum ModuleEnum {
JOB_KAFKA_REPLICA_REASSIGN(110, "Job-KafkaReplica迁移"),
@EnterpriseLoadReBalance
JOB_CLUSTER_BALANCE(111, "Job-ClusterBalance"),
;
ModuleEnum(int code, String desc) {

View File

@@ -32,8 +32,6 @@ public enum OperationEnum {
RESTART(11, "重启"),
TRUNCATE(12, "清空"),
;
OperationEnum(int code, String desc) {

View File

@@ -73,27 +73,9 @@ public enum VersionEnum {
* 3.x.x
*/
V_3_0_0("3.0.0", normailze("3.0.0")),
V_3_1_0("3.1.0", normailze("3.1.0")),
V_3_1_1("3.1.1", normailze("3.1.1")),
V_3_1_2("3.1.2", normailze("3.1.2")),
V_3_2_0("3.2.0", normailze("3.2.0")),
V_3_2_1("3.2.1", normailze("3.2.1")),
V_3_2_3("3.2.3", normailze("3.2.3")),
V_3_3_0("3.3.0", normailze("3.3.0")),
V_3_3_1("3.3.1", normailze("3.3.1")),
V_3_3_2("3.3.2", normailze("3.3.2")),
V_3_4_0("3.4.0", normailze("3.4.0")),
V_3_4_1("3.4.1", normailze("3.4.1")),
V_3_5_0("3.5.0", normailze("3.5.0")),
V_3_5_1("3.5.1", normailze("3.5.1")),
V_3_6_0("3.6.0", normailze("3.6.0")),
V_MAX("x.x.x.x", Long.MAX_VALUE),

View File

@@ -41,8 +41,6 @@ public enum VersionItemTypeEnum {
SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"),
SERVICE_OP_GROUP(340, "service_group_operation"),
SERVICE_OP_CONNECT_CLUSTER(400, "service_connect_cluster_operation"),
SERVICE_OP_CONNECT_CONNECTOR(401, "service_connect_connector_operation"),
SERVICE_OP_CONNECT_PLUGIN(402, "service_connect_plugin_operation"),

View File

@@ -10,8 +10,6 @@ public enum ZKRoleEnum {
OBSERVER("observer"),
STANDALONE("standalone"),
UNKNOWN("unknown"),
;

View File

@@ -1,8 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.jmx;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxAuthConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import org.slf4j.Logger;
@@ -35,26 +33,26 @@ public class JmxConnectorWrap {
private final Long brokerStartupTime;
private final String jmxHost;
private final String host;
private final Integer jmxPort;
private final Integer port;
private JMXConnector jmxConnector;
private final AtomicInteger atomicInteger;
private JmxAuthConfig jmxConfig;
private JmxConfig jmxConfig;
public JmxConnectorWrap(String clientLogIdent, Long brokerStartupTime, String jmxHost, Integer jmxPort, JmxAuthConfig jmxConfig) {
LOGGER.info(
"method=JmxConnectorWrap||clientLogIdent={}||brokerStartupTime={}||jmxHost={}||jmxPort={}||jmxConfig={}||msg=start construct JmxWrap.",
clientLogIdent, brokerStartupTime, jmxHost, jmxPort, jmxConfig
);
this.clientLogIdent = clientLogIdent;
public JmxConnectorWrap(String clientLogIdent, Long brokerStartupTime, String host, Integer port, JmxConfig jmxConfig) {
this.clientLogIdent=clientLogIdent;
this.brokerStartupTime = brokerStartupTime;
this.jmxHost = jmxHost;
this.jmxPort = (jmxPort == null? JmxEnum.UNKNOWN.getPort() : jmxPort);
this.host = host;
if (port == null || port == -1 && jmxConfig.getJmxPort() != null) {
this.port = jmxConfig.getJmxPort();
} else {
this.port = port;
}
this.jmxConfig = jmxConfig;
if (ValidateUtils.isNull(this.jmxConfig)) {
@@ -63,7 +61,6 @@ public class JmxConnectorWrap {
if (ValidateUtils.isNullOrLessThanZero(this.jmxConfig.getMaxConn())) {
this.jmxConfig.setMaxConn(1000);
}
this.atomicInteger = new AtomicInteger(this.jmxConfig.getMaxConn());
}
@@ -71,7 +68,7 @@ public class JmxConnectorWrap {
if (jmxConnector != null) {
return true;
}
if (jmxPort == null || jmxPort == -1) {
if (port == null || port == -1) {
return false;
}
return createJmxConnector();
@@ -94,10 +91,7 @@ public class JmxConnectorWrap {
jmxConnector = null;
} catch (IOException e) {
LOGGER.error(
"method=close||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=close jmx JmxConnector exception.",
clientLogIdent, jmxHost, jmxPort, e
);
LOGGER.warn("close JmxConnector exception, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port, e);
}
}
@@ -165,12 +159,7 @@ public class JmxConnectorWrap {
if (jmxConnector != null) {
return true;
}
LOGGER.info(
"method=createJmxConnector||clientLogIdent={}||brokerStartupTime={}||jmxHost={}||jmxPort={}||jmxConfig={}||msg=start create jmx connector.",
clientLogIdent, brokerStartupTime, jmxHost, jmxPort, jmxConfig
);
String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", jmxHost, jmxPort);
String jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port);
try {
Map<String, Object> environment = new HashMap<String, Object>();
if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getToken())) {
@@ -185,21 +174,12 @@ public class JmxConnectorWrap {
}
jmxConnector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment);
LOGGER.info(
"method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=jmx connect success.",
clientLogIdent, jmxHost, jmxPort
);
LOGGER.info("JMX connect success, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port);
return true;
} catch (MalformedURLException e) {
LOGGER.error(
"method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||jmxUrl={}||msg=jmx url exception.",
clientLogIdent, jmxHost, jmxPort, jmxUrl, e
);
LOGGER.error("JMX url exception, clientLogIdent:{} host:{} port:{} jmxUrl:{}", clientLogIdent, host, port, jmxUrl, e);
} catch (Exception e) {
LOGGER.error(
"method=createJmxConnector||clientLogIdent={}||jmxHost={}||jmxPort={}||msg=jmx connect exception.",
clientLogIdent, jmxHost, jmxPort, e
);
LOGGER.error("JMX connect exception, clientLogIdent:{} host:{} port:{}.", clientLogIdent, host, port, e);
}
return false;
}

View File

@@ -78,8 +78,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
@@ -1340,11 +1338,21 @@ public class KSPartialKafkaAdminClient {
if (groupMember.memberAssignment().length > 0) {
final Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>(assignment.partitions()));
} else {
memberBaseAssignment = new KSMemberConsumerAssignment(new HashSet<>());
}
} else {
memberBaseAssignment = deserializeConnectGroupDataCompatibility(groupMember);
ConnectProtocol.Assignment assignment = null;
if (groupMember.memberAssignment().length > 0) {
assignment = ConnectProtocol.
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
}
ConnectProtocol.WorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = ConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
}
memberBaseAssignment = new KSMemberConnectAssignment(assignment, workerState);
}
memberDescriptions.add(new KSMemberDescription(
@@ -1373,36 +1381,6 @@ public class KSPartialKafkaAdminClient {
};
}
private KSMemberBaseAssignment deserializeConnectGroupDataCompatibility(DescribedGroupMember groupMember) {
try {
// 高版本的反序列化方式
ExtendedWorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = IncrementalCooperativeConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
return new KSMemberConnectAssignment(workerState.assignment(), workerState);
}
} catch (Exception e) {
// ignore
}
// 低版本的反序列化方式
ConnectProtocol.Assignment assignment = null;
if (groupMember.memberAssignment().length > 0) {
assignment = ConnectProtocol.
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
}
ConnectProtocol.WorkerState workerState = null;
if (groupMember.memberMetadata().length > 0) {
workerState = ConnectProtocol.
deserializeMetadata(ByteBuffer.wrap(groupMember.memberMetadata()));
}
return new KSMemberConnectAssignment(assignment, workerState);
}
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {

View File

@@ -1,2 +1,2 @@
BUSINESS_VERSION='false'
BUSINESS_VERSION='true'
PUBLIC_PATH=''

View File

@@ -16,13 +16,6 @@ const babelOptions = {
cacheDirectory: true,
babelrc: false,
presets: [require.resolve('@babel/preset-env'), require.resolve('@babel/preset-typescript'), require.resolve('@babel/preset-react')],
overrides: [
// TODO编译时需要做的事情更多应该只针对目标第三方库
{
include: './node_modules',
sourceType: 'unambiguous'
}
],
plugins: [
[require.resolve('@babel/plugin-proposal-decorators'), { legacy: true }],
[require.resolve('@babel/plugin-proposal-class-properties'), { loose: true }],

View File

@@ -31,12 +31,7 @@ export const { Provider, Consumer } = React.createContext('zh');
const defaultLanguage = 'zh';
const AppContent = (props: {
getLicenseInfo?: (cbk: (msg: string) => void) => void | undefined;
licenseEventBus?: Record<string, any> | undefined;
}) => {
const { getLicenseInfo, licenseEventBus } = props;
const AppContent = (props: any) => {
return (
<div className="config-system">
<DProLayout.Sider prefixCls={'dcd-two-columns'} width={200} theme={'light'} systemKey={systemKey} menuConf={leftMenus} />
@@ -44,7 +39,7 @@ const AppContent = (props: {
<RouteGuard
routeList={pageRoutes}
beforeEach={() => {
getLicenseInfo?.((msg) => licenseEventBus?.emit('licenseError', msg));
// getLicenseInfo?.((msg) => licenseEventBus?.emit('licenseError', msg));
return Promise.resolve(true);
}}
noMatch={() => <Redirect to="/404" />}
@@ -55,7 +50,6 @@ const AppContent = (props: {
};
const App = (props: any) => {
const { getLicenseInfo, licenseEventBus } = props;
const intlMessages = _.get(localeMap[defaultLanguage], 'intlMessages', intlZhCN);
const locale = _.get(localeMap[defaultLanguage], 'intl', 'zh-CN');
const antdLocale = _.get(localeMap[defaultLanguage], 'dantd', dantdZhCN);
@@ -65,7 +59,7 @@ const App = (props: any) => {
<AppContainer intlProvider={{ locale, messages: intlMessages }} antdProvider={{ locale: antdLocale }}>
<Router basename={systemKey}>
<Switch>
<AppContent getLicenseInfo={getLicenseInfo} licenseEventBus={licenseEventBus} />
<AppContent />
</Switch>
</Router>
</AppContainer>

View File

@@ -96,7 +96,7 @@ const RoleDetailAndUpdate = forwardRef((props, ref): JSX.Element => {
arr.push(permissions[i].id);
}
});
formData.permissionIdList = formData.permissionIdList.flat().filter((item) => item !== undefined);
formData.permissionIdList = formData.permissionIdList.flat();
setConfirmLoading(true);
request(api.editRole, {
method: type === RoleOperate.Add ? 'POST' : 'PUT',
@@ -250,7 +250,7 @@ const RoleDetailAndUpdate = forwardRef((props, ref): JSX.Element => {
<CheckboxGroupContainer
key={i}
formInstance={form}
fieldName={`permissionIdList`}
fieldName="permissionIdList"
options={permission.options}
initSelectedOptions={initSelectedPermissions[permission.id] || []}
groupIdx={i}

View File

@@ -34,11 +34,11 @@ module.exports = {
proxy: {
'/ks-km/api/v3': {
changeOrigin: true,
target: 'http://127.0.0.1/',
target: 'https://api-kylin-xg02.intra.xiaojukeji.com/ks-km/',
},
'/logi-security/api/v1': {
changeOrigin: true,
target: 'http://127.0.0.1/',
target: 'https://api-kylin-xg02.intra.xiaojukeji.com/ks-km/',
},
},
},

View File

@@ -10004,12 +10004,6 @@
}
}
},
"pubsub-js": {
"version": "1.9.4",
"resolved": "https://registry.npmmirror.com/pubsub-js/-/pubsub-js-1.9.4.tgz",
"integrity": "sha512-hJYpaDvPH4w8ZX/0Fdf9ma1AwRgU353GfbaVfPjfJQf1KxZ2iHaHl3fAUw1qlJIR5dr4F3RzjGaWohYUEyoh7A==",
"dev": true
},
"pump": {
"version": "3.0.0",
"resolved": "https://registry.npmmirror.com/pump/-/pump-3.0.0.tgz",

View File

@@ -82,7 +82,6 @@
"@types/lodash": "^4.14.171",
"@types/node": "^12.12.25",
"@types/pubsub-js": "^1.5.18",
"pubsub-js": "^1.5.18",
"@typescript-eslint/eslint-plugin": "4.13.0",
"@typescript-eslint/parser": "4.13.0",
"babel-eslint": "10.1.0",
@@ -108,7 +107,6 @@
"optimize-css-assets-webpack-plugin": "^5.0.1",
"prettier": "2.3.2",
"progress-bar-webpack-plugin": "^1.12.1",
"pubsub-js": "^1.9.4",
"query-string": "^7.0.1",
"react-refresh": "^0.10.0",
"react-router-dom": "5.2.1",

View File

@@ -94,8 +94,7 @@ const api = {
getTopicGroupPartitionsHistory: (clusterPhyId: number, groupName: string) =>
getApi(`/clusters/${clusterPhyId}/groups/${groupName}/partitions`),
resetGroupOffset: () => getApi('/group-offsets'),
getGroupOverview: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/groups-overview`),
deleteGroupOffset: () => getApi('/group-offsets'),
// topics列表
getTopicsList: (clusterPhyId: number) => getApi(`/clusters/${clusterPhyId}/topics-overview`),
getReassignmentList: () => getApi(`/reassignment/topics-overview`),
@@ -108,7 +107,6 @@ const api = {
getTopicState: (clusterPhyId: number, topicName: string) => getApi(`/clusters/${clusterPhyId}/topics/${topicName}/state`),
getTopicMetadata: (clusterPhyId: number, topicName: string) =>
getApi(`/clusters/${clusterPhyId}/topics/${topicName}/metadata-combine-exist`),
deleteTopicData: () => getApi(`/topics/truncate-topic`),
// 最新的指标值
getMetricPointsLatest: (clusterPhyId: number) => getApi(`/physical-clusters/${clusterPhyId}/latest-metrics`),

View File

@@ -20,7 +20,6 @@ import { getLicenseInfo } from './constants/common';
import api from './api';
import ClusterContainer from './pages/index';
import ksLogo from './assets/ks-logo.png';
import {ClustersPermissionMap} from "./pages/CommonConfig";
interface ILocaleMap {
[index: string]: any;
@@ -73,53 +72,12 @@ const logout = () => {
localStorage.removeItem('userInfo');
};
const LicenseLimitModal = () => {
const [visible, setVisible] = useState<boolean>(false);
const [msg, setMsg] = useState<string>('');
useLayoutEffect(() => {
licenseEventBus.on('licenseError', (desc: string) => {
!visible && setVisible(true);
setMsg(desc);
});
return () => {
licenseEventBus.removeAll('licenseError');
};
}, []);
return (
<Modal
visible={visible}
centered={true}
width={400}
zIndex={10001}
title={
<>
<IconFont type="icon-yichang" style={{ marginRight: 10, fontSize: 18 }} />
</>
}
footer={null}
onCancel={() => setVisible(false)}
>
<div style={{ margin: '0 28px', lineHeight: '24px' }}>
<div>
{msg}<a></a>
</div>
</div>
</Modal>
);
};
const AppContent = (props: { setlanguage: (language: string) => void }) => {
const { pathname } = useLocation();
const history = useHistory();
const userInfo = localStorage.getItem('userInfo');
const [curActiveAppName, setCurActiveAppName] = useState('');
const [versionInfo, setVersionInfo] = useState<VersionInfo>();
const [global] = AppContainer.useGlobalValue();
const quickEntries=[];
useEffect(() => {
if (pathname.startsWith('/config')) {
setCurActiveAppName('config');
@@ -135,23 +93,6 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => {
});
}, []);
if (global.hasPermission && global.hasPermission(ClustersPermissionMap.CLUSTERS_MANAGE_VIEW)){
quickEntries.push({
icon: <IconFont type="icon-duojiqunguanli"/>,
txt: '多集群管理',
ident: '',
active: curActiveAppName === 'cluster',
});
}
if (global.hasPermission && global.hasPermission(ClustersPermissionMap.SYS_MANAGE_VIEW)){
quickEntries.push({
icon: <IconFont type="icon-xitongguanli" />,
txt: '系统管理',
ident: 'config',
active: curActiveAppName === 'config',
});
}
return (
<DProLayout.Container
headerProps={{
@@ -162,7 +103,20 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => {
),
username: userInfo ? JSON.parse(userInfo)?.userName : '',
icon: <DotChartOutlined />,
quickEntries: quickEntries,
quickEntries: [
{
icon: <IconFont type="icon-duojiqunguanli" />,
txt: '多集群管理',
ident: '',
active: curActiveAppName === 'cluster',
},
{
icon: <IconFont type="icon-xitongguanli" />,
txt: '系统管理',
ident: 'config',
active: curActiveAppName === 'config',
},
],
isFixed: false,
userDropMenuItems: [
<Menu.Item key={0}>
@@ -186,7 +140,7 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => {
}}
onMount={(customProps: any) => {
judgePage404();
registerApps(systemsConfig, { ...customProps, getLicenseInfo, licenseEventBus }, () => {
registerApps(systemsConfig, { ...customProps }, () => {
// postMessage();
});
}}
@@ -207,7 +161,6 @@ const AppContent = (props: { setlanguage: (language: string) => void }) => {
}}
/>
</Switch>
<LicenseLimitModal />
</>
</DProLayout.Container>
);
@@ -241,7 +194,6 @@ export default function App(): JSX.Element {
<BrowserRouter basename="">
<Switch>
<Route path="/login" component={Login} />
<Route path="/no-license" exact component={NoLicense} />
<Route render={() => <AppContent setlanguage={setlanguage} />} />
</Switch>
</BrowserRouter>

View File

@@ -49,7 +49,7 @@ const ConnectDetailCard = (props: { record: any }) => {
return (
<>
{
<span style={{ fontFamily: 'HelveticaNeue-Medium', fontSize: 28, color: '#212529' }}>
<span style={{ fontFamily: 'HelveticaNeue-Medium', fontSize: 32, color: '#212529' }}>
{Utils.firstCharUppercase(type) || '-'}
</span>
}
@@ -64,7 +64,7 @@ const ConnectDetailCard = (props: { record: any }) => {
return (
<>
{
<span style={{ fontFamily: 'HelveticaNeue-Medium', fontSize: 28, color: stateEnum[state].color }}>
<span style={{ fontFamily: 'HelveticaNeue-Medium', fontSize: 32, color: stateEnum[state].color }}>
{Utils.firstCharUppercase(state) || '-'}
</span>
}

View File

@@ -33,6 +33,7 @@ interface PropsType {
};
onChange: (options: KsHeaderOptions) => void;
openMetricFilter: () => void;
setScreenType?: any;
}
interface ScopeData {
@@ -56,12 +57,29 @@ const GRID_SIZE_OPTIONS = [
},
];
// connect 筛选逻辑补充
const CONNECT_OPTIONS = [
{
label: '全部',
value: 'all',
},
{
label: 'Cluster',
value: 'Connect',
},
{
label: 'Connector',
value: 'Connector',
},
];
const MetricOperateBar = ({
nodeSelect = {},
hideNodeScope = false,
hideGridSelect = false,
onChange: onChangeCallback,
openMetricFilter,
setScreenType,
}: PropsType): JSX.Element => {
const [gridNum, setGridNum] = useState<number>(GRID_SIZE_OPTIONS[1].value);
const [rangeTime, setRangeTime] = useState<[number, number]>(() => {
@@ -139,6 +157,17 @@ const MetricOperateBar = ({
<DRangeTime timeChange={timeChange} rangeTimeArr={rangeTime} />
</div>
<div className="header-right">
{/* connect 单独逻辑 */}
{setScreenType && (
<Select
style={{ width: 120, marginRight: 10 }}
defaultValue="all"
options={CONNECT_OPTIONS}
onChange={(e) => {
setScreenType(e);
}}
/>
)}
{/* 节点范围 */}
{!hideNodeScope && (
<NodeSelect name={nodeSelect.name || ''} onChange={nodeScopeChange}>

View File

@@ -72,7 +72,7 @@ const ChartList = (props: ChartListProps) => {
const { metricName, metricType, metricUnit, metricLines, showLegend } = data;
return (
<div key={metricName} className="dashboard-drag-item-box">
<div key={metricName + metricType} className="dashboard-drag-item-box">
<div className="dashboard-drag-item-box-title">
<Tooltip
placement="topLeft"

View File

@@ -1,10 +0,0 @@
import { useCallback, useState } from 'react';
export function useForceRefresh() {
const [refreshKey, setRefresh] = useState<number>(0);
const forceRefresh: () => void = useCallback(() => {
setRefresh((x) => x + 1);
}, []);
return [refreshKey, forceRefresh];
}

View File

@@ -7,9 +7,6 @@ import { goLogin } from '@src/constants/axiosConfig';
export enum ClustersPermissionMap {
CLUSTERS_MANAGE = '多集群管理',
CLUSTERS_MANAGE_VIEW = '多集群管理查看',
//仅用作隐藏掉系统管理菜单
SYS_MANAGE = '系统管理',
SYS_MANAGE_VIEW = '系统管理查看',
// Cluster
CLUSTER_ADD = '接入集群',
CLUSTER_DEL = '删除集群',
@@ -33,9 +30,6 @@ export enum ClustersPermissionMap {
TOPIC_CANCEL_REPLICATOR = 'Topic-详情-取消Topic复制',
// Consumers
CONSUMERS_RESET_OFFSET = 'Consumers-重置Offset',
GROUP_DELETE = 'Group-删除',
GROUP_TOPIC_DELETE = 'GroupOffset-Topic纬度删除',
GROUP_PARTITION_DELETE = 'GroupOffset-Partition纬度删除',
// Test
TEST_CONSUMER = 'Test-Consumer',
TEST_PRODUCER = 'Test-Producer',
@@ -45,19 +39,6 @@ export enum ClustersPermissionMap {
MM2_DELETE = 'MM2-删除',
MM2_RESTART = 'MM2-重启',
MM2_STOP_RESUME = 'MM2-暂停&恢复',
// Connector
CONNECTOR_ADD = 'Connector-新增',
CONNECTOR_CHANGE_CONFIG = 'Connector-编辑',
CONNECTOR_DELETE = 'Connector-删除',
CONNECTOR_RESTART = 'Connector-重启',
CONNECTOR_STOP_RESUME = 'Connector-暂停&恢复',
// Security
SECURITY_ACL_ADD = 'Security-ACL新增',
SECURITY_ACL_DELETE = 'Security-ACL删除',
SECURITY_USER_ADD = 'Security-User新增',
SECURITY_USER_DELETE = 'Security-User删除',
SECURITY_USER_EDIT_PASSWORD = 'Security-User修改密码',
SECURITY_USER_VIEW_PASSWORD = 'Security-User查看密码',
}
export interface PermissionNode {
@@ -107,11 +88,6 @@ const CommonConfig = () => {
clustersPermissions &&
clustersPermissions.childList.forEach((node: PermissionNode) => node.has && userPermissions.push(node.permissionName));
// 获取用户在系统管理拥有的权限
const configPermissions = userPermissionTree.find((sys: PermissionNode) => sys.permissionName === ClustersPermissionMap.SYS_MANAGE);
configPermissions &&
configPermissions.childList.forEach((node: PermissionNode) => node.has && userPermissions.push(node.permissionName));
const hasPermission = (permissionName: ClustersPermissionMap) => permissionName && userPermissions.includes(permissionName);
setGlobal((curState: any) => ({ ...curState, permissions: allPermissions, userPermissions, hasPermission, userInfo }));

View File

@@ -189,14 +189,7 @@ const StepFormFirst = (props: SubFormProps) => {
const result: FormConnectorConfigs = {
pluginConfig: {},
};
// 获取一份默认配置
const defaultPluginConfig: any = {};
pluginConfig.configs.forEach(({ definition }) => {
// 获取一份默认配置
defaultPluginConfig[definition.name] = definition?.defaultValue;
if (!getExistFormItems(pluginType).includes(definition.name)) {
const pluginConfigs = result.pluginConfig;
const group = definition.group || 'Others';
@@ -212,7 +205,7 @@ const StepFormFirst = (props: SubFormProps) => {
Object.keys(result).length &&
form.setFieldsValue({
configs: { ...result, defaultPluginConfig, editConnectorConfig: result.connectorConfig },
configs: result,
});
})
.finally(() => props.setSubmitLoading(false));
@@ -823,8 +816,6 @@ const StepFormFifth = (props: SubFormProps) => {
<InputNumber />
) : type.toUpperCase() === 'BOOLEAN' ? (
<Switch size="small" />
) : type.toUpperCase() === 'PASSWORD' ? (
<Input.Password />
) : (
<Input />
)}
@@ -956,7 +947,7 @@ export default forwardRef(
success?: {
connectClusterId: number;
connectorName: string;
config: {
configs: {
[key: string]: any;
};
};
@@ -964,7 +955,6 @@ export default forwardRef(
}) => void
) => {
const promises: Promise<any>[] = [];
const compareConfig = stepsFormRef.current[0].getFieldValue('configs'); // 获取步骤一的form信息
Object.values(stepsFormRef.current).forEach((form, i) => {
const promise = form
.validateFields()
@@ -995,22 +985,11 @@ export default forwardRef(
const [k, ...v] = l.split('=');
result[k] = v.join('=');
});
const editConnectorConfig = operateInfo.type === 'edit' ? compareConfig.editConnectorConfig : {}; // 编辑状态时拿到config配置
const newCompareConfig = { ...compareConfig.defaultPluginConfig, ...editConnectorConfig, ...result }; // 整合后的表单提交信息
Object.keys(newCompareConfig).forEach((item) => {
if (
newCompareConfig[item] === compareConfig.defaultPluginConfig[item] ||
newCompareConfig[item]?.toString() === compareConfig.defaultPluginConfig[item]?.toString()
) {
delete newCompareConfig[item]; // 清除默认值
}
});
callback({
success: {
connectClusterId: res[0].connectClusterId,
connectorName: result['name'],
config: newCompareConfig,
configs: result,
},
});
},
@@ -1034,7 +1013,7 @@ export default forwardRef(
curClusterName = cluster.label;
}
});
(jsonRef as any)?.onOpen(operateInfo.type, curClusterName, info.success.config);
(jsonRef as any)?.onOpen(operateInfo.type, curClusterName, info.success.configs);
onClose();
}
});
@@ -1047,9 +1026,9 @@ export default forwardRef(
setCurrentStep(info.error);
} else {
setSubmitLoading(true);
Object.entries(info.success.config).forEach(([key, val]) => {
Object.entries(info.success.configs).forEach(([key, val]) => {
if (val === null) {
delete info.success.config[key];
delete info.success.configs[key];
}
});
Utils.put(api.validateConnectorConfig, info.success).then(

View File

@@ -1,7 +1,7 @@
import api from '@src/api';
import CodeMirrorFormItem from '@src/components/CodeMirrorFormItem';
import customMessage from '@src/components/Message';
import { Button, Divider, Drawer, Form, message, Space, Utils, Select } from 'knowdesign';
import { Button, Divider, Drawer, Form, message, Space, Utils } from 'knowdesign';
import React, { forwardRef, useEffect, useImperativeHandle, useState } from 'react';
import { useParams } from 'react-router-dom';
import { ConnectCluster, ConnectorPlugin, ConnectorPluginConfig, OperateInfo } from './AddConnector';
@@ -9,8 +9,9 @@ import { ConnectCluster, ConnectorPlugin, ConnectorPluginConfig, OperateInfo } f
const PLACEHOLDER = `配置格式如下
{
"name": "", // Connect Cluster 名称
"config": { // 具体配置项
"connectClusterName": "", // Connect Cluster 名称
"configs": { // 具体配置项
"name": "",
"connector.class": "",
"tasks.max": 1,
...
@@ -42,16 +43,11 @@ export default forwardRef((props: any, ref) => {
const onOpen = (type: 'create' | 'edit', connectClusterName?: string, defaultConfigs?: { [key: string]: any }) => {
if (defaultConfigs) {
setDefaultConfigs({ ...defaultConfigs, connectClusterName });
const connectorName = connectClusterName;
const connectClusterId = connectClusters.find((cluster) => cluster.label === connectClusterName).value;
form.setFieldsValue({
connectClusterId,
connectorName,
configs: JSON.stringify(
{
// connectClusterName,
name: defaultConfigs.name,
config: { ...defaultConfigs, name: undefined },
connectClusterName,
configs: defaultConfigs,
},
null,
2
@@ -67,14 +63,13 @@ export default forwardRef((props: any, ref) => {
form.validateFields().then(
(data) => {
const postData = JSON.parse(data.configs);
postData.connectorName = postData.name;
postData.connectClusterId = data.connectClusterId;
postData.config.name = postData.name;
// delete postData.connectClusterName;
delete postData.name;
Object.entries(postData.config).forEach(([key, val]) => {
postData.connectorName = postData.configs.name;
postData.connectClusterId = connectClusters.find((cluster) => cluster.label === postData.connectClusterName).value;
delete postData.connectClusterName;
Object.entries(postData.configs).forEach(([key, val]) => {
if (val === null) {
delete postData.config[key];
delete postData.configs[key];
}
});
Utils.put(api.validateConnectorConfig, postData).then(
@@ -166,26 +161,6 @@ export default forwardRef((props: any, ref) => {
}
>
<Form form={form} layout="vertical">
<Form.Item
name="connectClusterId"
label="Connect 集群"
rules={[
{
required: true,
validator(rule, value) {
if (!value) {
return Promise.reject('Connect 集群不能为空');
} else {
return Promise.resolve();
}
},
},
]}
initialValue={defaultConfigs?.connectClusterId}
className="connector-json-connectCluster"
>
<Select options={connectClusters} placeholder="请选择 Connect 集群" disabled={type === 'edit'} />
</Form.Item>
<Form.Item
name="configs"
validateTrigger="onBlur"
@@ -200,48 +175,57 @@ export default forwardRef((props: any, ref) => {
if (typeof v !== 'object') {
return Promise.reject('输入内容必须为 JSON');
}
let connectClusterId = form.getFieldValue('connectClusterId');
// 校验 connectorName 字段
if (!v.name) {
return Promise.reject('内容缺少 name 项');
let connectClusterId = -1;
// 校验 connectClusterName 字段
if (!v.connectClusterName) {
return Promise.reject('内容缺少 connectClusterName 字段或字段内容为空');
} else {
if (type === 'edit' && v.name !== defaultConfigs.name) {
return Promise.reject('编辑模式下不允许修改 name 字段');
if (type === 'edit') {
if (v.connectClusterName !== defaultConfigs.connectClusterName) {
return Promise.reject('编辑模式下不允许修改 connectClusterName 字段');
}
} else {
if (!connectClusters.length) {
getConnectClusters();
return Promise.reject('connectClusterName 列表获取失败,请重试');
}
const targetConnectCluster = connectClusters.find((cluster) => cluster.label === v.connectClusterName);
if (!targetConnectCluster) {
return Promise.reject('connectClusterName 不存在,请检查');
} else {
connectClusterId = targetConnectCluster.value;
}
}
}
if (!v.config || typeof v.config !== 'object') {
return Promise.reject('内容缺少 config 字段或字段格式错误');
if (!v.configs || typeof v.configs !== 'object') {
return Promise.reject('内容缺少 configs 字段或字段格式错误');
} else {
// // 校验 connectorName 字段
// if (!v.config.name) {
// return Promise.reject('config 字段下缺少 name 项');
// } else {
// if (type === 'edit' && v.config.name !== defaultConfigs.name) {
// return Promise.reject('编辑模式下不允许修改 name 字段');
// }
// }
if (!v.config['connector.class']) {
return Promise.reject('config 字段下缺少 connector.class 项');
} else if (type === 'edit' && v.config['connector.class'] !== defaultConfigs['connector.class']) {
// 校验 connectorName 字段
if (!v.configs.name) {
return Promise.reject('configs 字段下缺少 name 项');
} else {
if (type === 'edit' && v.configs.name !== defaultConfigs.name) {
return Promise.reject('编辑模式下不允许修改 name 字段');
}
}
if (!v.configs['connector.class']) {
return Promise.reject('configs 字段下缺少 connector.class 项');
} else if (type === 'edit' && v.configs['connector.class'] !== defaultConfigs['connector.class']) {
return Promise.reject('编辑模式下不允许修改 connector.class 字段');
}
}
if (type === 'create') {
// 校验创建时是否选择了connect集群
if (!connectClusterId) {
return Promise.reject('请先选择 Connect 集群');
}
// 异步校验 connector 名称是否重复 以及 className 是否存在
return Promise.all([
Utils.request(api.isConnectorExist(connectClusterId, v.config.name)),
Utils.request(api.isConnectorExist(connectClusterId, v.configs.name)),
Utils.request(api.getConnectorPlugins(connectClusterId)),
]).then(
([data, plugins]: [any, ConnectorPlugin[]]) => {
return data?.exist
? Promise.reject('name 与已有 Connector 重复')
: plugins.every((plugin) => plugin.className !== v.config['connector.class'])
: plugins.every((plugin) => plugin.className !== v.configs['connector.class'])
? Promise.reject('该 connectCluster 下不存在 connector.class 项配置的插件')
: Promise.resolve();
},

View File

@@ -1,9 +1,8 @@
import SmallChart from '@src/components/SmallChart';
import TagsWithHide from '@src/components/TagsWithHide';
import { Button, Tag, Tooltip, Utils, Popconfirm, AppContainer } from 'knowdesign';
import { Button, Tag, Tooltip, Utils, Popconfirm } from 'knowdesign';
import React from 'react';
import Delete from './Delete';
import { ClustersPermissionMap } from '../CommonConfig';
export const defaultPagination = {
current: 1,
pageSize: 10,
@@ -94,8 +93,7 @@ const renderLine = (record: any, metricName: string) => {
};
export const getConnectorsColumns = (arg?: any) => {
const [global] = AppContainer.useGlobalValue();
const columns: any = [
const columns = [
{
title: 'Connector Name',
dataIndex: 'connectorName',
@@ -215,10 +213,7 @@ export const getConnectorsColumns = (arg?: any) => {
return t && t.length > 0 ? <TagsWithHide placement="bottom" list={t} expandTagContent={(num: any) => `共有${num}`} /> : '-';
},
},
];
if (global.hasPermission) {
columns.push({
{
title: '操作',
dataIndex: 'options',
key: 'options',
@@ -229,24 +224,20 @@ export const getConnectorsColumns = (arg?: any) => {
render: (_t: any, r: any) => {
return (
<div>
{global.hasPermission(ClustersPermissionMap.CONNECTOR_RESTART) ? (
<Popconfirm
title="是否重启当前任务?"
onConfirm={() => arg?.optionConnect(r, 'restart')}
// onCancel={cancel}
okText=""
cancelText="否"
overlayClassName="connect-popconfirm"
>
<Button key="restart" type="link" size="small">
</Button>
</Popconfirm>
) : (
<></>
)}
<Popconfirm
title="是否重启当前任务?"
onConfirm={() => arg?.optionConnect(r, 'restart')}
// onCancel={cancel}
okText="是"
cancelText=""
overlayClassName="connect-popconfirm"
>
<Button key="restart" type="link" size="small">
</Button>
</Popconfirm>
{global.hasPermission(ClustersPermissionMap.CONNECTOR_STOP_RESUME) && (r.state === 'RUNNING' || r.state === 'PAUSED') && (
{(r.state === 'RUNNING' || r.state === 'PAUSED') && (
<Popconfirm
title={`是否${r.state === 'RUNNING' ? '暂停' : '继续'}当前任务?`}
onConfirm={() => arg?.optionConnect(r, r.state === 'RUNNING' ? 'stop' : 'resume')}
@@ -261,24 +252,16 @@ export const getConnectorsColumns = (arg?: any) => {
</Button>
</Popconfirm>
)}
{global.hasPermission(ClustersPermissionMap.CONNECTOR_CHANGE_CONFIG) ? (
<Button type="link" size="small" onClick={() => arg?.editConnector(r)}>
</Button>
) : (
<></>
)}
{global.hasPermission(ClustersPermissionMap.CONNECTOR_DELETE) ? (
<Delete record={r} onConfirm={arg?.deleteTesk}></Delete>
) : (
<></>
)}
<Button type="link" size="small" onClick={() => arg?.editConnector(r)}>
</Button>
<Delete record={r} onConfirm={arg?.deleteTesk}></Delete>
</div>
);
},
});
}
},
];
return columns;
};
@@ -315,7 +298,6 @@ export const getWorkersColumns = (arg?: any) => {
// Detail
export const getConnectorsDetailColumns = (arg?: any) => {
const [global] = AppContainer.useGlobalValue();
const columns = [
{
title: 'Task ID',
@@ -364,20 +346,16 @@ export const getConnectorsDetailColumns = (arg?: any) => {
render: (_t: any, r: any) => {
return (
<div>
{global.hasPermission(ClustersPermissionMap.CONNECTOR_RESTART) ? (
<Popconfirm
title="是否重试当前任务?"
onConfirm={() => arg?.retryOption(r.taskId)}
// onCancel={cancel}
okText=""
cancelText="否"
overlayClassName="connect-popconfirm"
>
<a></a>
</Popconfirm>
) : (
<></>
)}
<Popconfirm
title="是否重试当前任务?"
onConfirm={() => arg?.retryOption(r.taskId)}
// onCancel={cancel}
okText="是"
cancelText=""
overlayClassName="connect-popconfirm"
>
<a></a>
</Popconfirm>
</div>
);
},

View File

@@ -185,10 +185,9 @@
.operate-connector-drawer-use-json {
.CodeMirror.cm-s-default {
height: calc(100vh - 196px);
height: calc(100vh - 146px);
}
.dcloud-form-item {
margin-top: 16px;
margin-bottom: 0 !important;
}
}

View File

@@ -12,7 +12,6 @@ import notification from '@src/components/Notification';
import './index.less';
import AddConnectorUseJSON from './AddConnectorUseJSON';
import HasConnector from './HasConnector';
import { ClustersPermissionMap } from '../CommonConfig';
const { request } = Utils;
const rateMap: any = {
@@ -175,25 +174,21 @@ const Connectors: React.FC = () => {
maxLength: 128,
}}
/>
{global.hasPermission && global.hasPermission(ClustersPermissionMap.CONNECTOR_ADD) ? (
<span className="add-connect">
<Button
className="add-connect-btn"
icon={<IconFont type="icon-jiahao" />}
type="primary"
onClick={() => addConnectorRef.current?.onOpen('create', addConnectorJsonRef.current)}
>
Connector
<span className="add-connect">
<Button
className="add-connect-btn"
icon={<IconFont type="icon-jiahao" />}
type="primary"
onClick={() => addConnectorRef.current?.onOpen('create', addConnectorJsonRef.current)}
>
Connector
</Button>
<Dropdown overlayClassName="add-connect-dropdown-menu" overlay={menu}>
<Button className="add-connect-json" type="primary">
<IconFont type="icon-guanwangxiala" />
</Button>
<Dropdown overlayClassName="add-connect-dropdown-menu" overlay={menu}>
<Button className="add-connect-json" type="primary">
<IconFont type="icon-guanwangxiala" />
</Button>
</Dropdown>
</span>
) : (
<></>
)}
</Dropdown>
</span>
</div>
</div>
<ProTable

View File

@@ -1,104 +0,0 @@
import React, { useState } from 'react';
import { useParams } from 'react-router-dom';
import { Button, Form, Input, Modal, Utils } from 'knowdesign';
import notification from '@src/components/Notification';
import Api from '@src/api/index';
// eslint-disable-next-line react/display-name
export default (props: { record: any; onConfirm?: () => void }) => {
const { record, onConfirm } = props;
const routeParams = useParams<{
clusterId: string;
}>();
const [form] = Form.useForm();
const [delDialogVisible, setDelDialogVisble] = useState(false);
const handleDelOk = () => {
form.validateFields().then((e) => {
const formVal = form.getFieldsValue();
formVal.clusterPhyId = Number(routeParams.clusterId);
formVal.deleteType = 0;
Utils.delete(Api.deleteGroupOffset(), { data: formVal }).then((res: any) => {
if (res === null) {
notification.success({
message: '删除消费组成功',
});
setDelDialogVisble(false);
onConfirm && onConfirm();
} else {
notification.error({
message: '删除消费组失败',
});
}
});
});
};
return (
<>
<Button
style={{ paddingLeft: 0 }}
type="link"
onClick={(_) => {
setDelDialogVisble(true);
}}
>
</Button>
<Modal
className="custom-modal"
title="确定要删除此Topic吗"
centered={true}
visible={delDialogVisible}
wrapClassName="del-topic-modal"
destroyOnClose={true}
maskClosable={false}
onOk={handleDelOk}
onCancel={(_) => {
setDelDialogVisble(false);
}}
okText="删除"
okButtonProps={{
danger: true,
size: 'small',
style: {
paddingLeft: '16px',
paddingRight: '16px',
},
}}
cancelButtonProps={{
size: 'small',
style: {
paddingLeft: '16px',
paddingRight: '16px',
},
}}
>
{/* <div className="tip-info">
<IconFont type="icon-warning-circle"></IconFont>
<span>会删除Topic的全部消息数据和ACL权限请再次输入Topic名称进行确认</span>
</div> */}
<Form form={form} labelCol={{ span: 5 }} style={{ marginTop: 18 }}>
<Form.Item label="TopicName">{record.name}</Form.Item>
<Form.Item
name="groupName"
label="GroupName"
rules={[
// { required: true },
() => ({
validator(_, value) {
if (!value) {
return Promise.reject(new Error('请输入Group名称'));
} else if (value !== record.name) {
return Promise.reject(new Error('请输入正确的Group名称'));
}
return Promise.resolve();
},
}),
]}
>
<Input placeholder="请输入" size="small"></Input>
</Form.Item>
</Form>
</Modal>
</>
);
};

View File

@@ -1,13 +1,12 @@
import React, { useState, useEffect } from 'react';
import { useParams, useHistory } from 'react-router-dom';
import { Button, Space, Divider, Drawer, ProTable, Utils, notification } from 'knowdesign';
import { Drawer, ProTable, Utils } from 'knowdesign';
import { IconFont } from '@knowdesign/icons';
import API from '@src/api/index';
import { defaultPagination, hashDataParse } from '@src/constants/common';
import { getGtoupTopicColumns } from './config';
import { ExpandedRow } from './ExpandedRow';
import ResetOffsetDrawer from './ResetOffsetDrawer';
import { useForceRefresh } from '@src/components/utils';
const { request } = Utils;
export interface MetricLine {
@@ -64,7 +63,6 @@ const GroupDetail = (props: any) => {
const [openKeys, setOpenKeys] = useState();
const [resetOffsetVisible, setResetOffsetVisible] = useState(false);
const [resetOffsetArg, setResetOffsetArg] = useState({});
const [refreshKey, forceRefresh] = useForceRefresh();
const genData = async ({ pageNo, pageSize, groupName }: any) => {
if (urlParams?.clusterId === undefined) return;
@@ -112,23 +110,6 @@ const GroupDetail = (props: any) => {
groupName: record?.groupName,
});
};
// 删除消费组Topic
const deleteOffset = (record: any) => {
const params = {
clusterPhyId: +urlParams?.clusterId,
deleteType: 1, // 0:group纬度1Topic纬度2Partition纬度
groupName: record.groupName,
topicName: record.topicName,
};
Utils.delete(API.deleteGroupOffset(), { data: params }).then((data: any) => {
if (data === null) {
notification.success({
message: '删除Topic成功!',
});
genData({ pageNo: 1, pageSize: pagination.pageSize, groupName: hashData.groupName });
}
});
};
const onTableChange = (pagination: any, filters: any, sorter: any) => {
genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, sorter, groupName: hashData.groupName });
@@ -179,7 +160,7 @@ const GroupDetail = (props: any) => {
// // 获取Consumer列表 表格模式
// getTopicGroupMetric(hashData);
// });
}, [hashDataParse(location.hash).groupName, refreshKey]);
}, [hashDataParse(location.hash).groupName]);
return (
<Drawer
@@ -201,14 +182,6 @@ const GroupDetail = (props: any) => {
// <Divider type="vertical" />
// </Space>
// }
extra={
<Space>
<span style={{ display: 'inline-block', fontSize: '15px' }} onClick={forceRefresh as () => void}>
<i className="iconfont icon-shuaxin1" style={{ cursor: 'pointer' }} />
</span>
<Divider type="vertical" />
</Space>
}
>
<ProTable
showQueryForm={false}
@@ -216,7 +189,7 @@ const GroupDetail = (props: any) => {
showHeader: false,
rowKey: 'key',
loading: loading,
columns: getGtoupTopicColumns({ resetOffset, deleteOffset }),
columns: getGtoupTopicColumns({ resetOffset }),
dataSource: topicData,
paginationProps: { ...pagination },
// noPagination: true,
@@ -236,7 +209,6 @@ const GroupDetail = (props: any) => {
chartData={chartData}
groupName={hashDataParse(location.hash).groupName}
loading={loadingObj}
refreshKey={refreshKey}
/>
),
// expandedRowRender,
@@ -269,12 +241,7 @@ const GroupDetail = (props: any) => {
},
}}
/>
<ResetOffsetDrawer
visible={resetOffsetVisible}
setVisible={setResetOffsetVisible}
record={resetOffsetArg}
resetOffsetFn={forceRefresh}
></ResetOffsetDrawer>
<ResetOffsetDrawer visible={resetOffsetVisible} setVisible={setResetOffsetVisible} record={resetOffsetArg}></ResetOffsetDrawer>
</Drawer>
);
};

View File

@@ -41,7 +41,7 @@ const metricWithType = [
{ metricName: 'Lag', metricType: 102 },
];
export const ExpandedRow: any = ({ record, groupName, refreshKey }: any) => {
export const ExpandedRow: any = ({ record, groupName }: any) => {
const params: any = useParams<{
clusterId: string;
}>();
@@ -193,7 +193,7 @@ export const ExpandedRow: any = ({ record, groupName, refreshKey }: any) => {
endTime: timeRange[1],
topNu: 0,
};
Utils.post(API.getTopicGroupMetricHistory(clusterId), params, { timeout: 300000 }).then((data: Array<MetricData>) => {
Utils.post(API.getTopicGroupMetricHistory(clusterId), params).then((data: Array<MetricData>) => {
// ! 替换接口返回
setAllGroupMetricsData(data);
});
@@ -210,6 +210,10 @@ export const ExpandedRow: any = ({ record, groupName, refreshKey }: any) => {
getTopicGroupMetric({ pagination, sorter });
};
// useEffect(() => {
// getTopicGroupMetric();
// }, [sortObj]);
useEffect(() => {
const hashData = hashDataParse(location.hash);
// if (!hashData.groupName) return;
@@ -238,7 +242,7 @@ export const ExpandedRow: any = ({ record, groupName, refreshKey }: any) => {
// 获取Consumer列表 表格模式
getTopicGroupMetric({});
});
}, [hashDataParse(location.hash).groupName, refreshKey]);
}, [hashDataParse(location.hash).groupName]);
useEffect(() => {
if (partitionList.length === 0) return;

View File

@@ -19,19 +19,18 @@ const CustomSelectResetTime = (props: { value?: string; onChange?: (val: Number
}}
onChange={(e) => {
setTimeSetMode(e.target.value);
if (e.target.value === 'newest' || e.target.value === 'oldest') {
onChange(e.target.value);
if (e.target.value === 'newest') {
onChange('newest');
}
}}
value={timeSetMode}
>
<Radio value={'newest'}>Offset</Radio>
<Radio value={'oldest'}>Offset</Radio>
<Radio value={'custom'}></Radio>
</Radio.Group>
{timeSetMode === 'custom' && (
<DatePicker
value={moment(value === 'newest' || value === 'oldest' ? Date.now() : value)}
value={moment(value === 'newest' ? Date.now() : value)}
style={{ width: '100%' }}
showTime={true}
onChange={(v) => {
@@ -44,7 +43,7 @@ const CustomSelectResetTime = (props: { value?: string; onChange?: (val: Number
};
export default (props: any) => {
const { record, visible, setVisible, resetOffsetFn } = props;
const { record, visible, setVisible } = props;
const routeParams = useParams<{
clusterId: string;
}>();
@@ -89,7 +88,7 @@ export default (props: any) => {
topicName: record.topicName,
};
if (formData.resetType === 'assignedTime') {
resetParams.resetType = formData.timestamp === 'newest' ? 0 : formData.timestamp === 'oldest' ? 1 : 2;
resetParams.resetType = formData.timestamp === 'newest' ? 0 : 2;
if (resetParams.resetType === 2) {
resetParams.timestamp = formData.timestamp;
}
@@ -106,8 +105,6 @@ export default (props: any) => {
message: '重置offset成功',
});
setVisible(false);
// 发布重置offset成功的消息
resetOffsetFn();
} else {
notification.error({
message: '重置offset失败',

View File

@@ -1,9 +1,8 @@
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
import React from 'react';
import { AppContainer, Button, Popconfirm } from 'knowdesign';
import { AppContainer } from 'knowdesign';
import TagsWithHide from '@src/components/TagsWithHide';
import { ClustersPermissionMap } from '../CommonConfig';
import Delete from './Delete';
export const runningStatusEnum: any = {
1: 'Doing',
@@ -22,8 +21,7 @@ export const defaultPagination = {
};
export const getGroupColumns = (arg?: any) => {
const [global] = AppContainer.useGlobalValue();
const columns: any = [
const columns = [
{
title: 'ConsumerGroup',
dataIndex: 'name',
@@ -65,23 +63,6 @@ export const getGroupColumns = (arg?: any) => {
render: (t: number) => (t ? t.toLocaleString() : '-'),
},
];
if (global.hasPermission && global.hasPermission(ClustersPermissionMap.GROUP_DELETE)) {
columns.push({
title: '操作',
dataIndex: 'options',
key: 'options',
width: 200,
filterTitle: true,
fixed: 'right',
render: (_t: any, r: any) => {
return (
<div>
<Delete record={r} onConfirm={arg?.deleteTesk}></Delete>
</div>
);
},
});
}
return columns;
};
@@ -117,33 +98,16 @@ export const getGtoupTopicColumns = (arg?: any) => {
render: (t: number) => (t ? t.toLocaleString() : '-'),
},
];
if (global.hasPermission) {
if (global.hasPermission && global.hasPermission(ClustersPermissionMap.CONSUMERS_RESET_OFFSET)) {
columns.push({
title: '操作',
dataIndex: 'desc',
key: 'desc',
width: 200,
width: 150,
render: (value: any, record: any) => {
return (
<div>
{global.hasPermission(ClustersPermissionMap.CONSUMERS_RESET_OFFSET) ? (
<a onClick={() => arg.resetOffset(record)}>Offset</a>
) : (
<></>
)}
{global.hasPermission(ClustersPermissionMap.GROUP_TOPIC_DELETE) ? (
<Popconfirm
placement="top"
title={`是否要删除当前Topic`}
onConfirm={() => arg.deleteOffset(record)}
okText="是"
cancelText="否"
>
<Button type="link"></Button>
</Popconfirm>
) : (
<></>
)}
<a onClick={() => arg.resetOffset(record)}>Offset</a>
</div>
);
},

View File

@@ -58,11 +58,6 @@ const BrokerList: React.FC = (props: any) => {
genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, sorter });
};
// 删除Group
const deleteTesk = () => {
genData({ pageNo: 1, pageSize: pagination.pageSize });
};
useEffect(() => {
genData({
pageNo: 1,
@@ -120,7 +115,7 @@ const BrokerList: React.FC = (props: any) => {
showHeader: false,
rowKey: 'group_list',
loading: loading,
columns: getGroupColumns(deleteTesk),
columns: getGroupColumns(),
dataSource: data,
paginationProps: { ...pagination },
attrs: {

View File

@@ -22,19 +22,18 @@ const CustomSelectResetTime = (props: { value?: string; onChange?: (val: number
}}
onChange={(e) => {
setTimeSetMode(e.target.value);
if (e.target.value === 'newest' || e.target.value === 'oldest') {
onChange(e.target.value);
if (e.target.value === 'newest') {
onChange('newest');
}
}}
value={timeSetMode}
>
<Radio value={'newest'}>Offset</Radio>
<Radio value={'oldest'}>Offset</Radio>
<Radio value={'custom'}></Radio>
</Radio.Group>
{timeSetMode === 'custom' && (
<DatePicker
value={moment(value === 'newest' || value === 'oldest' ? Date.now() : value)}
value={moment(value === 'newest' ? Date.now() : value)}
style={{ width: '100%' }}
showTime={true}
onChange={(v) => {
@@ -92,7 +91,7 @@ export default (props: any) => {
topicName: record.topicName,
};
if (formData.resetType === 'assignedTime') {
resetParams.resetType = formData.timestamp === 'newest' ? 0 : formData.timestamp === 'oldest' ? 1 : 2;
resetParams.resetType = formData.timestamp === 'newest' ? 0 : 2;
if (resetParams.resetType === 2) {
resetParams.timestamp = formData.timestamp;
}

View File

@@ -13,7 +13,7 @@ const carouselList = [
<img className="carousel-eg-ctr-two-img img-one" src={egTwoContent} />
<div className="carousel-eg-ctr-two-desc desc-one">
<span>Github: </span>
<span>6.8K</span>
<span>5.8K</span>
<span>+ Star的的实时流处理平台</span>
</div>
<div className="carousel-eg-ctr-two-desc desc-two">

View File

@@ -185,7 +185,7 @@
.operate-connector-drawer-use-json {
.CodeMirror.cm-s-default {
height: calc(100vh - 196px);
height: calc(100vh - 146px);
}
.dcloud-form-item {
margin-bottom: 0 !important;

View File

@@ -522,22 +522,28 @@ const ConnectorForm = (props: {
const params = {
...values,
id: initFieldsValue?.id,
jmxProperties: values.jmxProperties ? `{ "jmxPort": "${values.jmxProperties}" }` : undefined,
jmxProperties: values.jmxProperties ? `{ "jmxProperties": "${values.jmxProperties}" }` : undefined,
};
Utils.put(api.batchConnectClusters, [params]).then((res) => {
// setSelectedTabKey(undefined);
getConnectClustersList();
notification.success({
message: '修改Connect集群成功',
Utils.put(api.batchConnectClusters, [params])
.then((res) => {
// setSelectedTabKey(undefined);
getConnectClustersList();
notification.success({
message: '修改Connect集群成功',
});
})
.catch((error) => {
notification.success({
message: '修改Connect集群失败',
});
});
});
};
const onCancel = () => {
setSelectedTabKey(undefined);
try {
const jmxPortInfo = JSON.parse(initFieldsValue.jmxProperties) || {};
form.setFieldsValue({ ...initFieldsValue, jmxProperties: jmxPortInfo.jmxPort });
form.setFieldsValue({ ...initFieldsValue, jmxProperties: jmxPortInfo.jmxProperties });
} catch {
form.setFieldsValue({ ...initFieldsValue });
}
@@ -546,7 +552,7 @@ const ConnectorForm = (props: {
useLayoutEffect(() => {
try {
const jmxPortInfo = JSON.parse(initFieldsValue.jmxProperties) || {};
form.setFieldsValue({ ...initFieldsValue, jmxProperties: jmxPortInfo.jmxPort });
form.setFieldsValue({ ...initFieldsValue, jmxProperties: jmxPortInfo.jmxProperties });
} catch {
form.setFieldsValue({ ...initFieldsValue });
}

View File

@@ -252,6 +252,7 @@ const ClusterList = (props: { searchParams: SearchParams; showAccessCluster: any
const {
Brokers: brokers,
Zookeepers: zks,
// ConnectionsCount: connect,
HealthCheckPassed: healthCheckPassed,
HealthCheckTotal: healthCheckTotal,
HealthState: healthState,
@@ -352,6 +353,18 @@ const ClusterList = (props: { searchParams: SearchParams; showAccessCluster: any
<div className="indicator-left-item-value">{zookeepersAvailable === -1 ? '-' : zks}</div>
</div>
)}
{/* <div className="indicator-left-item">
<div className="indicator-left-item-title">
<span
className="indicator-left-item-title-dot"
style={{
background: itemData.latestMetrics?.metrics?.BrokersNotAlive ? '#FF7066' : '#34C38F',
}}
></span>
Connect
</div>
<div className="indicator-left-item-value">{connect}</div>
</div> */}
</div>
<div className="indicator-right">
{metricPoints.map((row, index) => {

Some files were not shown because too many files have changed in this diff Show More