diff --git a/README.md b/README.md index db8a1b52..e69e1688 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ 滴滴Logi-KafkaManager脱胎于滴滴内部多年的Kafka运营实践经验,是面向Kafka用户、Kafka运维人员打造的共享多租户Kafka云平台。专注于Kafka运维管控、监控告警、资源治理等核心场景,经历过大规模集群、海量大数据的考验。内部满意度高达90%的同时,还与多家知名企业达成商业化合作。 ### 1.1 快速体验地址 -- 体验地址 http://117.51.146.109:8080 账号密码 admin/admin + +- 体验地址 http://117.51.150.133:8080 账号密码 admin/admin ### 1.2 体验地图 相比较于同类产品的用户视角单一(大多为管理员视角),滴滴Logi-KafkaManager建立了基于分角色、多场景视角的体验地图。分别是:**用户体验地图、运维体验地图、运营体验地图** diff --git a/Releases_Notes.md b/Releases_Notes.md index 46b5753e..50a31c3b 100644 --- a/Releases_Notes.md +++ b/Releases_Notes.md @@ -7,6 +7,50 @@ --- +## v2.4.1+ + +版本上线时间:2021-05-21 + +### 能力提升 +- 增加直接增加权限和配额的接口(v2.4.1) +- 增加接口调用可绕过登录的功能(v2.4.1) + +### 体验优化 +- tomcat 版本提升至8.5.66(v2.4.2) +- op接口优化,拆分util接口为topic、leader两类接口(v2.4.1) +- 简化Gateway配置的Key长度(v2.4.1) + +### bug修复 +- 修复页面展示版本错误问题(v2.4.2) + + +## v2.4.0 + +版本上线时间:2021-05-18 + + +### 能力提升 + +- 增加App与Topic自动化审批开关 +- Broker元信息中增加Rack信息 +- 升级MySQL 驱动,支持MySQL 8+ +- 增加操作记录查询界面 + +### 体验优化 + +- FAQ告警组说明优化 +- 用户手册共享及 独享集群概念优化 +- 用户管理界面,前端限制用户删除自己 + +### bug修复 + +- 修复op-util类中创建Topic失败的接口 +- 周期同步Topic到DB的任务修复,将Topic列表查询从缓存调整为直接查DB +- 应用下线审批失败的功能修复,将权限为0(无权限)的数据进行过滤 +- 修复登录及权限绕过的漏洞 +- 修复研发角色展示接入集群、暂停监控等按钮的问题 + + ## v2.3.0 版本上线时间:2021-02-08 diff --git a/build.sh b/build.sh index 9c107907..b07c6623 100644 --- a/build.sh +++ b/build.sh @@ -4,7 +4,7 @@ cd $workspace ## constant OUTPUT_DIR=./output -KM_VERSION=2.4.0 +KM_VERSION=2.4.2 APP_NAME=kafka-manager APP_DIR=${APP_NAME}-${KM_VERSION} diff --git a/docs/dev_guide/dynamic_config_manager.md b/docs/dev_guide/dynamic_config_manager.md index 0965cb48..9e05839c 100644 --- a/docs/dev_guide/dynamic_config_manager.md +++ b/docs/dev_guide/dynamic_config_manager.md @@ -14,6 +14,8 @@ - 1、Topic定时同步任务 - 2、专家服务——Topic分区热点 - 3、专家服务——Topic分区不足 +- 4、专家服务——Topic资源治理 +- 5、账单配置 ## 1、Topic定时同步任务 @@ -119,4 +121,48 @@ TOPIC_INSUFFICIENT_PARTITION_CONFIG 50 ] } -``` \ No newline at end of file +``` +## 4、专家服务——Topic资源治理 + +首先,我们认为在一定的时间长度内,Topic的分区offset没有任何变化的Topic,即没有数据写入的Topic,为过期的Topic。 + +Topic分区不足相关的动态配置(页面在运维管控->平台管理->配置管理): + +配置Key: +``` +EXPIRED_TOPIC_CONFIG +``` + +配置Value: +```json +{ + "minExpiredDay": 30, #过期时间大于此值才显示 + "ignoreClusterIdList": [ # 忽略的集群 + 50 + ] +} +``` + +## 5、账单配置 + +Logi-KafkaManager除了作为Kafka运维管控平台之外,实际上还会有一些资源定价相关的功能。 + +当前定价方式:当月Topic的maxAvgDay天的峰值的均值流量作为Topic的使用额度。使用的额度 * 单价 * 溢价(预留buffer) 就等于当月的费用。 +详细的计算逻辑见:com.xiaojukeji.kafka.manager.task.dispatch.biz.CalKafkaTopicBill; 和 com.xiaojukeji.kafka.manager.task.dispatch.biz.CalTopicStatistics; + +这块在计算Topic的费用的配置如下所示: + +配置Key: +``` +KAFKA_TOPIC_BILL_CONFIG +``` + +配置Value: + +```json +{ + "maxAvgDay": 10, # 使用额度的计算规则 + "quotaRatio": 1.5, # 溢价率 + "priseUnitMB": 100 # 单价,即单MB/s流量多少钱 +} +``` diff --git a/docs/install_guide/create_mysql_table.sql b/docs/install_guide/create_mysql_table.sql index 065532eb..12910ae1 100644 --- a/docs/install_guide/create_mysql_table.sql +++ b/docs/install_guide/create_mysql_table.sql @@ -210,11 +210,11 @@ CREATE TABLE `gateway_config` ( PRIMARY KEY (`id`), UNIQUE KEY `uniq_type_name` (`type`,`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='gateway配置'; -INSERT INTO gateway_config(type, name, value, `version`) values('SERVICE_DISCOVERY_QUEUE_SIZE', 'SERVICE_DISCOVERY_QUEUE_SIZE', 100000000, 1); -INSERT INTO gateway_config(type, name, value, `version`) values('SERVICE_DISCOVERY_APPID_RATE', 'SERVICE_DISCOVERY_APPID_RATE', 100000000, 1); -INSERT INTO gateway_config(type, name, value, `version`) values('SERVICE_DISCOVERY_IP_RATE', 'SERVICE_DISCOVERY_IP_RATE', 100000000, 1); -INSERT INTO gateway_config(type, name, value, `version`) values('SERVICE_DISCOVERY_SP_RATE', 'app_01234567', 100000000, 1); -INSERT INTO gateway_config(type, name, value, `version`) values('SERVICE_DISCOVERY_SP_RATE', '192.168.0.1', 100000000, 1); +INSERT INTO gateway_config(type, name, value, `version`, `description`) values('SD_QUEUE_SIZE', 'SD_QUEUE_SIZE', 100000000, 1, '任意集群队列大小'); +INSERT INTO gateway_config(type, name, value, `version`, `description`) values('SD_APP_RATE', 'SD_APP_RATE', 100000000, 1, '任意一个App限速'); +INSERT INTO gateway_config(type, name, value, `version`, `description`) values('SD_IP_RATE', 'SD_IP_RATE', 100000000, 1, '任意一个IP限速'); +INSERT INTO gateway_config(type, name, value, `version`, `description`) values('SD_SP_RATE', 'app_01234567', 100000000, 1, '指定App限速'); +INSERT INTO gateway_config(type, name, value, `version`, `description`) values('SD_SP_RATE', '192.168.0.1', 100000000, 1, '指定IP限速'); -- -- Table structure for table `heartbeat` diff --git a/docs/user_guide/call_api_bypass_login.md b/docs/user_guide/call_api_bypass_login.md new file mode 100644 index 00000000..7a2feac8 --- /dev/null +++ b/docs/user_guide/call_api_bypass_login.md @@ -0,0 +1,49 @@ + +--- + +![kafka-manager-logo](../assets/images/common/logo_name.png) + +**一站式`Apache Kafka`集群指标监控与运维管控平台** + +--- + +# 登录绕过 + +## 背景 + +现在除了开放出来的第三方接口,其他接口都需要走登录认证。 + +但是第三方接口不多,开放出来的能力有限,但是登录的接口又需要登录,非常的麻烦。 + +因此,新增了一个登录绕过的功能,为一些紧急临时的需求,提供一个调用不需要登录的能力。 + +## 使用方式 + +步骤一:接口调用时,在header中,增加如下信息: +```shell +# 表示开启登录绕过 +Trick-Login-Switch : on + +# 登录绕过的用户, 这里可以是admin, 或者是其他的, 但是必须在运维管控->平台管理->用户管理中设置了该用户。 +Trick-Login-User : admin +``` + +  + +步骤二:在运维管控->平台管理->平台配置上,设置允许了该用户以绕过的方式登录 +```shell +# 设置的key,必须是这个 +SECURITY.TRICK_USERS + +# 设置的value,是json数组的格式,例如 +[ "admin", "logi"] +``` + +  + +步骤三:解释说明 + +设置完成上面两步之后,就可以直接调用需要登录的接口了。 + +但是还有一点需要注意,绕过的用户仅能调用他有权限的接口,比如一个普通用户,那么他就只能调用普通的接口,不能去调用运维人员的接口。 + diff --git a/docs/user_guide/faq.md b/docs/user_guide/faq.md index 5b0b103d..7d65b470 100644 --- a/docs/user_guide/faq.md +++ b/docs/user_guide/faq.md @@ -23,6 +23,14 @@ - 11、连接信息、耗时信息、磁盘信息为什么没有数据? - 12、逻辑集群申请审批通过之后为什么看不到逻辑集群? - 13、heartbeat表关联业务和使用场景是什么? +- 14、集群的删除,是否会真正的删除集群? +- 15、APP(应用)如何被使用起来? +- 16、为什么下线应用提示operation forbidden? +- 17、删除Topic成功,为什么过一会儿之后又出现了? +- 18、如何在不登录的情况下,调用一些需要登录的接口? +- 19、为什么无法看到连接信息、耗时信息等指标? +- 20、AppID鉴权、生产消费配额不起作用 + --- ### 0、支持哪些Kafka版本? @@ -129,3 +137,79 @@ 逻辑集群的申请与审批仅仅只是一个工单流程,并不会去实际创建逻辑集群,逻辑集群的创建还需要手动去创建。 具体的操作可见:[kafka-manager 接入集群](add_cluster/add_cluster.md)。 + + +### 13、heartbeat表关联业务和使用场景是什么? + +做任务抢占用的。 + +KM支持HA的方式部署,那么部署多台的时候,就会出现每一台都可能去做指标收集的事情,这块就使用heartbeat表做KM的存活性判断,然后进行任务的抢占或者是均衡。 + +更多详细的内容,可以看一下源码中,heartbeat表在哪里被使用了。 + + +### 14、集群的删除,是否会真正的删除集群? + +Logi-KM的运维管控,集群列表中的集群删除,仅仅只是将该集群从Logi-KM中进行删除,并不会对真正的物理集群做什么操作。 + + +### 15、APP(应用)如何被使用起来? + +app在Logi-KM中可以近似理解为租户,或者是kafka里面的一个账号的概念。 + +界面中显示的app信息、权限信息等,在平台层面仅仅只是控制Topic或集群在平台上的可见性,如果使用的是社区版本的Kafka,那么实际上是不能真正的管控到客户端对Topic的生产和消费。 + +但是如果是使用的滴滴的Kafka-Gateway,那么是可以做到对客户端的生产和消费的权限管控。滴滴的Kafka-Gateway暂未开源,属于企业服务,具体的可以入群交流,群地址在README中。 + + +### 16、为什么下线应用提示operation forbidden? + +**原因一:** + +该应用还存在对Topic的权限,因此导致下线失败。具体查看的位置在"Topic管理-》应用管理-》详情",可以看到应用对哪些Topic还有权限。 + +只有当权限全部去除之后,才可以下线应用。 + +**原因二:** + +使用的是2.4.0之前的旧版本,旧版本存在缓存更新的BUG,建议升级至最新的版本,或者简单粗暴的就是重启一下KM。 + + +### 17、删除Topic成功,为什么过一会儿之后又出现了? + +**原因说明:** + +Logi-KM会去请求Topic的endoffset信息,要获取这个信息就需要发送metadata请求,发送metadata请求的时候,如果集群允许自动创建Topic,那么当Topic不存在时,就会自动将该Topic创建出来。 + + +**问题解决:** + +因为在Logi-KM上,禁止Kafka客户端内部元信息获取这个动作非常的难做到,因此短时间内这个问题不好从Logi-KM上解决。 + +当然,对于不存在的Topic,Logi-KM是不会进行元信息请求的,因此也不用担心会莫名其妙的创建一个Topic出来。 + +但是,另外一点,对于开启允许Topic自动创建的集群,建议是关闭该功能,开启是非常危险的,如果关闭之后,Logi-KM也不会有这个问题。 + +最后这里举个开启这个配置后,非常危险的代码例子吧: + +```java +for (int i= 0; i < 100000; ++i) { + // 如果是客户端类似这样写的,那么一启动,那么将创建10万个Topic出来,集群元信息瞬间爆炸,controller可能就不可服务了。 + producer.send(new ProducerRecord("logi_km" + i,"hello logi_km")); +} +``` + +### 18、如何在不登录的情况下,调用一些需要登录的接口? + +具体见:[登录绕过](./call_api_bypass_login.md) + +### 19、为什么无法看到连接信息、耗时信息等指标? +连接信息、耗时信息等指标依赖于滴滴kafka-gateway和滴滴Kafka引擎,通过gateway可获取到连接到该Topic的应用情况,提高对Topic的管控能力。通过滴滴Kafka引擎的自带埋点,可获取到耗时信息,提升Topic生产消费时的可观测性。这部分内容是属于商业版的范畴,暂未开源。如有需要,可进行商业合作。 + +具体见:[滴滴Logi-KafkaManager开源版和商业版特性对比](../开源版与商业版特性对比.md) + +### 20、AppID鉴权、生产消费配额不起作用? +AppID鉴权、生产消费配额依赖于滴滴kafka-gateway,通过gateway进行身份鉴权和生产消费限流,可避免用户无限制的使用集群的流量,流量大的用户会耗尽系统资源从而影响其他用户的使用,造成集群的节点故障。这部分内容是属于商业版的范畴,暂未开源。如有需要,可进行商业合作。 + +具体见:[滴滴Logi-KafkaManager开源版和商业版特性对比](../开源版与商业版特性对比.md) + diff --git a/docs/user_guide/alarm_rules.md b/docs/user_guide/monitor_desc.md similarity index 75% rename from docs/user_guide/alarm_rules.md rename to docs/user_guide/monitor_desc.md index 57cba628..abd06209 100644 --- a/docs/user_guide/alarm_rules.md +++ b/docs/user_guide/monitor_desc.md @@ -4,11 +4,16 @@ --- +## 报警策略-监控指标说明 + +| 指标 | 含义 |备注 | +| --- | --- | --- | +| online-kafka-consumer-lag | 消费时,按照分区的维度进行监控lag数 | lag表示有多少数据没有被消费,因为按照分区的维度监控,所以告警时一般会有分区信息 | +| online-kafka-consumer-maxLag | 消费时,按照整个Topic的维度,监控Topic所有的分区里面的那个最大的lag | 比如每个分区的lag分别是3、5、7,那么maxLag的值就是max(3,5,7)=7 | +| online-kafka-consumer-maxDelayTime | 消费时,按照Topic维度监控预计的消费延迟 | 这块是按照lag和messagesIn之间的关系计算出来的,可能会有误差 | ## 报警策略-报警函数介绍 - - | 类别 | 函数 | 含义 |函数文案 |备注 | | --- | --- | --- | --- | --- | | 发生次数 |all,n | 最近$n个周期内,全发生 | 连续发生(all) | | diff --git a/docs/开源版与商业版特性对比.md b/docs/开源版与商业版特性对比.md new file mode 100644 index 00000000..e9d9dbd4 --- /dev/null +++ b/docs/开源版与商业版特性对比.md @@ -0,0 +1,55 @@ + +--- + +![kafka-manager-logo](assets/images/common/logo_name.png) + +**一站式`Apache Kafka`集群指标监控与运维管控平台** + +--- + +**开源版、商业版对比** + +纲要:Logi-KafakManager的商业特性是强依赖于滴滴Kafka Gateway和滴滴kafka引擎。 +滴滴KafkaGateway主要负责:服务发现、安全管控(身份鉴权、生产消费鉴权等)、流量管控(应用配额等)等; +滴滴Kafka引擎主要负责:更丰富的监控指标(broker实时耗时、压缩指标、分区落盘等)、磁盘过载保护等 +备注:两个版本的产品功能页面是一样的。区别在于开源版未使用滴滴KafkaGateway(滴滴Kafka引擎),部分产品功能/功能不起作用或者页面无数据 + + + + +| 模块 |对比指标 |底层依赖 |开源版 |商业版 |备注 | +| --- | --- | --- | --- | --- | --- | +| 服务发现 | bootstrap地址变更对客户端无影响 | | | 是| | +| 安全管控 | 身份鉴权(appID+password) | | | 是 | | +| | 权限鉴权(Topic+appID) | | | 是 | | +| 指标监控 | Topic实时流量、历史流量 | | 是 | 是 | | +| | Broker实时耗时、历史耗时 | 引擎 | | 是 | | +| | 分区落盘 | 引擎 | | 是 | | +| | Topic里的数据压缩格式 | 引擎 | | 是 | | +| | 连接信息(Topic上有哪些应用连接) | Gateway | | 是| | +| | 流量管控(应用配额、生产消费限流等) | Gateway | | 是 | | +| 监控报警 | | | 是 | 是 | 监控指标上报,需对接外部监控系统(夜莺or企业内部监控系统) | +| Topic运维 | 申请分区 | | 是 | 是 | | +| | 调整配额 | Gateway | | | 是 | +| | Topic数据采样 | | 是 | 是 | | +| | 消费组管理(重置消费偏移等) | | 是 | 是 | | +| 集群管理 | 集群接入(部署) | | 是 | 是 | 需手动部署集群,或借助外部的自动化部署系统(夜莺)来部署系统 | +| | 集群指标监控 | | 是 | 是 | | +| | 按照Region、逻辑集群进行管理 | | 是 | 是 | | +| | Topic迁移 | | 是 | 是| | +| | 集群任务(集群版本管理、升级、扩缩容、回滚等) | | 是 | 是 | 需借助夜莺或自动化部署系统来实现 | +| | 磁盘过载保护 | 引擎 | | 是 | | +| | 指定broker作为优选controller | Gateway | | 是 | | +| Gateway管理 | 管理 Gateway的配置文件 | Gateway | | 是 | | +| 资源治理 | 专家服务(Topic分区热点、Topic分区不足、Topic长期未使用、Topic流量异常) | | 是 | 是 | 开源版:具备问题发现与基础的问题解决能力;商业版:可在此基础上,融入滴滴内部的资源治理经验,提供更加专家化的问题解决方法 | +| | 健康分 | | 是 | 是 | 开源版:具备基础的健康分算法;商业版:可融入更多的指标统计,及定制化的健康分算法 | +| 运营管理 | 资源审批(应用申请、Topic申请、分区申请、配额申请、集群申请等,都需要通过工单进行审批) | |是 | 是 | | +| | 账单体系(根据流量核算Topic、集群费用) | | 是 | 是| | + + +**总结** + +Logi-KafkaManager的商业特性体现在在滴滴Kafka Gateway、滴滴Kafka引擎、内部沉淀出的资源治理专家经验、可定制化的健康分算法。 +从场景来看,滴滴Logi-KafkaManager的开源版本在kafka集群运维、的Topic管理、监控告警、资源治理等kafka核心场景都充分开源用户的使用需求并且有着出色的表现。而商业版相较于开源版在安全管控、流量管控、更丰富的指标监控、资源治理专家经验的具有明显提升,更加符合企业业务需求。 +除此之外,商业版还可根据企业实际需求对平台源码进行定制化改造,并提供运维保障,稳定性保障,运营保障等服务。 + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ApiLevelEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ApiLevelEnum.java deleted file mode 100644 index 73be0d16..00000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ApiLevelEnum.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.bizenum; - -/** - * @author zengqiao - * @date 20/7/27 - */ -public enum ApiLevelEnum { - LEVEL_0(0), - LEVEL_1(1), - LEVEL_2(2), - LEVEL_3(3) - ; - - private int level; - - ApiLevelEnum(int level) { - this.level = level; - } -} \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ConsumeHealthEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ConsumeHealthEnum.java index 74d2d8ab..f5cda2ed 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ConsumeHealthEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ConsumeHealthEnum.java @@ -24,18 +24,10 @@ public enum ConsumeHealthEnum { return code; } - public void setCode(Integer code) { - this.code = code; - } - public String getMessage() { return message; } - public void setMessage(String message) { - this.message = message; - } - @Override public String toString() { return "ConsumeHealthEnum{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/DBStatusEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/DBStatusEnum.java index 4f6fb1cf..89518f83 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/DBStatusEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/DBStatusEnum.java @@ -19,7 +19,10 @@ public enum DBStatusEnum { return status; } - public void setStatus(int status) { - this.status = status; + @Override + public String toString() { + return "DBStatusEnum{" + + "status=" + status + + '}'; } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/IDCEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/IDCEnum.java index 73569d56..2b3cad7c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/IDCEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/IDCEnum.java @@ -21,18 +21,10 @@ public enum IDCEnum { return idc; } - public void setIdc(String idc) { - this.idc = idc; - } - public String getName() { return name; } - public void setName(String name) { - this.name = name; - } - @Override public String toString() { return "IDCEnum{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaBrokerRoleEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaBrokerRoleEnum.java index befd5257..246b4b5e 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaBrokerRoleEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaBrokerRoleEnum.java @@ -21,10 +21,6 @@ public enum KafkaBrokerRoleEnum { return role; } - public void setRole(String role) { - this.role = role; - } - @Override public String toString() { return "KafkaBrokerRoleEnum{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaClientEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaClientEnum.java index 6e5bea6f..0b35277e 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaClientEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/KafkaClientEnum.java @@ -24,18 +24,10 @@ public enum KafkaClientEnum { return code; } - public void setCode(Integer code) { - this.code = code; - } - public String getName() { return name; } - public void setName(String name) { - this.name = name; - } - @Override public String toString() { return "KafkaClientEnum{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OffsetResetTypeEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OffsetResetTypeEnum.java index b69a8a25..170946e8 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OffsetResetTypeEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OffsetResetTypeEnum.java @@ -18,4 +18,11 @@ public enum OffsetResetTypeEnum { public Integer getCode() { return code; } + + @Override + public String toString() { + return "OffsetResetTypeEnum{" + + "code=" + code + + '}'; + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperationStatusEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperationStatusEnum.java index cf8f53d2..b88135a5 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperationStatusEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperationStatusEnum.java @@ -27,4 +27,12 @@ public enum OperationStatusEnum { public String getMessage() { return message; } + + @Override + public String toString() { + return "OperationStatusEnum{" + + "code=" + code + + ", message='" + message + '\'' + + '}'; + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/PeakFlowStatusEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/PeakFlowStatusEnum.java index 9b71f038..f39ac91a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/PeakFlowStatusEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/PeakFlowStatusEnum.java @@ -15,9 +15,9 @@ public enum PeakFlowStatusEnum { ; - public Integer code; + private Integer code; - public String message; + private String message; PeakFlowStatusEnum(Integer code, String message) { this.code = code; @@ -28,18 +28,10 @@ public enum PeakFlowStatusEnum { return code; } - public void setCode(Integer code) { - this.code = code; - } - public String getMessage() { return message; } - public void setMessage(String message) { - this.message = message; - } - @Override public String toString() { return "PeakFlowStatusEnum{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/RebalanceDimensionEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/RebalanceDimensionEnum.java index e196e8c2..c5259461 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/RebalanceDimensionEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/RebalanceDimensionEnum.java @@ -29,4 +29,12 @@ public enum RebalanceDimensionEnum { public String getMessage() { return message; } + + @Override + public String toString() { + return "RebalanceDimensionEnum{" + + "code=" + code + + ", message='" + message + '\'' + + '}'; + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusEnum.java index ebf3dc82..a478eafe 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusEnum.java @@ -43,18 +43,10 @@ public enum TaskStatusEnum { return code; } - public void setCode(Integer code) { - this.code = code; - } - public String getMessage() { return message; } - public void setMessage(String message) { - this.message = message; - } - @Override public String toString() { return "TaskStatusEnum{" + @@ -64,9 +56,6 @@ public enum TaskStatusEnum { } public static Boolean isFinished(Integer code) { - if (code >= FINISHED.getCode()) { - return true; - } - return false; + return code >= FINISHED.getCode(); } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusReassignEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusReassignEnum.java index 7fd6ef8f..fc8adcc1 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusReassignEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TaskStatusReassignEnum.java @@ -45,11 +45,6 @@ public enum TaskStatusReassignEnum { } public static Boolean isFinished(Integer code) { - if (SUCCEED.getCode().equals(code) - || FAILED.getCode().equals(code) - || CANCELED.getCode().equals(code)) { - return true; - } - return false; + return SUCCEED.getCode().equals(code) || FAILED.getCode().equals(code) || CANCELED.getCode().equals(code); } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicAuthorityEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicAuthorityEnum.java index 2cfba027..7abafb8c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicAuthorityEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicAuthorityEnum.java @@ -33,4 +33,12 @@ public enum TopicAuthorityEnum { public String getMessage() { return message; } + + @Override + public String toString() { + return "TopicAuthorityEnum{" + + "code=" + code + + ", message='" + message + '\'' + + '}'; + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java index bac44235..6a2f32c1 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicExpiredStatusEnum.java @@ -29,4 +29,12 @@ public enum TopicExpiredStatusEnum { public String getMessage() { return message; } + + @Override + public String toString() { + return "TopicExpiredStatusEnum{" + + "status=" + status + + ", message='" + message + '\'' + + '}'; + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicOffsetChangedEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicOffsetChangedEnum.java index ecb6b2f1..4c88f25c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicOffsetChangedEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/TopicOffsetChangedEnum.java @@ -23,18 +23,10 @@ public enum TopicOffsetChangedEnum { return code; } - public void setCode(Integer code) { - this.code = code; - } - public String getMessage() { return message; } - public void setMessage(String message) { - this.message = message; - } - @Override public String toString() { return "TopicOffsetChangedEnum{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/gateway/GatewayConfigKeyEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/gateway/GatewayConfigKeyEnum.java index 226cc5c6..b3403e69 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/gateway/GatewayConfigKeyEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/gateway/GatewayConfigKeyEnum.java @@ -5,11 +5,11 @@ package com.xiaojukeji.kafka.manager.common.bizenum.gateway; * @date 20/7/28 */ public enum GatewayConfigKeyEnum { - SD_CLUSTER_ID("SERVICE_DISCOVERY_CLUSTER_ID", "SERVICE_DISCOVERY_CLUSTER_ID"), - SD_QUEUE_SIZE("SERVICE_DISCOVERY_QUEUE_SIZE", "SERVICE_DISCOVERY_QUEUE_SIZE"), - SD_APP_ID_RATE("SERVICE_DISCOVERY_APPID_RATE", "SERVICE_DISCOVERY_APPID_RATE"), - SD_IP_RATE("SERVICE_DISCOVERY_IP_RATE", "SERVICE_DISCOVERY_IP_RATE"), - SD_SP_RATE("SERVICE_DISCOVERY_SP_RATE", "SERVICE_DISCOVERY_SP_RATE"), + SD_CLUSTER_ID("SD_CLUSTER_ID", "SD_CLUSTER_ID"), + SD_QUEUE_SIZE("SD_QUEUE_SIZE", "SD_QUEUE_SIZE"), + SD_APP_RATE("SD_APP_RATE", "SD_APP_RATE"), + SD_IP_RATE("SD_IP_RATE", "SD_IP_RATE"), + SD_SP_RATE("SD_SP_RATE", "SD_SP_RATE"), ; @@ -26,18 +26,10 @@ public enum GatewayConfigKeyEnum { return configType; } - public void setConfigType(String configType) { - this.configType = configType; - } - public String getConfigName() { return configName; } - public void setConfigName(String configName) { - this.configName = configName; - } - @Override public String toString() { return "GatewayConfigKeyEnum{" + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiLevelContent.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiLevelContent.java index 2447564f..8136cd16 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiLevelContent.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiLevelContent.java @@ -12,4 +12,7 @@ public class ApiLevelContent { public static final int LEVEL_NORMAL_3 = 3; public static final int LEVEL_DEFAULT_4 = 4; + + private ApiLevelContent() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java index 1aec18f2..b0f84405 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java @@ -28,4 +28,7 @@ public class ApiPrefix { // gateway public static final String GATEWAY_API_V1_PREFIX = "/gateway" + API_V1_PREFIX; + + private ApiPrefix() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java index faca17b0..361c841f 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java @@ -30,4 +30,7 @@ public class ConfigConstant { public static final String BROKER_CAPACITY_LIMIT_CONFIG_KEY = "BROKER_CAPACITY_LIMIT_CONFIG"; public static final String KAFKA_CLUSTER_DO_CONFIG_KEY = "KAFKA_CLUSTER_DO_CONFIG"; + + private ConfigConstant() { + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java index 81c1dc89..7ecc295b 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/Constant.java @@ -45,4 +45,7 @@ public class Constant { public static final Integer DEFAULT_MAX_CAL_TOPIC_EXPIRED_DAY = 90; public static final Integer INVALID_CODE = -1; + + private Constant() { + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java index 92425303..4d69f914 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java @@ -16,4 +16,7 @@ public class KafkaConstant { public static final String CLIENT_VERSION_NAME_UNKNOWN = "unknown"; public static final String RETENTION_MS_KEY = "retention.ms"; + + private KafkaConstant() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaMetricsCollections.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaMetricsCollections.java index 9c7f59e3..be82317a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaMetricsCollections.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaMetricsCollections.java @@ -39,4 +39,7 @@ public class KafkaMetricsCollections { * Broker信息 */ public static final int BROKER_VERSION = 400; + + private KafkaMetricsCollections() { + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LogConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LogConstant.java index 55fa756f..bba7670c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LogConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LogConstant.java @@ -10,4 +10,7 @@ public class LogConstant { public static final String API_METRICS_LOGGER = "API_METRICS_LOGGER"; public static final String SCHEDULED_TASK_LOGGER = "SCHEDULED_TASK_LOGGER"; + + private LogConstant() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LoginConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LoginConstant.java index bc95dbc2..8c9b47aa 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LoginConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/LoginConstant.java @@ -11,4 +11,7 @@ public class LoginConstant { public static final String COOKIE_CHINESE_USERNAME_KEY = "chineseName"; public static final Integer COOKIE_OR_SESSION_MAX_AGE_UNIT_MS = 24 * 60 * 60 * 1000; + + private LoginConstant() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java index c3162a4b..510a90c1 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java @@ -6,4 +6,7 @@ package com.xiaojukeji.kafka.manager.common.constant; */ public class SystemCodeConstant { public static final String KAFKA_MANAGER = "kafka-manager"; + + private SystemCodeConstant() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java index b8c361b3..4d569907 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java @@ -25,6 +25,8 @@ public class TopicCreationConstant { public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms"; + public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes"; + public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L; public static Properties createNewProperties(Long retentionTime) { @@ -54,4 +56,7 @@ public class TopicCreationConstant { * 单次自动化审批, 最多允许的通过单子 */ public static final Integer MAX_PASSED_ORDER_NUM_PER_TASK = 200; + + private TopicCreationConstant() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicSampleConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicSampleConstant.java index 5ee15331..d409e862 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicSampleConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicSampleConstant.java @@ -16,4 +16,7 @@ public class TopicSampleConstant { public static final Integer MAX_TIMEOUT_UNIT_MS = 10000; public static final Integer POLL_TIME_OUT_UNIT_MS = 2000; public static final Integer MAX_DATA_LENGTH_UNIT_BYTE = 2048; + + private TopicSampleConstant() { + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TrickLoginConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TrickLoginConstant.java new file mode 100644 index 00000000..0bb92d2e --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TrickLoginConstant.java @@ -0,0 +1,24 @@ +package com.xiaojukeji.kafka.manager.common.constant; + +public class TrickLoginConstant { + /** + * HTTP Header key + */ + public static final String TRICK_LOGIN_SWITCH = "Trick-Login-Switch"; + + public static final String TRICK_LOGIN_USER = "Trick-Login-User"; + + /** + * 配置允许 trick 登录用户名单 + */ + public static final String TRICK_LOGIN_LEGAL_USER_CONFIG_KEY = "SECURITY.TRICK_USERS"; + + /** + * 开关状态值 + */ + public static final String TRICK_LOGIN_SWITCH_ON = "on"; + public static final String TRICK_LOGIN_SWITCH_OFF = "off"; + + private TrickLoginConstant() { + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java index 7b3bc979..6b734348 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.ao.gateway; +import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO; + /** * @author zhongyuankai * @date 2020/4/27 @@ -65,4 +67,15 @@ public class TopicQuota { ", consumeQuota=" + consumeQuota + '}'; } + + public static TopicQuota buildFrom(TopicQuotaDTO dto) { + TopicQuota topicQuota = new TopicQuota(); + topicQuota.setAppId(dto.getAppId()); + topicQuota.setClusterId(dto.getClusterId()); + topicQuota.setTopicName(dto.getTopicName()); + topicQuota.setProduceQuota(dto.getProduceQuota()); + topicQuota.setConsumeQuota(dto.getConsumeQuota()); + return topicQuota; + } + } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java index e3ea08ed..9150569b 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java @@ -37,6 +37,8 @@ public class TopicBasicDTO { private Long retentionTime; + private Long retentionBytes; + public Long getClusterId() { return clusterId; } @@ -157,6 +159,14 @@ public class TopicBasicDTO { this.retentionTime = retentionTime; } + public Long getRetentionBytes() { + return retentionBytes; + } + + public void setRetentionBytes(Long retentionBytes) { + this.retentionBytes = retentionBytes; + } + @Override public String toString() { return "TopicBasicDTO{" + @@ -166,7 +176,7 @@ public class TopicBasicDTO { ", principals='" + principals + '\'' + ", topicName='" + topicName + '\'' + ", description='" + description + '\'' + - ", regionNameList='" + regionNameList + '\'' + + ", regionNameList=" + regionNameList + ", score=" + score + ", topicCodeC='" + topicCodeC + '\'' + ", partitionNum=" + partitionNum + @@ -175,6 +185,7 @@ public class TopicBasicDTO { ", modifyTime=" + modifyTime + ", createTime=" + createTime + ", retentionTime=" + retentionTime + + ", retentionBytes=" + retentionBytes + '}'; } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java new file mode 100644 index 00000000..5719cd28 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicQuotaDTO.java @@ -0,0 +1,47 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.gateway; + +import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(description = "配额调整") +public class TopicQuotaDTO extends ClusterTopicDTO { + @ApiModelProperty(value = "appId") + private String appId; + + @ApiModelProperty(value = "发送数据速率B/s") + private Long produceQuota; + + @ApiModelProperty(value = "消费数据速率B/s") + private Long consumeQuota; + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Long getProduceQuota() { + return produceQuota; + } + + public void setProduceQuota(Long produceQuota) { + this.produceQuota = produceQuota; + } + + public Long getConsumeQuota() { + return consumeQuota; + } + + public void setConsumeQuota(Long consumeQuota) { + this.consumeQuota = consumeQuota; + } + + @Override + public boolean paramLegal() { + return !ValidateUtils.isNullOrLessThanZero(clusterId) && !ValidateUtils.isBlank(topicName) && !ValidateUtils.isBlank(appId); + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java index 946a9997..b200a150 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java @@ -33,6 +33,9 @@ public class TopicBasicVO { @ApiModelProperty(value = "存储时间(ms)") private Long retentionTime; + @ApiModelProperty(value = "单分区数据保存大小(Byte)") + private Long retentionBytes; + @ApiModelProperty(value = "创建时间") private Long createTime; @@ -62,12 +65,20 @@ public class TopicBasicVO { this.clusterId = clusterId; } - public String getTopicCodeC() { - return topicCodeC; + public String getAppId() { + return appId; } - public void setTopicCodeC(String topicCodeC) { - this.topicCodeC = topicCodeC; + public void setAppId(String appId) { + this.appId = appId; + } + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; } public Integer getPartitionNum() { @@ -86,22 +97,6 @@ public class TopicBasicVO { this.replicaNum = replicaNum; } - public Long getModifyTime() { - return modifyTime; - } - - public void setModifyTime(Long modifyTime) { - this.modifyTime = modifyTime; - } - - public Long getCreateTime() { - return createTime; - } - - public void setCreateTime(Long createTime) { - this.createTime = createTime; - } - public String getPrincipals() { return principals; } @@ -110,30 +105,6 @@ public class TopicBasicVO { this.principals = principals; } - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public void setBootstrapServers(String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - } - - public String getAppId() { - return appId; - } - - public String getBootstrapServers() { - return bootstrapServers; - } - public Long getRetentionTime() { return retentionTime; } @@ -142,12 +113,28 @@ public class TopicBasicVO { this.retentionTime = retentionTime; } - public String getAppName() { - return appName; + public Long getRetentionBytes() { + return retentionBytes; } - public void setAppName(String appName) { - this.appName = appName; + public void setRetentionBytes(Long retentionBytes) { + this.retentionBytes = retentionBytes; + } + + public Long getCreateTime() { + return createTime; + } + + public void setCreateTime(Long createTime) { + this.createTime = createTime; + } + + public Long getModifyTime() { + return modifyTime; + } + + public void setModifyTime(Long modifyTime) { + this.modifyTime = modifyTime; } public Integer getScore() { @@ -158,6 +145,30 @@ public class TopicBasicVO { this.score = score; } + public String getTopicCodeC() { + return topicCodeC; + } + + public void setTopicCodeC(String topicCodeC) { + this.topicCodeC = topicCodeC; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + public List getRegionNameList() { return regionNameList; } @@ -176,6 +187,7 @@ public class TopicBasicVO { ", replicaNum=" + replicaNum + ", principals='" + principals + '\'' + ", retentionTime=" + retentionTime + + ", retentionBytes=" + retentionBytes + ", createTime=" + createTime + ", modifyTime=" + modifyTime + ", score=" + score + diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/SpringTool.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/SpringTool.java index de0783d2..50723ebf 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/SpringTool.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/SpringTool.java @@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.common.utils; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.LoginConstant; +import com.xiaojukeji.kafka.manager.common.constant.TrickLoginConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -53,13 +54,6 @@ public class SpringTool implements ApplicationContextAware, DisposableBean { return getApplicationContext().getBeansOfType(type); } -// /** -// * 从静态变量applicationContext中去的Bean,自动转型为所复制对象的类型 -// */ -// public static T getBean(Class requiredType) { -// return (T) applicationContext.getBean(requiredType); -// } - /** * 清除SpringContextHolder中的ApplicationContext为Null */ @@ -87,10 +81,18 @@ public class SpringTool implements ApplicationContextAware, DisposableBean { } public static String getUserName(){ - HttpServletRequest request = - ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); - HttpSession session = request.getSession(); - String username = (String) session.getAttribute(LoginConstant.SESSION_USERNAME_KEY); + HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); + + String username = null; + if (TrickLoginConstant.TRICK_LOGIN_SWITCH_ON.equals(request.getHeader(TrickLoginConstant.TRICK_LOGIN_SWITCH))) { + // trick登录方式的获取用户 + username = request.getHeader(TrickLoginConstant.TRICK_LOGIN_USER); + } else { + // 走页面登录方式登录的获取用户 + HttpSession session = request.getSession(); + username = (String) session.getAttribute(LoginConstant.SESSION_USERNAME_KEY); + } + if (ValidateUtils.isNull(username)) { return Constant.DEFAULT_USER_NAME; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java index c7c69ca3..5625b37f 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java @@ -79,7 +79,8 @@ public class JmxConnectorWrap { try { Map environment = new HashMap(); if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getPassword())) { - environment.put(JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword())); + // fixed by riyuetianmu + environment.put(JMXConnector.CREDENTIALS, new String[]{this.jmxConfig.getUsername(), this.jmxConfig.getPassword()}); } if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) { environment.put(Context.SECURITY_PROTOCOL, "ssl"); diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java index 6705f435..0410a553 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/ZkPathUtil.java @@ -119,4 +119,7 @@ public class ZkPathUtil { public static String getControllerCandidatePath(Integer brokerId) { return D_CONTROLLER_CANDIDATES + ZOOKEEPER_SEPARATOR + brokerId; } + + private ZkPathUtil() { + } } diff --git a/kafka-manager-console/package.json b/kafka-manager-console/package.json index 920fa613..3be33c21 100644 --- a/kafka-manager-console/package.json +++ b/kafka-manager-console/package.json @@ -1,18 +1,18 @@ { "name": "logi-kafka", - "version": "2.3.1", + "version": "2.4.3", "description": "", "scripts": { "start": "webpack-dev-server", "daily-build": "cross-env NODE_ENV=production webpack", "pre-build": "cross-env NODE_ENV=production webpack", - "prod-build": "cross-env NODE_ENV=production webpack" + "prod-build": "cross-env NODE_ENV=production webpack", + "fix-memory": "cross-env LIMIT=4096 increase-memory-limit" }, "author": "", "license": "ISC", "devDependencies": { "@hot-loader/react-dom": "^16.8.6", - "@types/clipboard": "^2.0.1", "@types/echarts": "^4.4.1", "@types/lodash.debounce": "^4.0.6", "@types/react": "^16.8.8", @@ -21,12 +21,13 @@ "@types/spark-md5": "^3.0.2", "antd": "^3.26.15", "clean-webpack-plugin": "^3.0.0", - "clipboard": "2.0.6", + "clipboard": "^2.0.8", "cross-env": "^7.0.2", "css-loader": "^2.1.0", "echarts": "^4.5.0", "file-loader": "^5.0.2", "html-webpack-plugin": "^3.2.0", + "increase-memory-limit": "^1.0.7", "less": "^3.9.0", "less-loader": "^4.1.0", "mini-css-extract-plugin": "^0.6.0", diff --git a/kafka-manager-console/src/container/alarm/add-alarm/alarm-select.tsx b/kafka-manager-console/src/container/alarm/add-alarm/alarm-select.tsx index 5cd1f4f0..04540287 100644 --- a/kafka-manager-console/src/container/alarm/add-alarm/alarm-select.tsx +++ b/kafka-manager-console/src/container/alarm/add-alarm/alarm-select.tsx @@ -54,7 +54,7 @@ export class AlarmSelect extends React.Component { 新增规则组? diff --git a/kafka-manager-console/src/container/header/index.tsx b/kafka-manager-console/src/container/header/index.tsx index ebda9760..e12e397c 100644 --- a/kafka-manager-console/src/container/header/index.tsx +++ b/kafka-manager-console/src/container/header/index.tsx @@ -145,7 +145,7 @@ export const Header = observer((props: IHeader) => {
Kafka Manager - v2.4.0 + v2.4.2 {/* 添加版本超链接 */}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index 631b254f..a7142fa9 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; +import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ListUtils; +import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap; @@ -56,7 +58,7 @@ public class PhysicalClusterMetadataManager { private final static Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>(); - private final static Map> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>(); + private final static Map> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>(); private final static Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>(); @@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager { // 初始化topic-map TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); - TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); + TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); // 初始化cluster-map CLUSTER_MAP.put(clusterDO.getId(), clusterDO); @@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager { KAFKA_VERSION_MAP.remove(clusterId); TOPIC_METADATA_MAP.remove(clusterId); - TOPIC_RETENTION_TIME_MAP.remove(clusterId); + TOPIC_PROPERTIES_MAP.remove(clusterId); CLUSTER_MAP.remove(clusterId); } @@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager { //---------------------------配置相关元信息-------------- - public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) { - Map timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId); - if (timeMap == null) { + public static void putTopicProperties(Long clusterId, String topicName, Properties properties) { + if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) { return; } - timeMap.put(topicName, retentionTime); + + Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId); + if (ValidateUtils.isNull(propertiesMap)) { + return; + } + propertiesMap.put(topicName, properties); } public static Long getTopicRetentionTime(Long clusterId, String topicName) { - Map timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId); - if (timeMap == null) { + Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId); + if (ValidateUtils.isNull(propertiesMap)) { return null; } - return timeMap.get(topicName); + + Properties properties = propertiesMap.get(topicName); + if (ValidateUtils.isNull(properties)) { + return null; + } + + return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME)); } + public static Long getTopicRetentionBytes(Long clusterId, String topicName) { + Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId); + if (ValidateUtils.isNull(propertiesMap)) { + return null; + } + Properties properties = propertiesMap.get(topicName); + if (ValidateUtils.isNull(properties)) { + return null; + } + return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME)); + } //---------------------------Broker元信息相关-------------- diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java index 8dc0e0c1..79524204 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import java.util.Date; import java.util.List; @@ -122,5 +123,12 @@ public interface TopicManagerService { List getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime); TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName); + + /** + * topic权限调整 + * @param authorityDO topic权限 + * @return + */ + ResultStatus addAuthority(AuthorityDO authorityDO); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java index dacba4b0..9e4c244c 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java @@ -105,4 +105,5 @@ public interface TopicService { List getTopicBrokerList(Long clusterId, String topicName); Result checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime); + } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java index 6a78e4f4..225c67b0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; /** @@ -34,4 +35,11 @@ public interface QuotaService { TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId); Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota); + + /** + * topic配额调整 + * @param topicQuota topic配额 + * @return + */ + ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java index 18ee0a0d..0ceb3b30 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java @@ -86,8 +86,8 @@ public class GatewayConfigServiceImpl implements GatewayConfigService { GatewayConfigDO configDO = null; try { configDO = gatewayConfigDao.getByConfigTypeAndName( - GatewayConfigKeyEnum.SD_APP_ID_RATE.getConfigType(), - GatewayConfigKeyEnum.SD_APP_ID_RATE.getConfigName() + GatewayConfigKeyEnum.SD_APP_RATE.getConfigType(), + GatewayConfigKeyEnum.SD_APP_RATE.getConfigName() ); if (ValidateUtils.isNull(configDO) || configDO.getVersion() <= requestVersion) { return new AppRateConfig(Long.MIN_VALUE, null); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java index 2ce5facf..8baf61cc 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java @@ -1,11 +1,16 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; +import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService; import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy; @@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService { @Autowired private AbstractAllocateQuotaStrategy allocateQuotaStrategy; + @Autowired + private LogicalClusterMetadataManager logicalClusterMetadataManager; + + @Autowired + private AuthorityService authorityService; + @Override public int addTopicQuota(TopicQuota topicQuotaDO) { return KafkaZookeeperUtils.setTopicQuota( @@ -78,4 +89,34 @@ public class QuotaServiceImpl implements QuotaService { } return Boolean.TRUE; } + + @Override + public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) { + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + return ResultStatus.CLUSTER_NOT_EXIST; + } + // 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理) + AuthorityDO authority = authorityService.getAuthority(physicalClusterId, + topicQuota.getTopicName(), topicQuota.getAppId()); + if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) { + return ResultStatus.USER_WITHOUT_AUTHORITY; + } + if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) { + // 可以消费 + topicQuota.setProduceQuota(null); + } + if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) { + // 可以生产 + topicQuota.setConsumeQuota(null); + } + // 设置物理集群id + topicQuota.setClusterId(physicalClusterId); + // 添加配额 + if (addTopicQuota(topicQuota) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.ZOOKEEPER_WRITE_FAILED; + } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java index 12af2e18..24eea55f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java @@ -91,7 +91,7 @@ public class BrokerServiceImpl implements BrokerService { for (BrokerDO brokerDO : brokerDOList) { PeakFlowStatusEnum peakFlowStatus = getPeakFlowStatus(brokerDO.getMaxAvgBytesIn(), peakFlow); peakFlowStatusMap.put( - peakFlowStatus.code, + peakFlowStatus.getCode(), peakFlowStatusMap.getOrDefault(peakFlowStatus.getCode(), 0) + 1 ); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index bce5fbe7..4a8f501f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -20,6 +20,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; import com.xiaojukeji.kafka.manager.common.utils.DateUtils; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; +import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData; @@ -618,6 +619,38 @@ public class TopicManagerServiceImpl implements TopicManagerService { return topicBusinessInfo; } + @Override + public ResultStatus addAuthority(AuthorityDO authorityDO) { + // 查询该用户拥有的应用 + List appDOs = appService.getByPrincipal(SpringTool.getUserName()); + if (ValidateUtils.isEmptyList(appDOs)) { + // 该用户无应用,需要先申请应用 + return ResultStatus.APP_NOT_EXIST; + } + List appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList()); + if (!appIds.contains(authorityDO.getAppId())) { + // 入参中的appId,该用户未拥有 + return ResultStatus.APP_NOT_EXIST; + } + // 获取物理集群id + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(authorityDO.getClusterId()); + if (ValidateUtils.isNull(physicalClusterId)) { + // 集群不存在 + return ResultStatus.CLUSTER_NOT_EXIST; + } + TopicDO topic = getByTopicName(physicalClusterId, authorityDO.getTopicName()); + if (ValidateUtils.isNull(topic)) { + // topic不存在 + return ResultStatus.TOPIC_NOT_EXIST; + } + // 设置物理集群id + authorityDO.setClusterId(physicalClusterId); + if (authorityService.addAuthority(authorityDO) > 0) { + return ResultStatus.SUCCESS; + } + return ResultStatus.MYSQL_ERROR; + } + private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO, String topicName, TopicDO topicDO, diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 63191888..154faf77 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService { basicDTO.setCreateTime(topicMetadata.getCreateTime()); basicDTO.setModifyTime(topicMetadata.getModifyTime()); basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName)); + basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName)); TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName); if (!ValidateUtils.isNull(topicDO)) { @@ -648,10 +649,11 @@ public class TopicServiceImpl implements TopicService { List dataList = new ArrayList<>(); int currentSize = dataList.size(); while (dataList.size() < maxMsgNum) { + if (remainingWaitMs <= 0) { + break; + } + try { - if (remainingWaitMs <= 0) { - break; - } ConsumerRecords records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS); for (ConsumerRecord record : records) { String value = (String) record.value(); @@ -661,20 +663,22 @@ public class TopicServiceImpl implements TopicService { : value ); } - // 当前批次一条数据都没拉取到,则结束拉取 - if (dataList.size() - currentSize == 0) { - break; - } - currentSize = dataList.size(); - // 检查是否超时 - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) { - break; - } - remainingWaitMs = maxWaitMs - elapsed; } catch (Exception e) { LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); } + + // 当前批次一条数据都没拉取到,则结束拉取 + if (dataList.size() - currentSize == 0) { + break; + } + currentSize = dataList.size(); + + // 检查是否超时 + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) { + break; + } + remainingWaitMs = maxWaitMs - elapsed; } return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); } @@ -698,14 +702,15 @@ public class TopicServiceImpl implements TopicService { : value ); } - if (System.currentTimeMillis() - timestamp > timeout - || dataList.size() >= maxMsgNum) { - break; - } Thread.sleep(10); } catch (Exception e) { LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e); } + + if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) { + // 超时或者是数据已采集足够时, 直接返回 + break; + } } return dataList.subList(0, Math.min(dataList.size(), maxMsgNum)); } diff --git a/kafka-manager-dao/src/main/resources/mapper/HeartbeatDao.xml b/kafka-manager-dao/src/main/resources/mapper/HeartbeatDao.xml index 900e13a9..bea686ff 100644 --- a/kafka-manager-dao/src/main/resources/mapper/HeartbeatDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/HeartbeatDao.xml @@ -11,7 +11,7 @@ - REPLACE heartbeat (ip, hostname) VALUES (#{ip}, #{hostname}) + REPLACE heartbeat (ip, hostname, modify_time) VALUES (#{ip}, #{hostname}, #{modifyTime})