diff --git a/README.md b/README.md
index aaa7e1d8..77729c25 100644
--- a/README.md
+++ b/README.md
@@ -67,11 +67,16 @@
- [滴滴Logi-KafkaManager 系列视频教程](https://mp.weixin.qq.com/s/9X7gH0tptHPtfjPPSdGO8g)
- [kafka实践(十五):滴滴开源Kafka管控平台 Logi-KafkaManager研究--A叶子叶来](https://blog.csdn.net/yezonggang/article/details/113106244)
-## 3 滴滴Logi开源用户钉钉交流群
+## 3 滴滴Logi开源用户交流群
+
+
+
+微信加群:关注公众号 Obsuite(官方公众号),回复"Logi加群"

- 钉钉群ID:32821440
-
+钉钉群ID:32821440
+
+
## 4 OCE认证
OCE是一个认证机制和交流平台,为滴滴Logi-KafkaManager生产用户量身打造,我们会为OCE企业提供更好的技术支持,比如专属的技术沙龙、企业一对一的交流机会、专属的答疑群等,如果贵司Logi-KafkaManager上了生产,[快来加入吧](http://obsuite.didiyun.com/open/openAuth)
diff --git a/Releases_Notes.md b/Releases_Notes.md
new file mode 100644
index 00000000..46b5753e
--- /dev/null
+++ b/Releases_Notes.md
@@ -0,0 +1,97 @@
+
+---
+
+
+
+**一站式`Apache Kafka`集群指标监控与运维管控平台**
+
+---
+
+## v2.3.0
+
+版本上线时间:2021-02-08
+
+
+### 能力提升
+
+- 新增支持docker化部署
+- 可指定Broker作为候选controller
+- 可新增并管理网关配置
+- 可获取消费组状态
+- 增加集群的JMX认证
+
+### 体验优化
+
+- 优化编辑用户角色、修改密码的流程
+- 新增consumerID的搜索功能
+- 优化“Topic连接信息”、“消费组重置消费偏移”、“修改Topic保存时间”的文案提示
+- 在相应位置增加《资源申请文档》链接
+
+### bug修复
+
+- 修复Broker监控图表时间轴展示错误的问题
+- 修复创建夜莺监控告警规则时,使用的告警周期的单位不正确的问题
+
+
+
+## v2.2.0
+
+版本上线时间:2021-01-25
+
+
+
+### 能力提升
+
+- 优化工单批量操作流程
+- 增加获取Topic75分位/99分位的实时耗时数据
+- 增加定时任务,可将无主未落DB的Topic定期写入DB
+
+### 体验优化
+
+- 在相应位置增加《集群接入文档》链接
+- 优化物理集群、逻辑集群含义
+- 在Topic详情页、Topic扩分区操作弹窗增加展示Topic所属Region的信息
+- 优化Topic审批时,Topic数据保存时间的配置流程
+- 优化Topic/应用申请、审批时的错误提示文案
+- 优化Topic数据采样的操作项文案
+- 优化运维人员删除Topic时的提示文案
+- 优化运维人员删除Region的删除逻辑与提示文案
+- 优化运维人员删除逻辑集群的提示文案
+- 优化上传集群配置文件时的文件类型限制条件
+
+### bug修复
+
+- 修复填写应用名称时校验特殊字符出错的问题
+- 修复普通用户越权访问应用详情的问题
+- 修复由于Kafka版本升级,导致的数据压缩格式无法获取的问题
+- 修复删除逻辑集群或Topic之后,界面依旧展示的问题
+- 修复进行Leader rebalance操作时执行结果重复提示的问题
+
+
+## v2.1.0
+
+版本上线时间:2020-12-19
+
+
+
+### 体验优化
+
+- 优化页面加载时的背景样式
+- 优化普通用户申请Topic权限的流程
+- 优化Topic申请配额、申请分区的权限限制
+- 优化取消Topic权限的文案提示
+- 优化申请配额表单的表单项名称
+- 优化重置消费偏移的操作流程
+- 优化创建Topic迁移任务的表单内容
+- 优化Topic扩分区操作的弹窗界面样式
+- 优化集群Broker监控可视化图表样式
+- 优化创建逻辑集群的表单内容
+- 优化集群安全协议的提示文案
+
+### bug修复
+
+- 修复偶发性重置消费偏移失败的问题
+
+
+
+
diff --git a/build.sh b/build.sh
index 03b1087e..a6e86d7d 100644
--- a/build.sh
+++ b/build.sh
@@ -4,7 +4,7 @@ cd $workspace
## constant
OUTPUT_DIR=./output
-KM_VERSION=2.2.0
+KM_VERSION=2.3.1
APP_NAME=kafka-manager
APP_DIR=${APP_NAME}-${KM_VERSION}
diff --git a/container/dockerfiles/Dockerfile b/container/dockerfiles/Dockerfile
index d8a3d158..1ffe27e4 100644
--- a/container/dockerfiles/Dockerfile
+++ b/container/dockerfiles/Dockerfile
@@ -1,14 +1,11 @@
-FROM openjdk:8-jdk-alpine3.9
+FROM openjdk:16-jdk-alpine3.13
LABEL author="yangvipguang"
-ENV VERSION 2.1.0
-ENV JAR_PATH kafka-manager-web/target
-COPY $JAR_PATH/kafka-manager-web-$VERSION-SNAPSHOT.jar /tmp/app.jar
-COPY $JAR_PATH/application.yml /km/
+ENV VERSION 2.3.1
+# $JAR_PATH is referenced by the COPY below; value assumed unchanged from the previous Dockerfile
+ENV JAR_PATH kafka-manager-web/target
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
-RUN apk add --no-cache --virtual .build-deps \
+RUN apk add --no-cache --virtual .build-deps \
font-adobe-100dpi \
ttf-dejavu \
fontconfig \
@@ -19,26 +16,28 @@ RUN apk add --no-cache --virtual .build-deps \
tomcat-native \
&& apk del .build-deps
+RUN apk add --no-cache tini
+
+
+
+
ENV AGENT_HOME /opt/agent/
WORKDIR /tmp
+
+COPY $JAR_PATH/kafka-manager.jar app.jar
+# COPY application.yml application.yml ##默认使用helm 挂载,防止敏感配置泄露
+
COPY docker-depends/config.yaml $AGENT_HOME
-COPY docker-depends/jmx_prometheus_javaagent-0.14.0.jar $AGENT_HOME
-
-ENV JAVA_AGENT="-javaagent:$AGENT_HOME/jmx_prometheus_javaagent-0.14.0.jar=9999:$AGENT_HOME/config.yaml"
+COPY docker-depends/jmx_prometheus_javaagent-0.15.0.jar $AGENT_HOME
+ENV JAVA_AGENT="-javaagent:$AGENT_HOME/jmx_prometheus_javaagent-0.15.0.jar=9999:$AGENT_HOME/config.yaml"
ENV JAVA_HEAP_OPTS="-Xms1024M -Xmx1024M -Xmn100M "
-
ENV JAVA_OPTS="-verbose:gc \
- -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintHeapAtGC -Xloggc:/tmp/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps \
-XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
-XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"
-#-Xlog:gc -Xlog:gc* -Xlog:gc+heap=trace -Xlog:safepoint
-
EXPOSE 8080 9999
-ENTRYPOINT ["sh","-c","java -jar $JAVA_HEAP_OPTS $JAVA_OPTS /tmp/app.jar --spring.config.location=/km/application.yml"]
-
-## 默认不带Prometheus JMX监控,需要可以自行取消以下注释并注释上面一行默认Entrypoint 命令。
-## ENTRYPOINT ["sh","-c","java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS /tmp/app.jar --spring.config.location=/km/application.yml"]
+ENTRYPOINT ["tini", "--"]
+CMD ["sh","-c","java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
diff --git a/container/dockerfiles/docker-depends/jmx_prometheus_javaagent-0.14.0.jar b/container/dockerfiles/docker-depends/jmx_prometheus_javaagent-0.14.0.jar
new file mode 100644
index 00000000..54b633bf
Binary files /dev/null and b/container/dockerfiles/docker-depends/jmx_prometheus_javaagent-0.14.0.jar differ
diff --git a/container/dockerfiles/docker-depends/jmx_prometheus_javaagent-0.15.0.jar b/container/dockerfiles/docker-depends/jmx_prometheus_javaagent-0.15.0.jar
new file mode 100644
index 00000000..d896a217
Binary files /dev/null and b/container/dockerfiles/docker-depends/jmx_prometheus_javaagent-0.15.0.jar differ
diff --git a/docs/dev_guide/dynamic_config_manager.md b/docs/dev_guide/dynamic_config_manager.md
index 4e5d6528..0965cb48 100644
--- a/docs/dev_guide/dynamic_config_manager.md
+++ b/docs/dev_guide/dynamic_config_manager.md
@@ -9,6 +9,13 @@
# 动态配置管理
+## 0、目录
+
+- 1、Topic定时同步任务
+- 2、专家服务——Topic分区热点
+- 3、专家服务——Topic分区不足
+
+
## 1、Topic定时同步任务
### 1.1、配置的用途
@@ -63,3 +70,53 @@ task:
]
```
+---
+
+## 2、专家服务——Topic分区热点
+
+在`Region`所圈定的Broker范围内,如果某个Topic的Leader数在这些Broker上分布不均衡,我们就认为该Topic存在热点。
+
+备注:单纯查看Leader数的分布确实存在一定的局限性,欢迎向代码中贡献更多的热点定义。
+
+
+Topic分区热点相关的动态配置(页面在运维管控->平台管理->配置管理):
+
+配置Key:
+```
+REGION_HOT_TOPIC_CONFIG
+```
+
+配置Value:
+```json
+{
+ "maxDisPartitionNum": 2, # Region内Broker间的leader数差距超过2时,则认为是存在热点的Topic
+ "minTopicBytesInUnitB": 1048576, # 流量低于该值的Topic不做统计
+ "ignoreClusterIdList": [ # 忽略的集群
+ 50
+ ]
+}
+```
+
+---
+
+## 3、专家服务——Topic分区不足
+
+Topic总流量除以分区数(即单分区流量)超过指定值时,我们认为该Topic存在分区不足。
+
+Topic分区不足相关的动态配置(页面在运维管控->平台管理->配置管理):
+
+配置Key:
+```
+TOPIC_INSUFFICIENT_PARTITION_CONFIG
+```
+
+配置Value:
+```json
+{
+ "maxBytesInPerPartitionUnitB": 3145728, # 单分区流量超过该值, 则认为分区不去
+ "minTopicBytesInUnitB": 1048576, # 流量低于该值的Topic不做统计
+ "ignoreClusterIdList": [ # 忽略的集群
+ 50
+ ]
+}
+```
\ No newline at end of file
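For reference, a minimal sketch of how these documented config values could be deserialized with fastjson, which the codebase already uses; the `HotTopicConfig` holder class below simply mirrors the keys listed above and is illustrative, not the project's actual config class.

```java
import com.alibaba.fastjson.JSON;
import java.util.List;

// Illustrative holder mirroring the documented REGION_HOT_TOPIC_CONFIG keys.
class HotTopicConfig {
    private Integer maxDisPartitionNum;
    private Long minTopicBytesInUnitB;
    private List<Long> ignoreClusterIdList;

    public Integer getMaxDisPartitionNum() { return maxDisPartitionNum; }
    public void setMaxDisPartitionNum(Integer v) { this.maxDisPartitionNum = v; }
    public Long getMinTopicBytesInUnitB() { return minTopicBytesInUnitB; }
    public void setMinTopicBytesInUnitB(Long v) { this.minTopicBytesInUnitB = v; }
    public List<Long> getIgnoreClusterIdList() { return ignoreClusterIdList; }
    public void setIgnoreClusterIdList(List<Long> v) { this.ignoreClusterIdList = v; }
}

public class HotTopicConfigExample {
    public static void main(String[] args) {
        // The value stored under REGION_HOT_TOPIC_CONFIG, without the inline # annotations.
        String value = "{\"maxDisPartitionNum\":2,\"minTopicBytesInUnitB\":1048576,\"ignoreClusterIdList\":[50]}";
        HotTopicConfig config = JSON.parseObject(value, HotTopicConfig.class);
        System.out.println(config.getMaxDisPartitionNum()); // 2
    }
}
```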
diff --git a/docs/dev_guide/gateway_config_manager.md b/docs/dev_guide/gateway_config_manager.md
new file mode 100644
index 00000000..8c656531
--- /dev/null
+++ b/docs/dev_guide/gateway_config_manager.md
@@ -0,0 +1,10 @@
+
+---
+
+
+
+**一站式`Apache Kafka`集群指标监控与运维管控平台**
+
+---
+
+# Kafka-Gateway 配置说明
\ No newline at end of file
diff --git a/docs/dev_guide/upgrade_manual/logi-km-v2.3.0.md b/docs/dev_guide/upgrade_manual/logi-km-v2.3.0.md
new file mode 100644
index 00000000..3a4196f8
--- /dev/null
+++ b/docs/dev_guide/upgrade_manual/logi-km-v2.3.0.md
@@ -0,0 +1,17 @@
+
+---
+
+
+
+**一站式`Apache Kafka`集群指标监控与运维管控平台**
+
+---
+
+# 升级至`2.3.0`版本
+
+`2.3.0`版本在`gateway_config`表增加了一个描述说明的字段,因此需要执行下面的sql进行字段的增加。
+
+```sql
+ALTER TABLE `gateway_config`
+ADD COLUMN `description` TEXT NULL COMMENT '描述信息' AFTER `version`;
+```
diff --git a/docs/install_guide/create_mysql_table.sql b/docs/install_guide/create_mysql_table.sql
index 2a015de1..065532eb 100644
--- a/docs/install_guide/create_mysql_table.sql
+++ b/docs/install_guide/create_mysql_table.sql
@@ -203,7 +203,8 @@ CREATE TABLE `gateway_config` (
`type` varchar(128) NOT NULL DEFAULT '' COMMENT '配置类型',
`name` varchar(128) NOT NULL DEFAULT '' COMMENT '配置名称',
`value` text COMMENT '配置值',
- `version` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '版本信息',
+ `version` bigint(20) unsigned NOT NULL DEFAULT '1' COMMENT '版本信息',
+ `description` text COMMENT '描述信息',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
diff --git a/docs/install_guide/install_guide_nginx_cn.md b/docs/install_guide/install_guide_nginx_cn.md
new file mode 100644
index 00000000..ad55f947
--- /dev/null
+++ b/docs/install_guide/install_guide_nginx_cn.md
@@ -0,0 +1,94 @@
+---
+
+
+
+**一站式`Apache Kafka`集群指标监控与运维管控平台**
+
+---
+
+## nginx配置-安装手册
+
+# 一、独立部署
+
+请参考:[kafka-manager 安装手册](install_guide_cn.md)
+
+# 二、nginx配置
+
+## 1、独立部署配置
+
+```
+ #nginx 根目录访问配置如下
+ location / {
+ proxy_pass http://ip:port;
+ }
+```
+
+## 2、前后端分离&配置多个静态资源
+
+以下配置用于`nginx代理多个静态资源`的场景,实现项目前后端分离、版本独立更新迭代。
+
+### 1、源码下载
+
+根据所需版本下载对应代码,下载地址:[Github 下载地址](https://github.com/didi/Logi-KafkaManager)
+
+### 2、修改webpack.config.js 配置文件
+
+修改`kafka-manager-console`模块 `webpack.config.js`
+以下所有xxxx为nginx代理路径和打包静态文件加载前缀,xxxx可根据需求自行更改。
+
+```
+ cd kafka-manager-console
+ vi webpack.config.js
+
+ # publicPath默认以根目录方式打包,此处修改为nginx代理访问路径。
+ let publicPath = '/xxxx';
+```
+
+### 3、打包
+
+```
+
+ npm cache clean --force && npm install
+
+```
+
+ps:如果打包过程中报错,运行`npm install clipboard@2.0.6`后重试;未报错请忽略。
+
+### 4、部署
+
+#### 1、前端静态文件部署
+
+静态资源 `../kafka-manager-web/src/main/resources/templates`
+
+上传到指定目录,本文以`root目录`为例
+
+#### 2、上传jar包并启动,请参考:[kafka-manager 安装手册](install_guide_cn.md)
+
+#### 3、修改nginx 配置
+
+```
+ location /xxxx {
+ # 静态文件存放位置
+ alias /root/templates;
+ try_files $uri $uri/ /xxxx/index.html;
+ index index.html;
+ }
+
+ location /api {
+ proxy_pass http://ip:port;
+ }
+ #后端接口建议使用/api前缀,如果冲突可以使用以下配置
+ #location /api/v2 {
+ # proxy_pass http://ip:port;
+ #}
+ #location /api/v1 {
+ # proxy_pass http://ip:port;
+ #}
+```
+
+
+
+
+
+
+
diff --git a/docs/user_guide/faq.md b/docs/user_guide/faq.md
index 8ab9781a..c0c6b1a3 100644
--- a/docs/user_guide/faq.md
+++ b/docs/user_guide/faq.md
@@ -7,9 +7,9 @@
---
-# FAQ
+# FAQ
-- 0、Github图裂问题解决
+- 0、支持哪些Kafka版本?
- 1、Topic申请、新建监控告警等操作时没有可选择的集群?
- 2、逻辑集群 & Region的用途?
- 3、登录失败?
@@ -18,22 +18,16 @@
- 6、如何使用`MySQL 8`?
- 7、`Jmx`连接失败如何解决?
- 8、`topic biz data not exist`错误及处理方式
+- 9、进程启动后,如何查看API文档
+- 10、如何创建告警组?
+- 11、连接信息、耗时信息为什么没有数据?
+- 12、逻辑集群申请审批通过之后为什么看不到逻辑集群?
---
-### 0、Github图裂问题解决
+### 0、支持哪些Kafka版本?
-可以在本地机器`ping github.com`这个地址,获取到`github.com`地址的IP地址。
-
-然后将IP绑定到`/etc/hosts`文件中。
-
-例如
-
-```shell
-# 在 /etc/hosts文件中增加如下信息
-
-140.82.113.3 github.com
-```
+只要所使用的Kafka版本还依赖于Zookeeper,那么其主要功能基本上就是支持的。
---
@@ -43,7 +37,7 @@
逻辑集群的创建参看:
-- [kafka-manager 接入集群](docs/user_guide/add_cluster/add_cluster.md) 手册,这里的Region和逻辑集群都必须添加。
+- [kafka-manager 接入集群](add_cluster/add_cluster.md) 手册,这里的Region和逻辑集群都必须添加。
---
@@ -76,7 +70,7 @@
- 3、数据库时区问题。
-检查MySQL的topic表,查看是否有数据,如果有数据,那么再检查设置的时区是否正确。
+检查MySQL的topic_metrics表,查看是否有数据,如果有数据,那么再检查设置的时区是否正确。
---
@@ -109,3 +103,26 @@
可以在`运维管控->集群列表->Topic信息`下面,编辑申请权限的Topic,为Topic选择一个应用即可。
以上仅仅只是针对单个Topic的场景,如果你有非常多的Topic需要进行初始化的,那么此时可以在配置管理中增加一个配置,来定时的对无主的Topic进行同步,具体见:[动态配置管理 - 1、Topic定时同步任务](../dev_guide/dynamic_config_manager.md)
+
+---
+
+### 9、进程启动后,如何查看API文档
+
+- 滴滴Logi-KafkaManager采用Swagger-API工具记录API文档。Swagger-API地址: [http://IP:PORT/swagger-ui.html#/](http://IP:PORT/swagger-ui.html#/)
+
+
+### 10、如何创建告警组?
+
+这块需要配合监控系统进行使用,现在默认已经实现了夜莺的对接,当然也可以对接自己内部的监控系统,不过需要实现一些接口。
+
+具体的文档可见:[监控功能对接夜莺](../dev_guide/monitor_system_integrate_with_n9e.md)、[监控功能对接其他系统](../dev_guide/monitor_system_integrate_with_self.md)
+
+### 11、连接信息、耗时信息为什么没有数据?
+
+这块需要结合滴滴内部的kafka-gateway一同使用才会有数据,滴滴kafka-gateway暂未开源。
+
+### 12、逻辑集群申请审批通过之后为什么看不到逻辑集群?
+
+逻辑集群的申请与审批仅仅是一个工单流程,并不会实际创建逻辑集群,逻辑集群还需要手动创建。
+
+具体的操作可见:[kafka-manager 接入集群](add_cluster/add_cluster.md)。
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/AccountRoleEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/AccountRoleEnum.java
index 9c3cc06c..55412490 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/AccountRoleEnum.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/AccountRoleEnum.java
@@ -47,4 +47,13 @@ public enum AccountRoleEnum {
}
return AccountRoleEnum.UNKNOWN;
}
+
+ public static AccountRoleEnum getUserRoleEnum(String roleName) {
+ for (AccountRoleEnum elem: AccountRoleEnum.values()) {
+ if (elem.message.equalsIgnoreCase(roleName)) {
+ return elem;
+ }
+ }
+ return AccountRoleEnum.UNKNOWN;
+ }
}
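For illustration, a hedged sketch of calling the new lookup helper; the role-name strings passed in are placeholders, since the enum members' actual `message` values are not shown in this diff.

```java
import com.xiaojukeji.kafka.manager.common.bizenum.AccountRoleEnum;

public class AccountRoleLookupExample {
    public static void main(String[] args) {
        // Matching is against each member's message field, case-insensitively.
        AccountRoleEnum matched = AccountRoleEnum.getUserRoleEnum("admin"); // placeholder role name
        // Any name that matches no member falls back to UNKNOWN.
        AccountRoleEnum fallback = AccountRoleEnum.getUserRoleEnum("no-such-role");
        System.out.println(matched + " / " + fallback);
    }
}
```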
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/SinkMonitorSystemEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/SinkMonitorSystemEnum.java
deleted file mode 100644
index b843a90c..00000000
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/SinkMonitorSystemEnum.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.xiaojukeji.kafka.manager.common.bizenum;
-
-/**
- * 是否上报监控系统
- * @author zengqiao
- * @date 20/9/25
- */
-public enum SinkMonitorSystemEnum {
- SINK_MONITOR_SYSTEM(0, "上报监控系统"),
- NOT_SINK_MONITOR_SYSTEM(1, "不上报监控系统"),
- ;
-
- private Integer code;
-
- private String message;
-
- SinkMonitorSystemEnum(Integer code, String message) {
- this.code = code;
- this.message = message;
- }
-
- public Integer getCode() {
- return code;
- }
-
- public void setCode(Integer code) {
- this.code = code;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- @Override
- public String toString() {
- return "SinkMonitorSystemEnum{" +
- "code=" + code +
- ", message='" + message + '\'' +
- '}';
- }
-}
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java
new file mode 100644
index 00000000..bac44235
--- /dev/null
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java
@@ -0,0 +1,32 @@
+package com.xiaojukeji.kafka.manager.common.bizenum;
+
+/**
+ * 过期Topic状态
+ * @author zengqiao
+ * @date 21/01/25
+ */
+public enum TopicExpiredStatusEnum {
+ ALREADY_NOTIFIED_AND_DELETED(-2, "已通知, 已下线"),
+ ALREADY_NOTIFIED_AND_CAN_DELETE(-1, "已通知, 可下线"),
+ ALREADY_EXPIRED_AND_WAIT_NOTIFY(0, "已过期, 待通知"),
+ ALREADY_NOTIFIED_AND_WAIT_RESPONSE(1, "已通知, 待反馈"),
+
+ ;
+
+ private int status;
+
+ private String message;
+
+ TopicExpiredStatusEnum(int status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
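A small illustrative helper (not part of the patch) showing how the status codes above might be resolved back to their messages, for example when rendering `ExpiredTopicVO.status`.

```java
import com.xiaojukeji.kafka.manager.common.bizenum.TopicExpiredStatusEnum;

public class ExpiredStatusExample {
    // Hypothetical helper: map a raw status code to the enum's message.
    static String describe(int status) {
        for (TopicExpiredStatusEnum elem : TopicExpiredStatusEnum.values()) {
            if (elem.getStatus() == status) {
                return elem.getMessage();
            }
        }
        return "unknown status: " + status;
    }

    public static void main(String[] args) {
        System.out.println(describe(-1)); // 已通知, 可下线
        System.out.println(describe(0));  // 已过期, 待通知
    }
}
```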
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java
index 3690514f..b90918eb 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java
@@ -7,18 +7,18 @@ package com.xiaojukeji.kafka.manager.common.constant;
*/
public class ApiPrefix {
public static final String API_PREFIX = "/api/";
- public static final String API_V1_PREFIX = API_PREFIX + "v1/";
- public static final String API_V2_PREFIX = API_PREFIX + "v2/";
+ private static final String API_V1_PREFIX = API_PREFIX + "v1/";
+
+ // login
+ public static final String API_V1_SSO_PREFIX = API_V1_PREFIX + "sso/";
// console
- public static final String API_V1_SSO_PREFIX = API_V1_PREFIX + "sso/";
public static final String API_V1_NORMAL_PREFIX = API_V1_PREFIX + "normal/";
public static final String API_V1_RD_PREFIX = API_V1_PREFIX + "rd/";
public static final String API_V1_OP_PREFIX = API_V1_PREFIX + "op/";
// open
public static final String API_V1_THIRD_PART_PREFIX = API_V1_PREFIX + "third-part/";
- public static final String API_V2_THIRD_PART_PREFIX = API_V2_PREFIX + "third-part/";
// gateway
public static final String GATEWAY_API_V1_PREFIX = "/gateway" + API_V1_PREFIX;
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java
index 0fb38302..323e9ec9 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java
@@ -97,7 +97,7 @@ public class Result implements Serializable {
return result;
}
- public static Result buildFailure(String message) {
+ public static Result buildGatewayFailure(String message) {
Result result = new Result();
result.setCode(ResultStatus.GATEWAY_INVALID_REQUEST.getCode());
result.setMessage(message);
@@ -105,6 +105,14 @@ public class Result implements Serializable {
return result;
}
+ public static Result buildFailure(String message) {
+ Result result = new Result();
+ result.setCode(ResultStatus.FAIL.getCode());
+ result.setMessage(message);
+ result.setData(null);
+ return result;
+ }
+
public static Result buildFrom(ResultStatus resultStatus) {
Result result = new Result();
result.setCode(resultStatus.getCode());
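A brief sketch of the distinction introduced above, assuming the usual `getCode()` accessor on `Result`: the renamed `buildGatewayFailure` keeps the gateway-specific error code, while the new `buildFailure` carries the general `FAIL` code added to `ResultStatus` in this change.

```java
import com.xiaojukeji.kafka.manager.common.entity.Result;

public class ResultFailureExample {
    public static void main(String[] args) {
        // General-purpose failure: code comes from ResultStatus.FAIL.
        Result generic = Result.buildFailure("invalid parameter"); // placeholder message
        // Gateway request failure: code comes from ResultStatus.GATEWAY_INVALID_REQUEST.
        Result gateway = Result.buildGatewayFailure("unknown gateway request"); // placeholder message
        System.out.println(generic.getCode() + " / " + gateway.getCode()); // assumes getCode() accessor
    }
}
```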
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java
index 76e3aca8..0f8aebd6 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java
@@ -12,6 +12,8 @@ public enum ResultStatus {
SUCCESS(Constant.SUCCESS, "success"),
+ FAIL(1, "操作失败"),
+
/**
* 操作错误[1000, 2000)
* ------------------------------------------------------------------------------------------
@@ -23,6 +25,9 @@ public enum ResultStatus {
CHANGE_ZOOKEEPER_FORBIDDEN(1405, "change zookeeper forbidden"),
+ APP_OFFLINE_FORBIDDEN(1406, "先下线topic,才能下线应用~"),
+
+
TOPIC_OPERATION_PARAM_NULL_POINTER(1450, "参数错误"),
TOPIC_OPERATION_PARTITION_NUM_ILLEGAL(1451, "分区数错误"),
TOPIC_OPERATION_BROKER_NUM_NOT_ENOUGH(1452, "Broker数不足错误"),
@@ -91,6 +96,8 @@ public enum ResultStatus {
ZOOKEEPER_CONNECT_FAILED(8020, "zookeeper connect failed"),
ZOOKEEPER_READ_FAILED(8021, "zookeeper read failed"),
+ ZOOKEEPER_WRITE_FAILED(8022, "zookeeper write failed"),
+ ZOOKEEPER_DELETE_FAILED(8023, "zookeeper delete failed"),
// 调用集群任务里面的agent失败
CALL_CLUSTER_TASK_AGENT_FAILED(8030, " call cluster task agent failed"),
@@ -102,6 +109,7 @@ public enum ResultStatus {
STORAGE_UPLOAD_FILE_FAILED(8050, "upload file failed"),
STORAGE_FILE_TYPE_NOT_SUPPORT(8051, "File type not support"),
STORAGE_DOWNLOAD_FILE_FAILED(8052, "download file failed"),
+ LDAP_AUTHENTICATION_FAILED(8053, "ldap authentication failed"),
;
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java
index 937d9cf8..2e903485 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/ClusterDetailDTO.java
@@ -23,6 +23,8 @@ public class ClusterDetailDTO {
private String securityProperties;
+ private String jmxProperties;
+
private Integer status;
private Date gmtCreate;
@@ -103,6 +105,14 @@ public class ClusterDetailDTO {
this.securityProperties = securityProperties;
}
+ public String getJmxProperties() {
+ return jmxProperties;
+ }
+
+ public void setJmxProperties(String jmxProperties) {
+ this.jmxProperties = jmxProperties;
+ }
+
public Integer getStatus() {
return status;
}
@@ -176,8 +186,9 @@ public class ClusterDetailDTO {
", bootstrapServers='" + bootstrapServers + '\'' +
", kafkaVersion='" + kafkaVersion + '\'' +
", idc='" + idc + '\'' +
- ", mode='" + mode + '\'' +
+ ", mode=" + mode +
", securityProperties='" + securityProperties + '\'' +
+ ", jmxProperties='" + jmxProperties + '\'' +
", status=" + status +
", gmtCreate=" + gmtCreate +
", gmtModify=" + gmtModify +
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/SinkTopicRequestTimeMetricsConfig.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/SinkTopicRequestTimeMetricsConfig.java
deleted file mode 100644
index 91faaba1..00000000
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/config/SinkTopicRequestTimeMetricsConfig.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.xiaojukeji.kafka.manager.common.entity.ao.config;
-
-/**
- * @author zengqiao
- * @date 20/9/7
- */
-public class SinkTopicRequestTimeMetricsConfig {
- private Long clusterId;
-
- private String topicName;
-
- private Long startId;
-
- private Long step;
-
- public Long getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(Long clusterId) {
- this.clusterId = clusterId;
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
- public Long getStartId() {
- return startId;
- }
-
- public void setStartId(Long startId) {
- this.startId = startId;
- }
-
- public Long getStep() {
- return step;
- }
-
- public void setStep(Long step) {
- this.step = step;
- }
-
- @Override
- public String toString() {
- return "SinkTopicRequestTimeMetricsConfig{" +
- "clusterId=" + clusterId +
- ", topicName='" + topicName + '\'' +
- ", startId=" + startId +
- ", step=" + step +
- '}';
- }
-}
\ No newline at end of file
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/ControllerPreferredCandidateDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/ControllerPreferredCandidateDTO.java
new file mode 100644
index 00000000..1b4c95b9
--- /dev/null
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/ControllerPreferredCandidateDTO.java
@@ -0,0 +1,45 @@
+package com.xiaojukeji.kafka.manager.common.entity.dto.op;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.List;
+
+/**
+ * @author zengqiao
+ * @date 21/01/24
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@ApiModel(description="优选为Controller的候选者")
+public class ControllerPreferredCandidateDTO {
+ @ApiModelProperty(value="集群ID")
+ private Long clusterId;
+
+ @ApiModelProperty(value="优选为controller的BrokerId")
+ private List<Integer> brokerIdList;
+
+ public Long getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(Long clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public List<Integer> getBrokerIdList() {
+ return brokerIdList;
+ }
+
+ public void setBrokerIdList(List<Integer> brokerIdList) {
+ this.brokerIdList = brokerIdList;
+ }
+
+ @Override
+ public String toString() {
+ return "ControllerPreferredCandidateDTO{" +
+ "clusterId=" + clusterId +
+ ", brokerIdList=" + brokerIdList +
+ '}';
+ }
+}
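For reference, a sketch of the request body this DTO would bind from, serialized here with fastjson purely for illustration; the values are placeholders.

```java
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.ControllerPreferredCandidateDTO;

import java.util.Arrays;

public class CandidateDtoExample {
    public static void main(String[] args) {
        ControllerPreferredCandidateDTO dto = new ControllerPreferredCandidateDTO();
        dto.setClusterId(1L);                     // placeholder cluster id
        dto.setBrokerIdList(Arrays.asList(1, 2)); // brokers to prefer as controller
        // Roughly the JSON an op API accepting this DTO would expect,
        // e.g. {"brokerIdList":[1,2],"clusterId":1}
        System.out.println(JSON.toJSONString(dto));
    }
}
```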
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/TopicCreationDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/TopicCreationDTO.java
index 66c26c5b..b92ef7c1 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/TopicCreationDTO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/TopicCreationDTO.java
@@ -40,6 +40,9 @@ public class TopicCreationDTO extends ClusterTopicDTO {
@ApiModelProperty(value = "Topic属性列表")
private Properties properties;
+ @ApiModelProperty(value = "最大写入字节数")
+ private Long peakBytesIn;
+
public String getAppId() {
return appId;
}
@@ -104,6 +107,14 @@ public class TopicCreationDTO extends ClusterTopicDTO {
this.properties = properties;
}
+ public Long getPeakBytesIn() {
+ return peakBytesIn;
+ }
+
+ public void setPeakBytesIn(Long peakBytesIn) {
+ this.peakBytesIn = peakBytesIn;
+ }
+
@Override
public String toString() {
return "TopicCreationDTO{" +
@@ -135,4 +146,4 @@ public class TopicCreationDTO extends ClusterTopicDTO {
}
return true;
}
-}
\ No newline at end of file
+}
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java
index 0b6fcebb..7afc09c6 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/ClusterDTO.java
@@ -102,12 +102,11 @@ public class ClusterDTO {
'}';
}
- public Boolean legal() {
+ public boolean legal() {
if (ValidateUtils.isNull(clusterName)
|| ValidateUtils.isNull(zookeeper)
|| ValidateUtils.isNull(idc)
- || ValidateUtils.isNull(bootstrapServers)
- ) {
+ || ValidateUtils.isNull(bootstrapServers)) {
return false;
}
return true;
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java
index 04ee265d..5ebebc75 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java
@@ -1,6 +1,7 @@
package com.xiaojukeji.kafka.manager.common.entity.pojo;
import java.util.Date;
+import java.util.Objects;
/**
* @author zengqiao
@@ -116,4 +117,22 @@ public class ClusterDO implements Comparable {
public int compareTo(ClusterDO clusterDO) {
return this.id.compareTo(clusterDO.id);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusterDO clusterDO = (ClusterDO) o;
+ return Objects.equals(id, clusterDO.id)
+ && Objects.equals(clusterName, clusterDO.clusterName)
+ && Objects.equals(zookeeper, clusterDO.zookeeper)
+ && Objects.equals(bootstrapServers, clusterDO.bootstrapServers)
+ && Objects.equals(securityProperties, clusterDO.securityProperties)
+ && Objects.equals(jmxProperties, clusterDO.jmxProperties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, clusterName, zookeeper, bootstrapServers, securityProperties, jmxProperties);
+ }
}
\ No newline at end of file
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/TopicDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/TopicDO.java
index b4b56712..ecb97e47 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/TopicDO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/TopicDO.java
@@ -1,6 +1,7 @@
package com.xiaojukeji.kafka.manager.common.entity.pojo;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicCreationDTO;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import java.util.Date;
@@ -95,6 +96,7 @@ public class TopicDO {
topicDO.setClusterId(dto.getClusterId());
topicDO.setTopicName(dto.getTopicName());
topicDO.setDescription(dto.getDescription());
+ topicDO.setPeakBytesIn(ValidateUtils.isNull(dto.getPeakBytesIn()) ? -1L : dto.getPeakBytesIn());
return topicDO;
}
-}
\ No newline at end of file
+}
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java
index c0e96000..fa29c7cf 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/GatewayConfigDO.java
@@ -17,6 +17,8 @@ public class GatewayConfigDO {
private Long version;
+ private String description;
+
private Date createTime;
private Date modifyTime;
@@ -61,6 +63,14 @@ public class GatewayConfigDO {
this.version = version;
}
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
public Date getCreateTime() {
return createTime;
}
@@ -85,6 +95,7 @@ public class GatewayConfigDO {
", name='" + name + '\'' +
", value='" + value + '\'' +
", version=" + version +
+ ", description='" + description + '\'' +
", createTime=" + createTime +
", modifyTime=" + modifyTime +
'}';
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java
index 46c7a3a2..c4921259 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/op/expert/ExpiredTopicVO.java
@@ -28,7 +28,7 @@ public class ExpiredTopicVO {
@ApiModelProperty(value = "负责人")
private String principals;
- @ApiModelProperty(value = "状态, -1:可下线, 0:过期待通知, 1+:已通知待反馈")
+ @ApiModelProperty(value = "状态, -1:已通知可下线, 0:过期待通知, 1+:已通知待反馈")
private Integer status;
public Long getClusterId() {
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java
index a0b402af..72314c31 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/GatewayConfigVO.java
@@ -26,6 +26,9 @@ public class GatewayConfigVO {
@ApiModelProperty(value="版本")
private Long version;
+ @ApiModelProperty(value="描述说明")
+ private String description;
+
@ApiModelProperty(value="创建时间")
private Date createTime;
@@ -72,6 +75,14 @@ public class GatewayConfigVO {
this.version = version;
}
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
public Date getCreateTime() {
return createTime;
}
@@ -96,6 +107,7 @@ public class GatewayConfigVO {
", name='" + name + '\'' +
", value='" + value + '\'' +
", version=" + version +
+ ", description='" + description + '\'' +
", createTime=" + createTime +
", modifyTime=" + modifyTime +
'}';
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java
index 46d177ad..283d59c5 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java
@@ -60,6 +60,13 @@ public class JsonUtils {
return JSON.parseObject(src, clazz);
}
+ public static <T> List<T> stringToArrObj(String src, Class<T> clazz) {
+ if (ValidateUtils.isBlank(src)) {
+ return null;
+ }
+ return JSON.parseArray(src, clazz);
+ }
+
public static List parseTopicConnections(Long clusterId, JSONObject jsonObject, long postTime) {
List connectionDOList = new ArrayList<>();
for (String clientType: jsonObject.keySet()) {
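A short usage sketch for the new `stringToArrObj` helper shown above; note that blank input yields `null` rather than an empty list.

```java
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;

import java.util.List;

public class JsonUtilsExample {
    public static void main(String[] args) {
        // Blank input is short-circuited to null.
        List<Integer> blank = JsonUtils.stringToArrObj("", Integer.class);
        System.out.println(blank); // null

        // Otherwise the string is parsed as a JSON array of the given element type.
        List<Integer> brokerIds = JsonUtils.stringToArrObj("[1,2,3]", Integer.class);
        System.out.println(brokerIds); // [1, 2, 3]
    }
}
```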
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java
index fc70c6b2..c7c69ca3 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java
@@ -79,7 +79,7 @@ public class JmxConnectorWrap {
try {
Map environment = new HashMap();
if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getPassword())) {
- environment.put(javax.management.remote.JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword()));
+ environment.put(JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword()));
}
if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) {
environment.put(Context.SECURITY_PROTOCOL, "ssl");
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java
index e0a5632a..6705f435 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java
@@ -33,7 +33,9 @@ public class ZkPathUtil {
private static final String D_METRICS_CONFIG_ROOT_NODE = CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + "KafkaExMetrics";
- public static final String D_CONTROLLER_CANDIDATES = CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + "extension/candidates";
+ public static final String D_CONFIG_EXTENSION_ROOT_NODE = CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + "extension";
+
+ public static final String D_CONTROLLER_CANDIDATES = D_CONFIG_EXTENSION_ROOT_NODE + ZOOKEEPER_SEPARATOR + "candidates";
public static String getBrokerIdNodePath(Integer brokerId) {
return BROKER_IDS_ROOT + ZOOKEEPER_SEPARATOR + String.valueOf(brokerId);
@@ -111,6 +113,10 @@ public class ZkPathUtil {
}
public static String getKafkaExtraMetricsPath(Integer brokerId) {
- return D_METRICS_CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + String.valueOf(brokerId);
+ return D_METRICS_CONFIG_ROOT_NODE + ZOOKEEPER_SEPARATOR + brokerId;
+ }
+
+ public static String getControllerCandidatePath(Integer brokerId) {
+ return D_CONTROLLER_CANDIDATES + ZOOKEEPER_SEPARATOR + brokerId;
}
}
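A quick sketch of the paths produced by the refactored constant and the new helper; the concrete prefix depends on `CONFIG_ROOT_NODE`, which is not shown in this hunk.

```java
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;

public class ControllerCandidatePathExample {
    public static void main(String[] args) {
        // Parent node holding all candidate brokers, e.g. <config-root>/extension/candidates
        System.out.println(ZkPathUtil.D_CONTROLLER_CANDIDATES);
        // Per-broker child node, e.g. <config-root>/extension/candidates/1
        System.out.println(ZkPathUtil.getControllerCandidatePath(1));
    }
}
```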
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java
index 51c4b06b..3c179b4f 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java
@@ -1,8 +1,5 @@
package com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.List;
/**
@@ -18,12 +15,11 @@ import java.util.List;
* "host":null,
* "timestamp":"1546632983233",
* "port":-1,
- * "version":4
+ * "version":4,
+ * "rack": "CY"
* }
*/
public class BrokerMetadata implements Cloneable {
- private final static Logger LOGGER = LoggerFactory.getLogger(TopicMetadata.class);
-
private long clusterId;
private int brokerId;
@@ -43,6 +39,8 @@ public class BrokerMetadata implements Cloneable {
private long timestamp;
+ private String rack;
+
public long getClusterId() {
return clusterId;
}
@@ -107,14 +105,12 @@ public class BrokerMetadata implements Cloneable {
this.timestamp = timestamp;
}
- @Override
- public Object clone() {
- try {
- return super.clone();
- } catch (CloneNotSupportedException var3) {
- LOGGER.error("clone BrokerMetadata failed.", var3);
- }
- return null;
+ public String getRack() {
+ return rack;
+ }
+
+ public void setRack(String rack) {
+ this.rack = rack;
}
@Override
@@ -128,6 +124,7 @@ public class BrokerMetadata implements Cloneable {
", jmxPort=" + jmx_port +
", version='" + version + '\'' +
", timestamp=" + timestamp +
+ ", rack='" + rack + '\'' +
'}';
}
}
diff --git a/kafka-manager-console/package.json b/kafka-manager-console/package.json
index f06c4120..920fa613 100644
--- a/kafka-manager-console/package.json
+++ b/kafka-manager-console/package.json
@@ -1,6 +1,6 @@
{
- "name": "mobx-ts-example",
- "version": "1.0.0",
+ "name": "logi-kafka",
+ "version": "2.3.1",
"description": "",
"scripts": {
"start": "webpack-dev-server",
@@ -21,7 +21,7 @@
"@types/spark-md5": "^3.0.2",
"antd": "^3.26.15",
"clean-webpack-plugin": "^3.0.0",
- "clipboard": "^2.0.6",
+ "clipboard": "2.0.6",
"cross-env": "^7.0.2",
"css-loader": "^2.1.0",
"echarts": "^4.5.0",
@@ -56,4 +56,4 @@
"dependencies": {
"format-to-json": "^1.0.4"
}
-}
+}
\ No newline at end of file
diff --git a/kafka-manager-console/src/container/search-filter.tsx b/kafka-manager-console/src/container/search-filter.tsx
index f6ed09fa..6c199ab3 100644
--- a/kafka-manager-console/src/container/search-filter.tsx
+++ b/kafka-manager-console/src/container/search-filter.tsx
@@ -126,7 +126,7 @@ export class SearchAndFilterContainer extends React.Component
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
index ce0753e4..921b13ba 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
@@ -1,8 +1,8 @@
package com.xiaojukeji.kafka.manager.service.cache;
import com.alibaba.fastjson.JSONObject;
-import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
import kafka.admin.AdminClient;
import org.apache.commons.pool2.impl.GenericObjectPool;
@@ -103,6 +103,21 @@ public class KafkaClientPool {
}
}
+ public static void closeKafkaConsumerPool(Long clusterId) {
+ lock.lock();
+ try {
+ GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
+ if (objectPool == null) {
+ return;
+ }
+ objectPool.close();
+ } catch (Exception e) {
+ LOGGER.error("close kafka consumer pool failed, clusterId:{}.", clusterId, e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
if (ValidateUtils.isNull(clusterDO)) {
return null;
@@ -132,7 +147,11 @@ public class KafkaClientPool {
if (ValidateUtils.isNull(objectPool)) {
return;
}
- objectPool.returnObject(kafkaConsumer);
+ try {
+ objectPool.returnObject(kafkaConsumer);
+ } catch (Exception e) {
+ LOGGER.error("return kafka consumer client failed, clusterId:{}", physicalClusterId, e);
+ }
}
public static AdminClient getAdminClient(Long clusterId) {
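A minimal sketch of when the new `closeKafkaConsumerPool` might be invoked; the trigger described in the comment is an assumption, and the method itself is a no-op if no pool was ever created for the cluster.

```java
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;

public class ConsumerPoolCleanupExample {
    public static void main(String[] args) {
        Long clusterId = 1L; // placeholder cluster id

        // e.g. after a cluster is deleted or its bootstrap servers change,
        // drop the pooled consumers so stale connections are not reused.
        KafkaClientPool.closeKafkaConsumerPool(clusterId);
    }
}
```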
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
index 59453919..631b254f 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
@@ -4,24 +4,23 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
-import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
-import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData;
-import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
-import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
-import com.xiaojukeji.kafka.manager.dao.ControllerDao;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
-import com.xiaojukeji.kafka.manager.dao.TopicDao;
-import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
-import com.xiaojukeji.kafka.manager.service.service.JmxService;
-import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
-import com.xiaojukeji.kafka.manager.service.zookeeper.*;
-import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
+import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData;
+import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
+import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
+import com.xiaojukeji.kafka.manager.dao.ControllerDao;
+import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+import com.xiaojukeji.kafka.manager.service.service.JmxService;
+import com.xiaojukeji.kafka.manager.service.zookeeper.BrokerStateListener;
+import com.xiaojukeji.kafka.manager.service.zookeeper.ControllerStateListener;
+import com.xiaojukeji.kafka.manager.service.zookeeper.TopicStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -49,15 +48,6 @@ public class PhysicalClusterMetadataManager {
@Autowired
private ClusterService clusterService;
- @Autowired
- private ConfigUtils configUtils;
-
- @Autowired
- private TopicDao topicDao;
-
- @Autowired
- private AuthorityDao authorityDao;
-
private final static Map CLUSTER_MAP = new ConcurrentHashMap<>();
private final static Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
@@ -133,7 +123,7 @@ public class PhysicalClusterMetadataManager {
zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener);
//增加Topic监控
- TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, topicDao, authorityDao);
+ TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig);
topicListener.init();
zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener);
@@ -172,8 +162,12 @@ public class PhysicalClusterMetadataManager {
CLUSTER_MAP.remove(clusterId);
}
- public Set<Long> getClusterIdSet() {
- return CLUSTER_MAP.keySet();
+ public static Map<Long, ClusterDO> getClusterMap() {
+ return CLUSTER_MAP;
+ }
+
+ public static void updateClusterMap(ClusterDO clusterDO) {
+ CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
}
public static ClusterDO getClusterFromCache(Long clusterId) {
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java
index b2c5f7b2..35c4be8d 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java
@@ -51,4 +51,20 @@ public interface ClusterService {
* @return void
*/
Result<List<Integer>> getControllerPreferredCandidates(Long clusterId);
+
+ /**
+ * 增加优先被选举为controller的broker
+ * @param clusterId 集群ID
+ * @param brokerIdList brokerId列表
+ * @return
+ */
+ Result addControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList);
+
+ /**
+ * 减少优先被选举为controller的broker
+ * @param clusterId 集群ID
+ * @param brokerIdList brokerId列表
+ * @return
+ */
+ Result deleteControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList);
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/RegionService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/RegionService.java
index 8ab072fe..40c92a5c 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/RegionService.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/RegionService.java
@@ -1,7 +1,6 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
-import com.xiaojukeji.kafka.manager.common.entity.dto.rd.RegionDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import java.util.List;
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java
index 5a1fc11b..cfa2920f 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java
@@ -22,6 +22,8 @@ import java.util.Map;
public interface TopicManagerService {
List<TopicDO> listAll();
+ List<TopicDO> getByClusterIdFromCache(Long clusterId);
+
List<TopicDO> getByClusterId(Long clusterId);
TopicDO getByTopicName(Long clusterId, String topicName);
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java
index d24b2d24..d52d3bc7 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ZookeeperService.java
@@ -26,4 +26,20 @@ public interface ZookeeperService {
* @return 操作结果
*/
Result<List<Integer>> getControllerPreferredCandidates(Long clusterId);
+
+ /**
+ * 增加优先被选举为controller的broker
+ * @param clusterId 集群ID
+ * @param brokerId brokerId
+ * @return
+ */
+ Result addControllerPreferredCandidate(Long clusterId, Integer brokerId);
+
+ /**
+ * 减少优先被选举为controller的broker
+ * @param clusterId 集群ID
+ * @param brokerId brokerId
+ * @return
+ */
+ Result deleteControllerPreferredCandidate(Long clusterId, Integer brokerId);
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AuthorityServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AuthorityServiceImpl.java
index 4f804107..f5fad493 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AuthorityServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/AuthorityServiceImpl.java
@@ -1,16 +1,17 @@
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
-import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.OperationStatusEnum;
+import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
+import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
+import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.KafkaAclDO;
-import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
-import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
-import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
import com.xiaojukeji.kafka.manager.dao.gateway.KafkaAclDao;
import com.xiaojukeji.kafka.manager.service.service.OperateRecordService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
@@ -20,10 +21,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.stream.Collectors;
/**
* @author zhongyuankai
@@ -120,7 +119,7 @@ public class AuthorityServiceImpl implements AuthorityService {
operateRecordDO.setModuleId(ModuleEnum.AUTHORITY.getCode());
operateRecordDO.setOperateId(OperateEnum.DELETE.getCode());
operateRecordDO.setResource(topicName);
- operateRecordDO.setContent(JSONObject.toJSONString(content));
+ operateRecordDO.setContent(JsonUtils.toJSONString(content));
operateRecordDO.setOperator(operator);
operateRecordService.insert(operateRecordDO);
} catch (Exception e) {
@@ -150,7 +149,7 @@ public class AuthorityServiceImpl implements AuthorityService {
} catch (Exception e) {
LOGGER.error("get authority failed, clusterId:{} topicName:{}.", clusterId, topicName, e);
}
- return null;
+ return Collections.emptyList();
}
@Override
@@ -164,7 +163,11 @@ public class AuthorityServiceImpl implements AuthorityService {
if (ValidateUtils.isEmptyList(doList)) {
return new ArrayList<>();
}
- return doList;
+
+ // 去除掉权限列表中无权限的数据
+ return doList.stream()
+ .filter(authorityDO -> !TopicAuthorityEnum.DENY.getCode().equals(authorityDO.getAccess()))
+ .collect(Collectors.toList());
}
@Override
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java
index fce7b605..18ee0a0d 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java
@@ -221,13 +221,24 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
if (ValidateUtils.isNull(oldGatewayConfigDO)) {
return Result.buildFrom(ResultStatus.RESOURCE_NOT_EXIST);
}
+
if (!oldGatewayConfigDO.getName().equals(newGatewayConfigDO.getName())
|| !oldGatewayConfigDO.getType().equals(newGatewayConfigDO.getType())
|| ValidateUtils.isBlank(newGatewayConfigDO.getValue())) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
- newGatewayConfigDO.setVersion(oldGatewayConfigDO.getVersion() + 1);
- if (gatewayConfigDao.updateById(oldGatewayConfigDO) > 0) {
+
+ // 获取当前同类配置, 插入之后需要增大这个version
+ List<GatewayConfigDO> gatewayConfigDOList = gatewayConfigDao.getByConfigType(newGatewayConfigDO.getType());
+ Long version = 1L;
+ for (GatewayConfigDO elem: gatewayConfigDOList) {
+ if (elem.getVersion() > version) {
+ version = elem.getVersion() + 1L;
+ }
+ }
+
+ newGatewayConfigDO.setVersion(version);
+ if (gatewayConfigDao.updateById(newGatewayConfigDO) > 0) {
return Result.buildSuc();
}
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
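The version bump above effectively resolves to max(existing versions) + 1; a standalone sketch of that loop with hypothetical versions:

```java
import java.util.Arrays;
import java.util.List;

public class GatewayVersionBumpExample {
    public static void main(String[] args) {
        // Hypothetical versions of the same-type gateway configs already in the table.
        List<Long> existingVersions = Arrays.asList(3L, 5L, 2L);

        // Mirrors the loop above: start at 1 and bump past any larger existing version.
        Long version = 1L;
        for (Long existing : existingVersions) {
            if (existing > version) {
                version = existing + 1L;
            }
        }
        System.out.println(version); // 6
    }
}
```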
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
index b49e41a3..26d7ef4d 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
@@ -340,10 +340,6 @@ public class AdminServiceImpl implements AdminService {
@Override
public ResultStatus modifyTopicConfig(ClusterDO clusterDO, String topicName, Properties properties, String operator) {
ResultStatus rs = TopicCommands.modifyTopicConfig(clusterDO, topicName, properties);
- if (!ResultStatus.SUCCESS.equals(rs)) {
- return rs;
- }
-
return rs;
}
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
index 609c8cf9..b505bad0 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
@@ -111,12 +111,13 @@ public class ClusterServiceImpl implements ClusterService {
// 不允许修改zk地址
return ResultStatus.CHANGE_ZOOKEEPER_FORBIDDEN;
}
- clusterDO.setStatus(originClusterDO.getStatus());
Map content = new HashMap<>();
content.put("cluster id", clusterDO.getId().toString());
content.put("security properties", clusterDO.getSecurityProperties());
content.put("jmx properties", clusterDO.getJmxProperties());
operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.EDIT, content);
+
+ clusterDO.setStatus(originClusterDO.getStatus());
return updateById(clusterDO);
}
@@ -204,21 +205,31 @@ public class ClusterServiceImpl implements ClusterService {
}
private boolean isZookeeperLegal(String zookeeper) {
+ boolean status = false;
+
ZooKeeper zk = null;
try {
zk = new ZooKeeper(zookeeper, 1000, null);
- } catch (Throwable t) {
- return false;
+ for (int i = 0; i < 15; ++i) {
+ if (zk.getState().isConnected()) {
+ // 只有状态是connected的时候,才表示地址是合法的
+ status = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ LOGGER.error("class=ClusterServiceImpl||method=isZookeeperLegal||zookeeper={}||msg=zk address illegal||errMsg={}", zookeeper, e.getMessage());
} finally {
try {
if (zk != null) {
zk.close();
}
- } catch (Throwable t) {
- return false;
+ } catch (Exception e) {
+ LOGGER.error("class=ClusterServiceImpl||method=isZookeeperLegal||zookeeper={}||msg=close zk client failed||errMsg={}", zookeeper, e.getMessage());
}
}
- return true;
+ return status;
}
@Override
@@ -275,7 +286,7 @@ public class ClusterServiceImpl implements ClusterService {
try {
Map content = new HashMap<>();
content.put("cluster id", clusterId.toString());
- operateRecordService.insert(operator, ModuleEnum.CLUSTER, getClusterName(clusterId).getPhysicalClusterName(), OperateEnum.DELETE, content);
+ operateRecordService.insert(operator, ModuleEnum.CLUSTER, String.valueOf(clusterId), OperateEnum.DELETE, content);
if (clusterDao.deleteById(clusterId) <= 0) {
LOGGER.error("delete cluster failed, clusterId:{}.", clusterId);
return ResultStatus.MYSQL_ERROR;
@@ -289,8 +300,9 @@ public class ClusterServiceImpl implements ClusterService {
private ClusterDetailDTO getClusterDetailDTO(ClusterDO clusterDO, Boolean needDetail) {
if (ValidateUtils.isNull(clusterDO)) {
- return null;
+ return new ClusterDetailDTO();
}
+
ClusterDetailDTO dto = new ClusterDetailDTO();
dto.setClusterId(clusterDO.getId());
dto.setClusterName(clusterDO.getClusterName());
@@ -299,6 +311,7 @@ public class ClusterServiceImpl implements ClusterService {
dto.setKafkaVersion(physicalClusterMetadataManager.getKafkaVersionFromCache(clusterDO.getId()));
dto.setIdc(configUtils.getIdc());
dto.setSecurityProperties(clusterDO.getSecurityProperties());
+ dto.setJmxProperties(clusterDO.getJmxProperties());
dto.setStatus(clusterDO.getStatus());
dto.setGmtCreate(clusterDO.getGmtCreate());
dto.setGmtModify(clusterDO.getGmtModify());
@@ -337,4 +350,39 @@ public class ClusterServiceImpl implements ClusterService {
}
return Result.buildSuc(controllerPreferredCandidateList);
}
+
+ @Override
+ public Result addControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList) {
+ if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(brokerIdList)) {
+ return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
+ }
+
+ // 增加的BrokerId需要判断是否存活
+ for (Integer brokerId: brokerIdList) {
+ if (!PhysicalClusterMetadataManager.isBrokerAlive(clusterId, brokerId)) {
+ return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST);
+ }
+
+ Result result = zookeeperService.addControllerPreferredCandidate(clusterId, brokerId);
+ if (result.failed()) {
+ return result;
+ }
+ }
+ return Result.buildSuc();
+ }
+
+ @Override
+ public Result deleteControllerPreferredCandidates(Long clusterId, List<Integer> brokerIdList) {
+ if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(brokerIdList)) {
+ return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
+ }
+
+ for (Integer brokerId: brokerIdList) {
+ Result result = zookeeperService.deleteControllerPreferredCandidate(clusterId, brokerId);
+ if (result.failed()) {
+ return result;
+ }
+ }
+ return Result.buildSuc();
+ }
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java
index e228d36c..913316ef 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java
@@ -8,7 +8,6 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumeDetailDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroup;
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupSummary;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
-import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
import com.xiaojukeji.kafka.manager.common.exception.ConfigException;
@@ -129,7 +128,7 @@ public class ConsumerServiceImpl implements ConsumerService {
}
summary.setState(consumerGroupSummary.state());
- java.util.Iterator> it = JavaConversions.asJavaIterator(consumerGroupSummary.consumers().iterator());
+ Iterator> it = JavaConversions.asJavaIterator(consumerGroupSummary.consumers().iterator());
while (it.hasNext()) {
List consumerSummaryList = JavaConversions.asJavaList(it.next());
for (AdminClient.ConsumerSummary consumerSummary: consumerSummaryList) {
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
index 6ee9a499..1d761eb8 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
@@ -95,6 +95,14 @@ public class TopicManagerServiceImpl implements TopicManagerService {
return new ArrayList<>();
}
+ @Override
+ public List<TopicDO> getByClusterIdFromCache(Long clusterId) {
+ if (clusterId == null) {
+ return new ArrayList<>();
+ }
+ return topicDao.getByClusterIdFromCache(clusterId);
+ }
+
@Override
public List getByClusterId(Long clusterId) {
if (clusterId == null) {
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
index 5dea0561..63191888 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
@@ -381,7 +381,7 @@ public class TopicServiceImpl implements TopicService {
return new ArrayList<>();
}
- List<TopicDO> topicDOList = topicManagerService.getByClusterId(clusterId);
+ List<TopicDO> topicDOList = topicManagerService.getByClusterIdFromCache(clusterId);
if (ValidateUtils.isNull(topicDOList)) {
topicDOList = new ArrayList<>();
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java
index aa31ed33..c4c89513 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ZookeeperServiceImpl.java
@@ -70,4 +70,58 @@ public class ZookeeperServiceImpl implements ZookeeperService {
}
return Result.buildFrom(ResultStatus.ZOOKEEPER_READ_FAILED);
}
+
+ @Override
+ public Result addControllerPreferredCandidate(Long clusterId, Integer brokerId) {
+ if (ValidateUtils.isNull(clusterId)) {
+ return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
+ }
+ ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterId);
+ if (ValidateUtils.isNull(zkConfig)) {
+ return Result.buildFrom(ResultStatus.ZOOKEEPER_CONNECT_FAILED);
+ }
+
+ try {
+ if (zkConfig.checkPathExists(ZkPathUtil.getControllerCandidatePath(brokerId))) {
+ // 节点已经存在, 则直接忽略
+ return Result.buildSuc();
+ }
+
+ if (!zkConfig.checkPathExists(ZkPathUtil.D_CONFIG_EXTENSION_ROOT_NODE)) {
+ zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.D_CONFIG_EXTENSION_ROOT_NODE, "");
+ }
+
+ if (!zkConfig.checkPathExists(ZkPathUtil.D_CONTROLLER_CANDIDATES)) {
+ zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.D_CONTROLLER_CANDIDATES, "");
+ }
+
+ zkConfig.setOrCreatePersistentNodeStat(ZkPathUtil.getControllerCandidatePath(brokerId), "");
+ return Result.buildSuc();
+ } catch (Exception e) {
+ LOGGER.error("class=ZookeeperServiceImpl||method=addControllerPreferredCandidate||clusterId={}||brokerId={}||errMsg={}||", clusterId, brokerId, e.getMessage());
+ }
+ return Result.buildFrom(ResultStatus.ZOOKEEPER_WRITE_FAILED);
+ }
+
+ @Override
+ public Result deleteControllerPreferredCandidate(Long clusterId, Integer brokerId) {
+ if (ValidateUtils.isNull(clusterId)) {
+ return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
+ }
+ ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterId);
+ if (ValidateUtils.isNull(zkConfig)) {
+ return Result.buildFrom(ResultStatus.ZOOKEEPER_CONNECT_FAILED);
+ }
+
+ try {
+ if (!zkConfig.checkPathExists(ZkPathUtil.getControllerCandidatePath(brokerId))) {
+ return Result.buildSuc();
+ }
+ zkConfig.delete(ZkPathUtil.getControllerCandidatePath(brokerId));
+ return Result.buildSuc();
+ } catch (Exception e) {
+ LOGGER.error("class=ZookeeperServiceImpl||method=deleteControllerPreferredCandidate||clusterId={}||brokerId={}||errMsg={}||", clusterId, brokerId, e.getMessage());
+ }
+ return Result.buildFrom(ResultStatus.ZOOKEEPER_DELETE_FAILED);
+ }
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java
index 58e5d98b..6995eb97 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommands.java
@@ -44,7 +44,7 @@ public class TopicCommands {
);
// 生成分配策略
- scala.collection.Map