mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-25 04:32:12 +08:00
Compare commits
34 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28d985aaf1 | ||
|
|
2397cbf80b | ||
|
|
a13d9daae3 | ||
|
|
c23870e020 | ||
|
|
dd2e29dd40 | ||
|
|
74b5700573 | ||
|
|
ba6abea6d8 | ||
|
|
33b231d512 | ||
|
|
61f0b67a92 | ||
|
|
4b679be310 | ||
|
|
a969795677 | ||
|
|
4f4e7e80fc | ||
|
|
2f72cbb627 | ||
|
|
a460e169ab | ||
|
|
27ce4d6a0d | ||
|
|
ac86f8aded | ||
|
|
93eca239cb | ||
|
|
dc5949d497 | ||
|
|
5e24f6b044 | ||
|
|
0cd31e0545 | ||
|
|
d4dc4b9d0a | ||
|
|
8c6fe40de1 | ||
|
|
e4dc4bae30 | ||
|
|
d99c21f4d7 | ||
|
|
8ef549de80 | ||
|
|
1b57758102 | ||
|
|
553fe30662 | ||
|
|
b6138afe8b | ||
|
|
64d64fe6fe | ||
|
|
f29b356b74 | ||
|
|
b5621d1ffd | ||
|
|
66f0da934d | ||
|
|
13a90fdd57 | ||
|
|
47265bb8d3 |
7
Dockerfile
Normal file
7
Dockerfile
Normal file
@@ -0,0 +1,7 @@
|
||||
FROM fabric8/java-alpine-openjdk8-jdk
|
||||
MAINTAINER xuzhengxi
|
||||
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
|
||||
ADD ./web/target/kafka-manager-web-1.1.0-SNAPSHOT.jar kafka-manager-web.jar
|
||||
ADD ./docker/kafka-manager/application-standalone.yml application.yml
|
||||
ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"]
|
||||
EXPOSE 8080
|
||||
48
README.md
48
README.md
@@ -28,15 +28,15 @@
|
||||
|
||||
- 管理员用户与普通用户视角区分;
|
||||
- 管理员用户与普通用户权限区分;
|
||||
|
||||
|
||||
---
|
||||
|
||||
## kafka-manager架构图
|
||||
|
||||

|
||||
|
||||
|
||||
---
|
||||
|
||||
---
|
||||
|
||||
## 安装手册
|
||||
|
||||
@@ -45,12 +45,14 @@
|
||||
- `Maven 3.5.0+`(后端打包依赖)
|
||||
- `node v8.12.0+`(前端打包依赖)
|
||||
- `Java 8+`(运行环境需要)
|
||||
- `MySQL`(数据存储)
|
||||
- `MySQL` 或 `PostgreSQL`(数据存储)
|
||||
|
||||
---
|
||||
|
||||
### 环境初始化
|
||||
|
||||
**MySQL**
|
||||
|
||||
执行[create_mysql_table.sql](doc/create_mysql_table.sql)中的SQL命令,从而创建所需的MySQL库及表,默认创建的库名是`kafka_manager`。
|
||||
|
||||
```
|
||||
@@ -58,32 +60,50 @@
|
||||
mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql
|
||||
```
|
||||
|
||||
**PostgreSQL**
|
||||
|
||||
执行[create_postgresql_table.sql](doc/create_postgresql_table.sql)中的SQL命令,从而创建所需的PostgreSQL表。
|
||||
|
||||
```
|
||||
############# 示例:
|
||||
psql -h XXX.XXX.XXX.XXX -U XXXX -d kafka_manager -f ./create_postgresql_table.sql
|
||||
```
|
||||
|
||||
*PostgreSQL 用户、数据库创建方式*
|
||||
|
||||
```sql
|
||||
create user admin encrypted password 'admin';
|
||||
create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8';
|
||||
```
|
||||
|
||||
***默认配置使用 MySQL 数据库,若要使用 PostgreSQL 数据库,使用 `-Dspring.profiles.active=pg` 指定 `application-pg.yml` 配置文件。***
|
||||
|
||||
---
|
||||
|
||||
|
||||
|
||||
### 打包
|
||||
|
||||
执行`mvn install`命令即可。
|
||||
|
||||
执行`mvn install`命令即可。
|
||||
|
||||
备注:每一次执行`mvn install`命令,都将在`web/src/main/resources/templates`下面生成最新的前端资源文件,如果`console`模块下的代码没有变更,可以修改`./pom.xml`文件,忽略对`console`模块的打包。
|
||||
|
||||
---
|
||||
|
||||
### 启动
|
||||
|
||||
```
|
||||
```
|
||||
############# application.yml 是配置文件
|
||||
cp web/src/main/resources/application.yml web/target/
|
||||
cd web/target/
|
||||
nohup java -jar kafka-manager-web-1.0.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 &
|
||||
nohup java -jar kafka-manager-web-1.1.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 &
|
||||
```
|
||||
|
||||
### 使用
|
||||
|
||||
本地启动的话,访问`http://localhost:8080`,输入帐号及密码进行登录。更多参考:[kafka-manager使用手册](doc/user_cn_guide.md)
|
||||
|
||||
|
||||
---
|
||||
|
||||
---
|
||||
|
||||
## 相关文档
|
||||
|
||||
@@ -91,9 +111,9 @@ nohup java -jar kafka-manager-web-1.0.0-SNAPSHOT.jar --spring.config.location=./
|
||||
|
||||
|
||||
## 钉钉交流群
|
||||
|
||||
搜索群号:`32821440` 或者扫码可入群交流
|
||||
|
||||
|
||||
搜索群号:`32821440` 或者扫码可入群交流. 备注:在钉钉搜索框搜索`32821440`,然后搜索结果中点击 "网络查找手机/邮箱/钉钉号" 即可看到我们的钉钉群:滴滴KafkaManager开源用户群。
|
||||
|
||||
|
||||

|
||||
|
||||
|
||||
@@ -5,13 +5,13 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<artifactId>kafka-manager-common</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<artifactId>kafka-manager</artifactId>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.common.entity.dto.consumer;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@@ -11,20 +8,33 @@ import java.util.Map;
|
||||
* @date 2015/11/12
|
||||
*/
|
||||
public class ConsumerDTO {
|
||||
/**
|
||||
* 消费group名
|
||||
*/
|
||||
private Long clusterId;
|
||||
|
||||
private String topicName;
|
||||
|
||||
private String consumerGroup;
|
||||
|
||||
/**
|
||||
* 消费类型,一般为static
|
||||
*/
|
||||
private String location;
|
||||
|
||||
/**
|
||||
* 订阅的每个topic的partition状态列表
|
||||
*/
|
||||
private Map<String, List<PartitionState>> topicPartitionMap;
|
||||
private Map<Integer, Long> partitionOffsetMap;
|
||||
|
||||
private Map<Integer, Long> consumerOffsetMap;
|
||||
|
||||
public Long getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public void setClusterId(Long clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
}
|
||||
|
||||
public String getTopicName() {
|
||||
return topicName;
|
||||
}
|
||||
|
||||
public void setTopicName(String topicName) {
|
||||
this.topicName = topicName;
|
||||
}
|
||||
|
||||
public String getConsumerGroup() {
|
||||
return consumerGroup;
|
||||
@@ -42,20 +52,31 @@ public class ConsumerDTO {
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
public Map<String, List<PartitionState>> getTopicPartitionMap() {
|
||||
return topicPartitionMap;
|
||||
public Map<Integer, Long> getPartitionOffsetMap() {
|
||||
return partitionOffsetMap;
|
||||
}
|
||||
|
||||
public void setTopicPartitionMap(Map<String, List<PartitionState>> topicPartitionMap) {
|
||||
this.topicPartitionMap = topicPartitionMap;
|
||||
public void setPartitionOffsetMap(Map<Integer, Long> partitionOffsetMap) {
|
||||
this.partitionOffsetMap = partitionOffsetMap;
|
||||
}
|
||||
|
||||
public Map<Integer, Long> getConsumerOffsetMap() {
|
||||
return consumerOffsetMap;
|
||||
}
|
||||
|
||||
public void setConsumerOffsetMap(Map<Integer, Long> consumerOffsetMap) {
|
||||
this.consumerOffsetMap = consumerOffsetMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Consumer{" +
|
||||
"consumerGroup='" + consumerGroup + '\'' +
|
||||
return "ConsumerDTO{" +
|
||||
"clusterId=" + clusterId +
|
||||
", topicName='" + topicName + '\'' +
|
||||
", consumerGroup='" + consumerGroup + '\'' +
|
||||
", location='" + location + '\'' +
|
||||
", topicPartitionMap=" + topicPartitionMap +
|
||||
", partitionOffsetMap=" + partitionOffsetMap +
|
||||
", consumerOffsetMap=" + consumerOffsetMap +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,10 @@ public class OrderPartitionDO extends BaseDO{
|
||||
|
||||
private String applicant;
|
||||
|
||||
private Integer partitionNum;
|
||||
|
||||
private String brokerList;
|
||||
|
||||
private Long peakBytesIn;
|
||||
|
||||
private String description;
|
||||
@@ -51,6 +55,22 @@ public class OrderPartitionDO extends BaseDO{
|
||||
this.applicant = applicant;
|
||||
}
|
||||
|
||||
public Integer getPartitionNum() {
|
||||
return partitionNum;
|
||||
}
|
||||
|
||||
public void setPartitionNum(Integer partitionNum) {
|
||||
this.partitionNum = partitionNum;
|
||||
}
|
||||
|
||||
public String getBrokerList() {
|
||||
return brokerList;
|
||||
}
|
||||
|
||||
public void setBrokerList(String brokerList) {
|
||||
this.brokerList = brokerList;
|
||||
}
|
||||
|
||||
public Long getPeakBytesIn() {
|
||||
return peakBytesIn;
|
||||
}
|
||||
@@ -98,6 +118,8 @@ public class OrderPartitionDO extends BaseDO{
|
||||
", clusterName='" + clusterName + '\'' +
|
||||
", topicName='" + topicName + '\'' +
|
||||
", applicant='" + applicant + '\'' +
|
||||
", partitionNum=" + partitionNum +
|
||||
", brokerList='" + brokerList + '\'' +
|
||||
", peakBytesIn=" + peakBytesIn +
|
||||
", description='" + description + '\'' +
|
||||
", orderStatus=" + orderStatus +
|
||||
|
||||
57
console/package-lock.json
generated
57
console/package-lock.json
generated
@@ -751,8 +751,7 @@
|
||||
"version": "1.0.0",
|
||||
"resolved": "http://registry.npm.taobao.org/assert-plus/download/assert-plus-1.0.0.tgz",
|
||||
"integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"assign-symbols": {
|
||||
"version": "1.0.0",
|
||||
@@ -1505,7 +1504,6 @@
|
||||
"resolved": "http://registry.npm.taobao.org/combined-stream/download/combined-stream-1.0.7.tgz",
|
||||
"integrity": "sha1-LR0kMXr7ir6V1tLAsHtXgTU52Cg=",
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"delayed-stream": "~1.0.0"
|
||||
}
|
||||
@@ -2222,8 +2220,7 @@
|
||||
"version": "1.0.0",
|
||||
"resolved": "http://registry.npm.taobao.org/delayed-stream/download/delayed-stream-1.0.0.tgz",
|
||||
"integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"depd": {
|
||||
"version": "1.1.2",
|
||||
@@ -2941,8 +2938,7 @@
|
||||
"version": "1.3.0",
|
||||
"resolved": "http://registry.npm.taobao.org/extsprintf/download/extsprintf-1.3.0.tgz",
|
||||
"integrity": "sha1-lpGEQOMEGnpBT4xS48V06zw+HgU=",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"fast-deep-equal": {
|
||||
"version": "2.0.1",
|
||||
@@ -3358,8 +3354,7 @@
|
||||
"ansi-regex": {
|
||||
"version": "2.1.1",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"aproba": {
|
||||
"version": "1.2.0",
|
||||
@@ -3380,14 +3375,12 @@
|
||||
"balanced-match": {
|
||||
"version": "1.0.0",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"brace-expansion": {
|
||||
"version": "1.1.11",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"balanced-match": "^1.0.0",
|
||||
"concat-map": "0.0.1"
|
||||
@@ -3402,20 +3395,17 @@
|
||||
"code-point-at": {
|
||||
"version": "1.1.0",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"concat-map": {
|
||||
"version": "0.0.1",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"console-control-strings": {
|
||||
"version": "1.1.0",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"core-util-is": {
|
||||
"version": "1.0.2",
|
||||
@@ -3532,8 +3522,7 @@
|
||||
"inherits": {
|
||||
"version": "2.0.3",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"ini": {
|
||||
"version": "1.3.5",
|
||||
@@ -3545,7 +3534,6 @@
|
||||
"version": "1.0.0",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"number-is-nan": "^1.0.0"
|
||||
}
|
||||
@@ -3560,7 +3548,6 @@
|
||||
"version": "3.0.4",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"brace-expansion": "^1.1.7"
|
||||
}
|
||||
@@ -3568,14 +3555,12 @@
|
||||
"minimist": {
|
||||
"version": "0.0.8",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"minipass": {
|
||||
"version": "2.3.5",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"safe-buffer": "^5.1.2",
|
||||
"yallist": "^3.0.0"
|
||||
@@ -3594,7 +3579,6 @@
|
||||
"version": "0.5.1",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"minimist": "0.0.8"
|
||||
}
|
||||
@@ -3675,8 +3659,7 @@
|
||||
"number-is-nan": {
|
||||
"version": "1.0.1",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"object-assign": {
|
||||
"version": "4.1.1",
|
||||
@@ -3688,7 +3671,6 @@
|
||||
"version": "1.4.0",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"wrappy": "1"
|
||||
}
|
||||
@@ -3774,8 +3756,7 @@
|
||||
"safe-buffer": {
|
||||
"version": "5.1.2",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"safer-buffer": {
|
||||
"version": "2.1.2",
|
||||
@@ -3811,7 +3792,6 @@
|
||||
"version": "1.0.2",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"code-point-at": "^1.0.0",
|
||||
"is-fullwidth-code-point": "^1.0.0",
|
||||
@@ -3831,7 +3811,6 @@
|
||||
"version": "3.0.1",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true,
|
||||
"requires": {
|
||||
"ansi-regex": "^2.0.0"
|
||||
}
|
||||
@@ -3875,14 +3854,12 @@
|
||||
"wrappy": {
|
||||
"version": "1.0.2",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"yallist": {
|
||||
"version": "3.0.3",
|
||||
"bundled": true,
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -4864,8 +4841,7 @@
|
||||
"version": "0.1.1",
|
||||
"resolved": "http://registry.npm.taobao.org/jsbn/download/jsbn-0.1.1.tgz",
|
||||
"integrity": "sha1-peZUwuWi3rXyAdls77yoDA7y9RM=",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"json-parse-better-errors": {
|
||||
"version": "1.0.2",
|
||||
@@ -8883,8 +8859,7 @@
|
||||
"version": "0.14.5",
|
||||
"resolved": "http://registry.npm.taobao.org/tweetnacl/download/tweetnacl-0.14.5.tgz",
|
||||
"integrity": "sha1-WuaBd/GS1EViadEIr6k/+HQ/T2Q=",
|
||||
"dev": true,
|
||||
"optional": true
|
||||
"dev": true
|
||||
},
|
||||
"type-is": {
|
||||
"version": "1.6.16",
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>kafka-manager</artifactId>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>kafka-manager-console</artifactId>
|
||||
|
||||
@@ -10,7 +10,7 @@ import { IClusterData } from 'types/base-type';
|
||||
|
||||
const TabPane = Tabs.TabPane;
|
||||
|
||||
const detailUrl ='/admin/cluster_detail?clusterId=';
|
||||
const detailUrl = '/admin/cluster_detail?clusterId=';
|
||||
|
||||
const collectionColumns: Array<ColumnProps<IClusterData>> = [
|
||||
{
|
||||
@@ -24,7 +24,8 @@ const collectionColumns: Array<ColumnProps<IClusterData>> = [
|
||||
key: 'clusterName',
|
||||
sorter: (a: IClusterData, b: IClusterData) => a.clusterName.charCodeAt(0) - b.clusterName.charCodeAt(0),
|
||||
render: (text, record) => {
|
||||
return <a href={`${detailUrl}${record.clusterId}`}>{record.clusterName}</a>;
|
||||
const url = `${detailUrl}${record.clusterId}&clusterName=${record.clusterName}`;
|
||||
return <a href={encodeURI(url)}>{record.clusterName}</a>;
|
||||
},
|
||||
},
|
||||
{
|
||||
|
||||
@@ -39,10 +39,10 @@ export class UserManage extends SearchAndFilter {
|
||||
public renderColumns = () => {
|
||||
const role = Object.assign({
|
||||
title: '角色',
|
||||
key: 'role',
|
||||
dataIndex: 'role',
|
||||
key: 'roleName',
|
||||
dataIndex: 'roleName',
|
||||
filters: users.filterRole,
|
||||
onFilter: (value: string, record: any) => record.role.indexOf(value) === 0,
|
||||
onFilter: (value: string, record: any) => record.roleName.indexOf(value) === 0,
|
||||
}, this.renderColumnsFilter('filterVisible'));
|
||||
|
||||
return [
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import * as React from 'react';
|
||||
import './index.less';
|
||||
import { Table, Modal, notification, PaginationConfig, Button } from 'component/antd';
|
||||
import { Table, Modal, notification, PaginationConfig, Button, Spin } from 'component/antd';
|
||||
import { broker, IBroker, IBrokerNetworkInfo, IBrokerPartition } from 'store/broker';
|
||||
import { observer } from 'mobx-react';
|
||||
import { StatusGraghCom } from 'component/flow-table';
|
||||
@@ -49,10 +49,19 @@ export class BrokerList extends SearchAndFilter {
|
||||
|
||||
const status = Object.assign({
|
||||
title: '已同步',
|
||||
dataIndex: 'status',
|
||||
key: 'status',
|
||||
filters: [{ text: '是', value: '是' }, { text: '否', value: '否' }],
|
||||
onFilter: (value: string, record: IBrokerPartition) => record.status === value,
|
||||
dataIndex: 'underReplicatedPartitionCount',
|
||||
key: 'underReplicatedPartitionCount',
|
||||
filters: [{ text: '是', value: '1' }, { text: '否', value: '0' }],
|
||||
onFilter: (value: string, record: IBrokerPartition) => {
|
||||
// underReplicatedPartitionCount > 0 表示未同步完成
|
||||
const syncStatus = record.underReplicatedPartitionCount ? '0' : '1';
|
||||
return syncStatus === value;
|
||||
},
|
||||
render: (text: number) => (
|
||||
<>
|
||||
<span style={{ marginRight: 8 }}>{text ? '否' : '是'}</span>
|
||||
</>
|
||||
),
|
||||
}, this.renderColumnsFilter('filterVisible'));
|
||||
|
||||
return [{
|
||||
@@ -80,7 +89,8 @@ export class BrokerList extends SearchAndFilter {
|
||||
title: '未同步副本数量',
|
||||
dataIndex: 'notUnderReplicatedPartitionCount',
|
||||
key: 'notUnderReplicatedPartitionCount',
|
||||
sorter: (a: IBrokerPartition, b: IBrokerPartition) => a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount,
|
||||
sorter: (a: IBrokerPartition, b: IBrokerPartition) =>
|
||||
a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount,
|
||||
},
|
||||
status,
|
||||
region,
|
||||
@@ -205,7 +215,7 @@ export class BrokerList extends SearchAndFilter {
|
||||
const dataPartitions = this.state.searchId !== '' ?
|
||||
broker.partitions.filter((d) => d.brokerId === +this.state.searchId) : broker.partitions;
|
||||
return (
|
||||
<>
|
||||
<Spin spinning={broker.loading}>
|
||||
<div className="k-row">
|
||||
<ul className="k-tab">
|
||||
<li>Broker概览</li>
|
||||
@@ -239,7 +249,7 @@ export class BrokerList extends SearchAndFilter {
|
||||
pagination={pagination}
|
||||
/>
|
||||
</div>
|
||||
</>
|
||||
</Spin>
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,14 +45,19 @@ class LeaderRebalance extends React.Component<any> {
|
||||
constructor(props: any) {
|
||||
super(props);
|
||||
const url = Url();
|
||||
this.clusterName = decodeURI(atob(url.search.clusterName));
|
||||
if (url.search.clusterName) {
|
||||
this.clusterName = decodeURI(url.search.clusterName);
|
||||
}
|
||||
this.clusterId = Number(url.search.clusterId);
|
||||
}
|
||||
|
||||
public handleSubmit = (e: React.MouseEvent<any, MouseEvent>) => {
|
||||
e.preventDefault();
|
||||
this.setState({ loading: true });
|
||||
this.props.form.validateFieldsAndScroll((err: any, values: any) => {
|
||||
if (err) {
|
||||
return;
|
||||
}
|
||||
this.setState({ loading: true });
|
||||
this.brokerId = Number(values.brokerId);
|
||||
addRebalance({ brokerId: this.brokerId, clusterId: this.clusterId, dimension: 0 }).then(() => {
|
||||
cluster.getRebalance(this.clusterId).then(() => {
|
||||
|
||||
@@ -35,6 +35,7 @@ export interface IBrokerPartition extends IBroker {
|
||||
leaderCount: number;
|
||||
partitionCount: number;
|
||||
notUnderReplicatedPartitionCount: number;
|
||||
underReplicatedPartitionCount?: number;
|
||||
regionName: string;
|
||||
bytesInPerSec: number;
|
||||
}
|
||||
@@ -74,6 +75,9 @@ interface IBrokerOption {
|
||||
}
|
||||
|
||||
class Broker {
|
||||
@observable
|
||||
public loading: boolean = false;
|
||||
|
||||
@observable
|
||||
public brokerBaseInfo: IBrokerBaseInfo = {} as IBrokerBaseInfo;
|
||||
|
||||
@@ -119,6 +123,11 @@ class Broker {
|
||||
@observable
|
||||
public BrokerOptions: IValueLabel[] = [{ value: null, label: '请选择Broker' }];
|
||||
|
||||
@action.bound
|
||||
public setLoading(value: boolean) {
|
||||
this.loading = value;
|
||||
}
|
||||
|
||||
@action.bound
|
||||
public setBrokerBaseInfo(data: IBrokerBaseInfo) {
|
||||
data.startTime = moment(data.startTime).format('YYYY-MM-DD HH:mm:ss'),
|
||||
@@ -216,7 +225,8 @@ class Broker {
|
||||
}
|
||||
|
||||
public getBrokerList(clusterId: number) {
|
||||
getBrokerList(clusterId).then(this.setBrokerList);
|
||||
this.setLoading(true);
|
||||
getBrokerList(clusterId).then(this.setBrokerList).finally(() => this.setLoading(false));
|
||||
}
|
||||
|
||||
public getBrokerNetwork(clusterId: number) {
|
||||
@@ -224,7 +234,8 @@ class Broker {
|
||||
}
|
||||
|
||||
public getBrokerPartition(clusterId: number) {
|
||||
getBrokerPartition(clusterId).then(this.setBrokerPartition);
|
||||
this.setLoading(true);
|
||||
getBrokerPartition(clusterId).then(this.setBrokerPartition).finally(() => this.setLoading(false));
|
||||
}
|
||||
|
||||
public getOneBrokerNetwork(clusterId: number, brokerId: number) {
|
||||
|
||||
@@ -17,7 +17,7 @@ export class Users {
|
||||
@action.bound
|
||||
public setUserData(data: []) {
|
||||
this.userData = data.map((d: any) => {
|
||||
d.role = this.roleMap[d.role];
|
||||
d.roleName = this.roleMap[d.role];
|
||||
return d;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -5,13 +5,13 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<artifactId>kafka-manager-dao</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<artifactId>kafka-manager</artifactId>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
@@ -43,5 +43,9 @@
|
||||
<artifactId>mariadb-java-client</artifactId>
|
||||
<version>2.5.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -18,6 +18,9 @@ public class AccountDaoImpl implements AccountDao {
|
||||
@Autowired
|
||||
private SqlSessionTemplate sqlSession;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
@@ -25,7 +28,7 @@ public class AccountDaoImpl implements AccountDao {
|
||||
@Override
|
||||
public int addNewAccount(AccountDO accountDO) {
|
||||
accountDO.setStatus(DBStatusEnum.NORMAL.getStatus());
|
||||
return sqlSession.insert("AccountDao.insert", accountDO);
|
||||
return updateAccount(accountDO);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -35,6 +38,9 @@ public class AccountDaoImpl implements AccountDao {
|
||||
|
||||
@Override
|
||||
public int updateAccount(AccountDO accountDO) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("AccountDao.insertOnPG", accountDO);
|
||||
}
|
||||
return sqlSession.insert("AccountDao.insert", accountDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,12 +19,18 @@ public class BrokerDaoImpl implements BrokerDao {
|
||||
@Autowired
|
||||
private SqlSessionTemplate sqlSession;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int replace(BrokerDO brokerInfoDO) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("BrokerDao.replaceOnPG", brokerInfoDO);
|
||||
}
|
||||
return sqlSession.insert("BrokerDao.replace", brokerInfoDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.xiaojukeji.kafka.manager.dao.impl;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ConfigurationProperties("spring.datasource.kafka-manager")
|
||||
public class KafkaManagerProperties {
|
||||
private String jdbcUrl;
|
||||
|
||||
public String getJdbcUrl() {
|
||||
return jdbcUrl;
|
||||
}
|
||||
|
||||
public void setJdbcUrl(String jdbcUrl) {
|
||||
this.jdbcUrl = jdbcUrl;
|
||||
}
|
||||
|
||||
public boolean hasPG() {
|
||||
return jdbcUrl.startsWith("jdbc:postgres");
|
||||
}
|
||||
}
|
||||
@@ -17,12 +17,18 @@ public class RegionDaoImpl implements RegionDao {
|
||||
@Autowired
|
||||
private SqlSessionTemplate sqlSession;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int insert(RegionDO regionDO) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("RegionDao.insertOnPG", regionDO);
|
||||
}
|
||||
return sqlSession.insert("RegionDao.insert", regionDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,12 +19,18 @@ public class TopicDaoImpl implements TopicDao {
|
||||
@Autowired
|
||||
private SqlSessionTemplate sqlSession;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int replace(TopicDO topicDO) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("TopicDao.replaceOnPG", topicDO);
|
||||
}
|
||||
return sqlSession.insert("TopicDao.replace", topicDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -25,12 +25,18 @@ public class TopicFavoriteDaoImpl implements TopicFavoriteDao {
|
||||
@Autowired
|
||||
private TransactionTemplate transactionTemplate;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int batchAdd(List<TopicFavoriteDO> topicFavoriteDOList) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("TopicFavoriteDao.batchAddOnPG", topicFavoriteDOList);
|
||||
}
|
||||
return sqlSession.insert("TopicFavoriteDao.batchAdd", topicFavoriteDOList);
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,18 @@
|
||||
]]>
|
||||
</insert>
|
||||
|
||||
<insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.AccountDO">
|
||||
<![CDATA[
|
||||
insert into account
|
||||
(username, password, role, status)
|
||||
values
|
||||
(#{username}, #{password}, #{role}, #{status})
|
||||
on conflict (username) do update set password = excluded.password,
|
||||
role = excluded.role,
|
||||
status = excluded.status
|
||||
]]>
|
||||
</insert>
|
||||
|
||||
<delete id="deleteByName" parameterType="java.lang.String">
|
||||
DELETE FROM account WHERE username = #{username}
|
||||
</delete>
|
||||
|
||||
@@ -20,6 +20,16 @@
|
||||
(#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
|
||||
</insert>
|
||||
|
||||
<insert id="replaceOnPG" parameterType="BrokerDO">
|
||||
insert into broker
|
||||
(cluster_id, broker_id, host, port, timestamp, status)
|
||||
values (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
|
||||
on conflict (cluster_id, broker_id) do update set host = excluded.host,
|
||||
port = excluded.port,
|
||||
timestamp = excluded.timestamp,
|
||||
status = excluded.status
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.util.Map">
|
||||
DELETE FROM broker WHERE cluster_id = #{clusterId} AND broker_id = #{brokerId}
|
||||
</delete>
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
<result column="cluster_name" property="clusterName" />
|
||||
<result column="topic_name" property="topicName" />
|
||||
<result column="applicant" property="applicant" />
|
||||
<result column="partition_num" property="partitionNum" />
|
||||
<result column="broker_list" property="brokerList" />
|
||||
<result column="peak_bytes_in" property="peakBytesIn" />
|
||||
<result column="description" property="description" />
|
||||
<result column="order_status" property="orderStatus" />
|
||||
@@ -38,6 +40,16 @@
|
||||
cluster_name=#{clusterName},
|
||||
topic_name=#{topicName},
|
||||
applicant=#{applicant},
|
||||
<trim>
|
||||
<if test="partitionNum!=null">
|
||||
partition_num=#{partitionNum},
|
||||
</if>
|
||||
</trim>
|
||||
<trim>
|
||||
<if test="brokerList!=null">
|
||||
broker_list=#{brokerList},
|
||||
</if>
|
||||
</trim>
|
||||
peak_bytes_in=#{peakBytesIn},
|
||||
description=#{description},
|
||||
order_status=#{orderStatus},
|
||||
|
||||
@@ -21,6 +21,15 @@
|
||||
(#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
|
||||
</insert>
|
||||
|
||||
<insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.RegionDO">
|
||||
insert into region
|
||||
(region_name, cluster_id, broker_list, description, operator)
|
||||
values (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
|
||||
on conflict (region_name, cluster_id) do update set broker_list = excluded.broker_list,
|
||||
description = excluded.description,
|
||||
operator = excluded.operator
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.lang.Long">
|
||||
DELETE FROM region WHERE id = #{id}
|
||||
</delete>
|
||||
|
||||
@@ -19,6 +19,15 @@
|
||||
(#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
|
||||
</insert>
|
||||
|
||||
<insert id="replaceOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.TopicDO">
|
||||
insert into topic
|
||||
(cluster_id, topic_name, principals, description, status)
|
||||
values (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
|
||||
on conflict (cluster_id, topic_name) do update set principals = excluded.principals,
|
||||
description = excluded.description,
|
||||
status = excluded.status
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.lang.Long">
|
||||
DELETE FROM topic WHERE id = #{id}
|
||||
</delete>
|
||||
|
||||
@@ -21,6 +21,15 @@
|
||||
</foreach>
|
||||
</insert>
|
||||
|
||||
<insert id="batchAddOnPG" parameterType="java.util.List">
|
||||
insert into topic_favorite (cluster_id, topic_name, username)
|
||||
values
|
||||
<foreach item="TopicFavoriteDO" index="index" collection="list" separator=",">
|
||||
(#{TopicFavoriteDO.clusterId}, #{TopicFavoriteDO.topicName}, #{TopicFavoriteDO.username})
|
||||
</foreach>
|
||||
on conflict (cluster_id, topic_name, username) do update set gmt_modify = now();
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.lang.Long">
|
||||
DELETE FROM topic_favorite WHERE id=#{id}
|
||||
</delete>
|
||||
|
||||
@@ -149,6 +149,8 @@ CREATE TABLE `order_partition` (
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称',
|
||||
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表, 逗号分割',
|
||||
`partition_num` int(11) NOT NULL DEFAULT 0 COMMENT '新增分区数',
|
||||
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人',
|
||||
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流量',
|
||||
`description` text COMMENT '备注信息',
|
||||
|
||||
325
doc/create_postgresql_table.sql
Normal file
325
doc/create_postgresql_table.sql
Normal file
@@ -0,0 +1,325 @@
|
||||
-- CREATE DATABASE kafka_manager;
|
||||
-- \c kafka_manager;
|
||||
SET TIME ZONE 'Asia/Chongqing';
|
||||
SET CLIENT_ENCODING TO 'UTF-8';
|
||||
|
||||
CREATE OR REPLACE FUNCTION on_update_timestamp() RETURNS TRIGGER AS
|
||||
$$
|
||||
BEGIN
|
||||
new.gmt_modify = current_timestamp;
|
||||
return new;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- 账号表
|
||||
CREATE TABLE account
|
||||
(
|
||||
id bigserial NOT NULL, -- 'ID',
|
||||
username varchar(64) NOT NULL UNIQUE DEFAULT '', -- '用户名',
|
||||
password varchar(128) NOT NULL DEFAULT '', -- '密码',
|
||||
role int NOT NULL DEFAULT 0, -- '角色类型, 0:普通用户',
|
||||
status int NOT NULL DEFAULT 0, -- '0标识使用中,-1标识已废弃',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT account_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX account_uniq_username ON account (username);
|
||||
INSERT INTO account(username, password, role)
|
||||
VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2);
|
||||
CREATE TRIGGER account_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON account
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- 告警规则表
|
||||
CREATE TABLE alarm_rule
|
||||
(
|
||||
id bigserial, -- '自增ID',
|
||||
alarm_name varchar(128) NOT NULL DEFAULT '', -- '告警名字',
|
||||
strategy_expressions text, -- '表达式',
|
||||
strategy_filters text, -- '过滤条件',
|
||||
strategy_actions text, -- '响应',
|
||||
principals varchar(512) NOT NULL DEFAULT '', -- '负责人',
|
||||
status int2 NOT NULL DEFAULT 1, -- '-1:逻辑删除, 0:关闭, 1:正常',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT alarm_rule_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX alarm_rule_uniq_alarm_name ON alarm_rule (alarm_name);
|
||||
CREATE TRIGGER alarm_rule_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON alarm_rule
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- Broker信息表
|
||||
CREATE TABLE broker
|
||||
(
|
||||
id bigserial, -- 'id',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID',
|
||||
host varchar(128) NOT NULL DEFAULT '', -- 'Broker主机名',
|
||||
port int NOT NULL DEFAULT '-1', -- 'Broker端口',
|
||||
timestamp bigint NOT NULL DEFAULT '-1', -- '启动时间',
|
||||
status int NOT NULL DEFAULT '0', -- '状态0有效,-1无效',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT broker_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX broker_uniq_cluster_id_broker_id ON broker (cluster_id, broker_id);
|
||||
CREATE TRIGGER broker_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON broker
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- BrokerMetric信息表
|
||||
CREATE TABLE broker_metrics
|
||||
(
|
||||
id bigserial, -- '自增id',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID',
|
||||
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入',
|
||||
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出',
|
||||
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒被拒绝字节数',
|
||||
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数流入',
|
||||
fail_fetch_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费失败数',
|
||||
fail_produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒失败生产数',
|
||||
fetch_consumer_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费请求数',
|
||||
produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒生产数',
|
||||
request_handler_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '请求处理器繁忙百分比',
|
||||
network_processor_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '网络处理器繁忙百分比',
|
||||
request_queue_size bigint NOT NULL DEFAULT '0', -- '请求列表大小',
|
||||
response_queue_size bigint NOT NULL DEFAULT '0', -- '响应列表大小',
|
||||
log_flush_time decimal(53, 2) NOT NULL DEFAULT '0.00', -- '刷日志时间',
|
||||
total_time_produce_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-平均值',
|
||||
total_time_produce_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-99分位',
|
||||
total_time_fetch_consumer_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-平均值',
|
||||
total_time_fetch_consumer_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-99分位',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
CONSTRAINT broker_metrics_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE INDEX broker_metrics_idx_cluster_id_broker_id_gmt_create ON broker_metrics (cluster_id, broker_id, gmt_create);
|
||||
|
||||
-- Cluster表
|
||||
CREATE TABLE cluster
|
||||
(
|
||||
id bigserial, -- '集群ID',
|
||||
cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称',
|
||||
zookeeper varchar(512) NOT NULL DEFAULT '', -- 'ZK地址',
|
||||
bootstrap_servers varchar(512) NOT NULL DEFAULT '', -- 'Server地址',
|
||||
kafka_version varchar(32) NOT NULL DEFAULT '', -- 'Kafka版本',
|
||||
alarm_flag int2 NOT NULL DEFAULT '0', -- '0:不开启告警, 1开启告警',
|
||||
security_protocol varchar(512) NOT NULL DEFAULT '', -- '安全协议',
|
||||
sasl_mechanism varchar(512) NOT NULL DEFAULT '', -- '安全机制',
|
||||
sasl_jaas_config varchar(512) NOT NULL DEFAULT '', -- 'Jaas配置',
|
||||
status int2 NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT cluster_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX cluster_uniq_cluster_name ON cluster (cluster_name);
|
||||
CREATE TRIGGER cluster_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON cluster
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- ClusterMetrics信息
|
||||
CREATE TABLE cluster_metrics
|
||||
(
|
||||
id bigserial, -- '自增id',
|
||||
cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID',
|
||||
topic_num int NOT NULL DEFAULT '0', -- 'Topic数',
|
||||
partition_num int NOT NULL DEFAULT '0', -- '分区数',
|
||||
broker_num int NOT NULL DEFAULT '0', -- 'Broker数',
|
||||
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流入(B)',
|
||||
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流出(B)',
|
||||
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝(B)',
|
||||
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数(条)',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
CONSTRAINT cluster_metrics_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE INDEX cluster_metrics_idx_cluster_id_gmt_create ON cluster_metrics (cluster_id, gmt_create);
|
||||
|
||||
-- Controller历史变更记录表
|
||||
CREATE TABLE controller
|
||||
(
|
||||
id bigserial, -- '自增id',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerId',
|
||||
host varchar(256) NOT NULL DEFAULT '', -- '主机名',
|
||||
timestamp bigint NOT NULL DEFAULT '-1', -- 'Controller变更时间',
|
||||
version int NOT NULL DEFAULT '-1', -- 'Controller格式版本',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
CONSTRAINT controller_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX controller_uniq_cluster_id_broker_id_timestamp ON controller (cluster_id, broker_id, timestamp);
|
||||
|
||||
-- Topic迁移信息
|
||||
CREATE TABLE migration_task
|
||||
(
|
||||
id bigserial, -- '自增id',
|
||||
cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID',
|
||||
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
|
||||
reassignment_json text, -- '任务参数',
|
||||
real_throttle bigint NOT NULL DEFAULT '0', -- '实际限流值(B/s)',
|
||||
operator varchar(128) NOT NULL DEFAULT '', -- '操作人',
|
||||
description varchar(256) NOT NULL DEFAULT '', -- '备注说明',
|
||||
status int NOT NULL DEFAULT '0', -- '任务状态',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务修改时间',
|
||||
CONSTRAINT migration_task_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE TRIGGER migration_task_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON migration_task
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
CREATE TABLE operation_history
|
||||
(
|
||||
id bigserial, -- 'id',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
|
||||
operator varchar(128) NOT NULL DEFAULT '', -- '操作人',
|
||||
operation varchar(256) NOT NULL DEFAULT '', -- '操作描述',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
--='操作记录表';
|
||||
|
||||
-- 分区申请工单
|
||||
CREATE TABLE order_partition
|
||||
(
|
||||
id bigserial, -- 'id',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称',
|
||||
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
|
||||
applicant varchar(128) NOT NULL DEFAULT '', -- '申请人',
|
||||
partition_num int NOT NULL DEFAULT '0', -- '分区数',
|
||||
broker_list varchar(128) NOT NULL DEFAULT '', -- 'Broker列表',
|
||||
peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流量',
|
||||
description text, -- '备注信息',
|
||||
order_status int NOT NULL DEFAULT '0', -- '工单状态',
|
||||
approver varchar(128) NOT NULL DEFAULT '', -- '审批人',
|
||||
opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见',
|
||||
status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT order_partition_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE TRIGGER order_partition_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON order_partition
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- Topic申请工单
|
||||
CREATE TABLE order_topic
|
||||
(
|
||||
id bigserial, -- 'ID',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称',
|
||||
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
|
||||
retention_time bigint NOT NULL DEFAULT '-1', -- '保留时间(ms)',
|
||||
partition_num int NOT NULL DEFAULT '-1', -- '分区数',
|
||||
replica_num int NOT NULL DEFAULT '-1', -- '副本数',
|
||||
regions varchar(128) NOT NULL DEFAULT '', -- 'RegionId列表',
|
||||
brokers varchar(128) NOT NULL DEFAULT '', -- 'Broker列表',
|
||||
peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流入流量(KB)',
|
||||
applicant varchar(128) NOT NULL DEFAULT '', -- '申请人',
|
||||
principals varchar(256) NOT NULL DEFAULT '', -- '负责人',
|
||||
description text, -- '备注信息',
|
||||
order_status int NOT NULL DEFAULT '0', -- '工单状态',
|
||||
approver varchar(128) NOT NULL DEFAULT '', -- '审批人',
|
||||
opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见',
|
||||
status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT order_topic_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE TRIGGER order_topic_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON order_topic
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- Region信息表
|
||||
CREATE TABLE region
|
||||
(
|
||||
id bigserial, -- 'id',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
region_name varchar(128) NOT NULL DEFAULT '', -- 'Region名称',
|
||||
broker_list varchar(256) NOT NULL DEFAULT '', -- 'Broker列表',
|
||||
level int NOT NULL DEFAULT '0', -- 'Region重要等级, 0级普通, 1极重要,2级极重要',
|
||||
operator varchar(45) NOT NULL DEFAULT '', -- '操作人',
|
||||
description text, -- '备注说明',
|
||||
status int NOT NULL DEFAULT '0', -- '状态,0正常,-1废弃,1容量已满',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT region_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX region_uniq_cluster_id_region_name ON region (cluster_id, region_name);
|
||||
CREATE TRIGGER region_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON region
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- Topic信息表
|
||||
CREATE TABLE topic
|
||||
(
|
||||
id bigserial, -- 'ID',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
|
||||
applicant varchar(256) NOT NULL DEFAULT '', -- '申请人',
|
||||
principals varchar(256) NOT NULL DEFAULT '', -- '负责人',
|
||||
description text, -- '备注信息',
|
||||
status int NOT NULL DEFAULT '0', -- '0标识使用中,-1标识已废弃',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT topic_pk PRIMARY KEY (id)
|
||||
); --='';
|
||||
CREATE UNIQUE INDEX topic_uniq_cluster_id_topic_name ON topic (cluster_id, topic_name);
|
||||
CREATE TRIGGER topic_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON topic
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- 用户收藏的Topic表
|
||||
CREATE TABLE topic_favorite
|
||||
(
|
||||
id bigserial, -- '自增Id',
|
||||
username varchar(64) NOT NULL DEFAULT '', -- '用户名',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
|
||||
status int NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
|
||||
CONSTRAINT topic_favorite_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE UNIQUE INDEX topic_favorite_uniq_username_cluster_id_topic_name ON topic_favorite (username, cluster_id, topic_name);
|
||||
CREATE TRIGGER topic_favorite_trig_gmt_modify
|
||||
BEFORE UPDATE
|
||||
ON topic_favorite
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE on_update_timestamp();
|
||||
|
||||
-- TopicMetrics表
|
||||
CREATE TABLE topic_metrics
|
||||
(
|
||||
id bigserial, -- '自增id',
|
||||
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
|
||||
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
|
||||
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒进入消息条数',
|
||||
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入',
|
||||
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出',
|
||||
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝字节数',
|
||||
total_produce_requests decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒请求数',
|
||||
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
|
||||
CONSTRAINT topic_metrics_pk PRIMARY KEY (id)
|
||||
);
|
||||
CREATE INDEX topic_metrics_idx_cluster_id_topic_name_gmt_create ON topic_metrics (cluster_id, topic_name, gmt_create);
|
||||
32
docker-compose.yml
Normal file
32
docker-compose.yml
Normal file
@@ -0,0 +1,32 @@
|
||||
version: '2'
|
||||
services:
|
||||
mysqldbserver:
|
||||
container_name: mysqldbserver
|
||||
image: mysql:5.7
|
||||
build:
|
||||
context: .
|
||||
dockerfile: docker/mysql/Dockerfile
|
||||
ports:
|
||||
- "3306:3306"
|
||||
command: [
|
||||
'mysqld',
|
||||
'--character-set-server=utf8',
|
||||
'--collation-server=utf8_general_ci',
|
||||
'--default-time-zone=+8:00'
|
||||
]
|
||||
environment:
|
||||
MYSQL_ROOT_PASSWORD: 12345678
|
||||
MYSQL_DATABASE: kafka_manager
|
||||
MYSQL_USER: admin
|
||||
MYSQL_PASSWORD: 12345678
|
||||
restart: always
|
||||
kafka-manager-web:
|
||||
container_name: kafka-manager-web
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
ports:
|
||||
- "8080:8080"
|
||||
links:
|
||||
- mysqldbserver
|
||||
restart: always
|
||||
7
docker/kafka-manager/Dockerfile
Normal file
7
docker/kafka-manager/Dockerfile
Normal file
@@ -0,0 +1,7 @@
|
||||
FROM java:8
|
||||
MAINTAINER xuzhengxi
|
||||
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
|
||||
ADD ../../web/target/kafka-manager-web-1.1.0-SNAPSHOT.jar kafka-manager-web.jar
|
||||
ADD ./application.yml application.yml
|
||||
ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"]
|
||||
EXPOSE 8080
|
||||
32
docker/kafka-manager/application-standalone.yml
Normal file
32
docker/kafka-manager/application-standalone.yml
Normal file
@@ -0,0 +1,32 @@
|
||||
server:
|
||||
port: 8080
|
||||
tomcat:
|
||||
accept-count: 100
|
||||
max-connections: 1000
|
||||
max-threads: 20
|
||||
min-spare-threads: 20
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: kafkamanager
|
||||
datasource:
|
||||
kafka-manager:
|
||||
jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
|
||||
username: admin
|
||||
password: 12345678
|
||||
driver-class-name: org.mariadb.jdbc.Driver
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
profiles:
|
||||
active: dev
|
||||
|
||||
logging:
|
||||
config: classpath:logback-spring.xml
|
||||
|
||||
# kafka监控
|
||||
kafka-monitor:
|
||||
enabled: true
|
||||
notify-kafka:
|
||||
cluster-id: 95
|
||||
topic-name: kmo_monitor
|
||||
32
docker/kafka-manager/application.yml
Normal file
32
docker/kafka-manager/application.yml
Normal file
@@ -0,0 +1,32 @@
|
||||
server:
|
||||
port: 8080
|
||||
tomcat:
|
||||
accept-count: 100
|
||||
max-connections: 1000
|
||||
max-threads: 20
|
||||
min-spare-threads: 20
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: kafkamanager
|
||||
datasource:
|
||||
kafka-manager:
|
||||
jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
|
||||
username: admin
|
||||
password: 12345678
|
||||
driver-class-name: org.mariadb.jdbc.Driver
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
profiles:
|
||||
active: dev
|
||||
|
||||
logging:
|
||||
config: classpath:logback-spring.xml
|
||||
|
||||
# kafka监控
|
||||
kafka-monitor:
|
||||
enabled: true
|
||||
notify-kafka:
|
||||
cluster-id: 95
|
||||
topic-name: kmo_monitor
|
||||
3
docker/mysql/Dockerfile
Normal file
3
docker/mysql/Dockerfile
Normal file
@@ -0,0 +1,3 @@
|
||||
FROM mysql:5.7
|
||||
MAINTAINER xuzhengxi
|
||||
COPY ./docker/mysql/create_mysql_table.sql /docker-entrypoint-initdb.d/
|
||||
243
docker/mysql/create_mysql_table.sql
Normal file
243
docker/mysql/create_mysql_table.sql
Normal file
@@ -0,0 +1,243 @@
|
||||
CREATE DATABASE IF NOT EXISTS `kafka_manager`;
|
||||
|
||||
USE `kafka_manager`;
|
||||
|
||||
CREATE TABLE `account` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
|
||||
`username` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '用户名',
|
||||
`password` varchar(128) NOT NULL DEFAULT '' COMMENT '密码',
|
||||
`role` int(16) NOT NULL DEFAULT '0' COMMENT '角色类型, 0:普通用户',
|
||||
`status` int(16) NOT NULL DEFAULT '0' COMMENT '0标识使用中,-1标识已废弃',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_username` (`username`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='账号表';
|
||||
INSERT INTO account(username, password, role) VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2);
|
||||
|
||||
|
||||
CREATE TABLE `alarm_rule` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID',
|
||||
`alarm_name` varchar(128) NOT NULL DEFAULT '' COMMENT '告警名字',
|
||||
`strategy_expressions` text COMMENT '表达式',
|
||||
`strategy_filters` text COMMENT '过滤条件',
|
||||
`strategy_actions` text COMMENT '响应',
|
||||
`principals` varchar(512) NOT NULL DEFAULT '' COMMENT '负责人',
|
||||
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '-1:逻辑删除, 0:关闭, 1:正常',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_alarm_name` (`alarm_name`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='告警规则表';
|
||||
|
||||
|
||||
CREATE TABLE `broker` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerID',
|
||||
`host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Broker主机名',
|
||||
`port` int(32) NOT NULL DEFAULT '-1' COMMENT 'Broker端口',
|
||||
`timestamp` bigint(128) NOT NULL DEFAULT '-1' COMMENT '启动时间',
|
||||
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态0有效,-1无效',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_cluster_id_broker_id` (`cluster_id`,`broker_id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker信息表';
|
||||
|
||||
|
||||
CREATE TABLE `broker_metrics` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerID',
|
||||
`bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流入',
|
||||
`bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流出',
|
||||
`bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒被拒绝字节数',
|
||||
`messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消息数流入',
|
||||
`fail_fetch_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消费失败数',
|
||||
`fail_produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒失败生产数',
|
||||
`fetch_consumer_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消费请求数',
|
||||
`produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒生产数',
|
||||
`request_handler_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '请求处理器繁忙百分比',
|
||||
`network_processor_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '网络处理器繁忙百分比',
|
||||
`request_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT '请求列表大小',
|
||||
`response_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT '响应列表大小',
|
||||
`log_flush_time` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '刷日志时间',
|
||||
`total_time_produce_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'produce请求处理总时间-平均值',
|
||||
`total_time_produce_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'produce请求处理总时间-99分位',
|
||||
`total_time_fetch_consumer_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'fetch请求总时间-平均值',
|
||||
`total_time_fetch_consumer_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'fetch请求总时间-99分位',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_cluster_id_broker_id_gmt_create` (`cluster_id`,`broker_id`,`gmt_create`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='BrokerMetric信息表';
|
||||
|
||||
|
||||
CREATE TABLE `cluster` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '集群ID',
|
||||
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称',
|
||||
`zookeeper` varchar(512) NOT NULL DEFAULT '' COMMENT 'ZK地址',
|
||||
`bootstrap_servers` varchar(512) NOT NULL DEFAULT '' COMMENT 'Server地址',
|
||||
`kafka_version` varchar(32) NOT NULL DEFAULT '' COMMENT 'Kafka版本',
|
||||
`alarm_flag` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启告警, 1开启告警',
|
||||
`security_protocol` varchar(512) NOT NULL DEFAULT '' COMMENT '安全协议',
|
||||
`sasl_mechanism` varchar(512) NOT NULL DEFAULT '' COMMENT '安全机制',
|
||||
`sasl_jaas_config` varchar(512) NOT NULL DEFAULT '' COMMENT 'Jaas配置',
|
||||
`status` int(4) NOT NULL DEFAULT '0' COMMENT '删除标记, 0表示未删除, -1表示删除',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_cluster_name` (`cluster_name`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Cluster表';
|
||||
|
||||
|
||||
CREATE TABLE `cluster_metrics` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '集群ID',
|
||||
`topic_num` int(11) NOT NULL DEFAULT '0' COMMENT 'Topic数',
|
||||
`partition_num` int(11) NOT NULL DEFAULT '0' COMMENT '分区数',
|
||||
`broker_num` int(11) NOT NULL DEFAULT '0' COMMENT 'Broker数',
|
||||
`bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒流入(B)',
|
||||
`bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒流出(B)',
|
||||
`bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒拒绝(B)',
|
||||
`messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消息数(条)',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_cluster_id_gmt_create` (`cluster_id`,`gmt_create`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='ClusterMetrics信息';
|
||||
|
||||
|
||||
CREATE TABLE `controller` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerId',
|
||||
`host` varchar(256) NOT NULL DEFAULT '' COMMENT '主机名',
|
||||
`timestamp` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Controller变更时间',
|
||||
`version` int(11) NOT NULL DEFAULT '-1' COMMENT 'Controller格式版本',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_cluster_id_broker_id_timestamp` (`cluster_id`,`broker_id`,`timestamp`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Controller历史变更记录表';
|
||||
|
||||
CREATE TABLE `migration_task` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '集群ID',
|
||||
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`reassignment_json` text COMMENT '任务参数',
|
||||
`real_throttle` bigint(20) NOT NULL DEFAULT '0' COMMENT '实际限流值(B/s)',
|
||||
`operator` varchar(128) NOT NULL DEFAULT '' COMMENT '操作人',
|
||||
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '备注说明',
|
||||
`status` int(11) NOT NULL DEFAULT '0' COMMENT '任务状态',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '任务创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '任务修改时间',
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic迁移信息';
|
||||
|
||||
CREATE TABLE `operation_history` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`operator` varchar(128) NOT NULL DEFAULT '' COMMENT '操作人',
|
||||
`operation` varchar(256) NOT NULL DEFAULT '' COMMENT '操作描述',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='操作记录表';
|
||||
|
||||
|
||||
CREATE TABLE `order_partition` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称',
|
||||
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表, 逗号分割',
|
||||
`partition_num` int(11) NOT NULL DEFAULT 0 COMMENT '新增分区数',
|
||||
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人',
|
||||
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流量',
|
||||
`description` text COMMENT '备注信息',
|
||||
`order_status` int(16) NOT NULL DEFAULT '0' COMMENT '工单状态',
|
||||
`approver` varchar(128) NOT NULL DEFAULT '' COMMENT '审批人',
|
||||
`opinion` varchar(256) NOT NULL DEFAULT '' COMMENT '审批意见',
|
||||
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0标识有效,-1无效',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='分区申请工单';
|
||||
|
||||
CREATE TABLE `order_topic` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称',
|
||||
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`retention_time` bigint(20) NOT NULL DEFAULT '-1' COMMENT '保留时间(ms)',
|
||||
`partition_num` int(16) NOT NULL DEFAULT '-1' COMMENT '分区数',
|
||||
`replica_num` int(16) NOT NULL DEFAULT '-1' COMMENT '副本数',
|
||||
`regions` varchar(128) NOT NULL DEFAULT '' COMMENT 'RegionId列表',
|
||||
`brokers` varchar(128) NOT NULL DEFAULT '' COMMENT 'Broker列表',
|
||||
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流入流量(KB)',
|
||||
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人',
|
||||
`principals` varchar(256) NOT NULL DEFAULT '' COMMENT '负责人',
|
||||
`description` text COMMENT '备注信息',
|
||||
`order_status` int(16) NOT NULL DEFAULT '0' COMMENT '工单状态',
|
||||
`approver` varchar(128) NOT NULL DEFAULT '' COMMENT '审批人',
|
||||
`opinion` varchar(256) NOT NULL DEFAULT '' COMMENT '审批意见',
|
||||
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态,0标识有效,-1无效',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic申请工单';
|
||||
|
||||
CREATE TABLE `region` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`region_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Region名称',
|
||||
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表',
|
||||
`level` int(16) NOT NULL DEFAULT '0' COMMENT 'Region重要等级, 0级普通, 1极重要,2级极重要',
|
||||
`operator` varchar(45) NOT NULL DEFAULT '' COMMENT '操作人',
|
||||
`description` text COMMENT '备注说明',
|
||||
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0正常,-1废弃,1容量已满',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_cluster_id_region_name` (`cluster_id`,`region_name`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Region信息表';
|
||||
|
||||
CREATE TABLE `topic` (
|
||||
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`topic_name` varchar(192) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`applicant` varchar(256) NOT NULL DEFAULT '' COMMENT '申请人',
|
||||
`principals` varchar(256) NOT NULL DEFAULT '' COMMENT '负责人',
|
||||
`description` text COMMENT '备注信息',
|
||||
`status` int(16) NOT NULL DEFAULT '0' COMMENT '0标识使用中,-1标识已废弃',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_cluster_id_topic_name` (`cluster_id`,`topic_name`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic信息表';
|
||||
|
||||
|
||||
CREATE TABLE `topic_favorite` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增Id',
|
||||
`username` varchar(64) NOT NULL DEFAULT '' COMMENT '用户名',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`status` int(16) NOT NULL DEFAULT '0' COMMENT '删除标记, 0表示未删除, -1表示删除',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `uniq_username_cluster_id_topic_name` (`username`,`cluster_id`,`topic_name`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户收藏的Topic表';
|
||||
|
||||
CREATE TABLE `topic_metrics` (
|
||||
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||
`messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒进入消息条数',
|
||||
`bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流入',
|
||||
`bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流出',
|
||||
`bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒拒绝字节数',
|
||||
`total_produce_requests` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒请求数',
|
||||
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_cluster_id_topic_name_gmt_create` (`cluster_id`,`topic_name`,`gmt_create`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='TopicMetrics表';
|
||||
8
pom.xml
8
pom.xml
@@ -6,7 +6,7 @@
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<artifactId>kafka-manager</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
@@ -15,7 +15,7 @@
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
<kafka-manager.revision>1.0.0-SNAPSHOT</kafka-manager.revision>
|
||||
<kafka-manager.revision>1.1.0-SNAPSHOT</kafka-manager.revision>
|
||||
<jackson.version>2.9.0</jackson.version>
|
||||
|
||||
<!-- maven properties -->
|
||||
@@ -77,12 +77,12 @@
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.58</version>
|
||||
<version>1.2.68</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
<version>3.4.6</version>
|
||||
<version>3.4.14</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
||||
@@ -5,13 +5,13 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<artifactId>kafka-manager-service</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<artifactId>kafka-manager</artifactId>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
@@ -31,7 +31,7 @@
|
||||
<dependency>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<artifactId>kafka-manager-dao</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<!-- spring -->
|
||||
|
||||
@@ -2,12 +2,12 @@ package com.xiaojukeji.kafka.manager.service.collector;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.Constant;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -34,8 +34,8 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
|
||||
if (clusterDO == null) {
|
||||
return;
|
||||
}
|
||||
Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>();
|
||||
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, topicNamePartitionStateListMap);
|
||||
Map<TopicPartition, Long> allPartitionOffsetMap = new HashMap<>();
|
||||
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, allPartitionOffsetMap);
|
||||
|
||||
List<ConsumerMetrics> consumerMetricsList = convert2ConsumerMetrics(consumerDTOList);
|
||||
KafkaMetricsCache.putConsumerMetricsToCache(clusterId, consumerMetricsList);
|
||||
@@ -47,23 +47,27 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
|
||||
private List<ConsumerMetrics> convert2ConsumerMetrics(List<ConsumerDTO> consumerDTOList) {
|
||||
List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
|
||||
for (ConsumerDTO consumerDTO : consumerDTOList) {
|
||||
Map<String, List<PartitionState>> topicNamePartitionStateListMap = consumerDTO.getTopicPartitionMap();
|
||||
for(Map.Entry<String, List<PartitionState>> entry : topicNamePartitionStateListMap.entrySet()){
|
||||
String topicName = entry.getKey();
|
||||
List<PartitionState> partitionStateList = entry.getValue();
|
||||
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
|
||||
consumerMetrics.setClusterId(clusterId);
|
||||
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
|
||||
consumerMetrics.setLocation(consumerDTO.getLocation());
|
||||
consumerMetrics.setTopicName(topicName);
|
||||
long sumLag = 0;
|
||||
for (PartitionState partitionState : partitionStateList) {
|
||||
Map.Entry<Long, Long> offsetEntry = new AbstractMap.SimpleEntry<>(partitionState.getOffset(), partitionState.getConsumeOffset());
|
||||
sumLag += (offsetEntry.getKey() - offsetEntry.getValue() > 0 ? offsetEntry.getKey() - offsetEntry.getValue(): 0);
|
||||
}
|
||||
consumerMetrics.setSumLag(sumLag);
|
||||
consumerMetricsList.add(consumerMetrics);
|
||||
if (consumerDTO.getPartitionOffsetMap() == null || consumerDTO.getConsumerOffsetMap() == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
|
||||
consumerMetrics.setClusterId(consumerDTO.getClusterId());
|
||||
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
|
||||
consumerMetrics.setLocation(consumerDTO.getLocation());
|
||||
consumerMetrics.setTopicName(consumerDTO.getTopicName());
|
||||
|
||||
long sumLag = 0;
|
||||
for(Map.Entry<Integer, Long> entry : consumerDTO.getPartitionOffsetMap().entrySet()){
|
||||
Long partitionOffset = entry.getValue();
|
||||
Long consumerOffset = consumerDTO.getConsumerOffsetMap().get(entry.getKey());
|
||||
if (partitionOffset == null || consumerOffset == null) {
|
||||
continue;
|
||||
}
|
||||
sumLag += Math.max(partitionOffset - consumerOffset, 0);
|
||||
}
|
||||
consumerMetrics.setSumLag(sumLag);
|
||||
consumerMetricsList.add(consumerMetrics);
|
||||
}
|
||||
return consumerMetricsList;
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerGroupDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.PartitionOffsetDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -57,7 +58,7 @@ public interface ConsumerService {
|
||||
* @return
|
||||
*/
|
||||
List<ConsumerDTO> getMonitoredConsumerList(ClusterDO clusterDO,
|
||||
Map<String, List<PartitionState>> topicNamePartitionStateListMap);
|
||||
Map<TopicPartition, Long> partitionOffsetMap);
|
||||
|
||||
/**
|
||||
* 重置offset
|
||||
|
||||
@@ -45,7 +45,7 @@ public interface OrderService {
|
||||
* @date 19/6/23
|
||||
* @return Result
|
||||
*/
|
||||
Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator);
|
||||
Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator, boolean admin);
|
||||
|
||||
/**
|
||||
* 查询Topic工单
|
||||
|
||||
@@ -76,6 +76,7 @@ public class AdminTopicServiceImpl implements AdminTopicService {
|
||||
OperationHistoryDO operationHistoryDO = OperationHistoryDO.newInstance(topicDO.getClusterId(), topicDO.getTopicName(), operator, OperationEnum.CREATE_TOPIC.message);
|
||||
operationHistoryDao.insert(operationHistoryDO);
|
||||
topicDao.replace(topicDO);
|
||||
|
||||
} catch (Exception e) {
|
||||
return AdminTopicStatusEnum.REPLACE_DB_FAILED;
|
||||
}
|
||||
@@ -188,4 +189,5 @@ public class AdminTopicServiceImpl implements AdminTopicService {
|
||||
}
|
||||
return AdminTopicStatusEnum.SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.dao.ClusterDao;
|
||||
import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
|
||||
import com.xiaojukeji.kafka.manager.dao.ControllerDao;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.schedule.ScheduleCollectDataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.slf4j.Logger;
|
||||
@@ -37,6 +38,9 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
@Autowired
|
||||
private ClusterMetadataManager clusterMetadataManager;
|
||||
|
||||
@Autowired
|
||||
private ScheduleCollectDataManager scheduleCollectDataManager;
|
||||
|
||||
@Autowired
|
||||
private ControllerDao controllerDao;
|
||||
|
||||
@@ -57,6 +61,7 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
if (!status) {
|
||||
return new Result(StatusCode.OPERATION_ERROR, "add zookeeper watch failed");
|
||||
}
|
||||
scheduleCollectDataManager.start(clusterDO);
|
||||
|
||||
if (clusterDO.getAlarmFlag() == null || clusterDO.getAlarmFlag() <= 0) {
|
||||
return new Result();
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.constant.OffsetStoreLocation;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.StatusCode;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
|
||||
@@ -18,7 +17,6 @@ import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientCache;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.TopicService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ZookeeperService;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.zk.ZkPathUtil;
|
||||
import kafka.admin.AdminClient;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
@@ -49,9 +47,6 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperService zkService;
|
||||
|
||||
private final ExecutorService consumerListThreadPool = Executors.newFixedThreadPool(50, new DefaultThreadFactory("ConsumerPool"));
|
||||
|
||||
@Override
|
||||
@@ -135,20 +130,20 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
|
||||
@Override
|
||||
public List<ConsumerDTO> getMonitoredConsumerList(final ClusterDO clusterDO,
|
||||
final Map<String, List<PartitionState>> partitionStateListMap) {
|
||||
final Map<TopicPartition, Long> allPartitionOffsetMap) {
|
||||
List<ConsumerGroupDTO> consumerGroupDTOList = getConsumerGroupList(clusterDO.getId());
|
||||
if (consumerGroupDTOList == null || consumerGroupDTOList.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
FutureTask<ConsumerDTO>[] taskList = new FutureTask[consumerGroupDTOList.size()];
|
||||
FutureTask<List<ConsumerDTO>>[] taskList = new FutureTask[consumerGroupDTOList.size()];
|
||||
for (int i = 0; i < consumerGroupDTOList.size(); i++) {
|
||||
final ConsumerGroupDTO consumerGroupDTO = consumerGroupDTOList.get(i);
|
||||
taskList[i] = new FutureTask<>(new Callable<ConsumerDTO>() {
|
||||
taskList[i] = new FutureTask<>(new Callable<List<ConsumerDTO>>() {
|
||||
@Override
|
||||
public ConsumerDTO call() throws Exception {
|
||||
public List<ConsumerDTO> call() throws Exception {
|
||||
try {
|
||||
return getMonitoredConsumer(clusterDO, consumerGroupDTO, partitionStateListMap);
|
||||
return getMonitoredConsumer(clusterDO, consumerGroupDTO, allPartitionOffsetMap);
|
||||
} catch (Exception e) {
|
||||
logger.error("get monitored consumer error, group:{}", consumerGroupDTO.getConsumerGroup(), e);
|
||||
}
|
||||
@@ -159,31 +154,70 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
}
|
||||
|
||||
List<ConsumerDTO> consumerList = new ArrayList<>();
|
||||
for (FutureTask<ConsumerDTO> task : taskList) {
|
||||
ConsumerDTO consumer = null;
|
||||
for (FutureTask<List<ConsumerDTO>> task : taskList) {
|
||||
List<ConsumerDTO> dtoList = null;
|
||||
try {
|
||||
consumer = task.get();
|
||||
dtoList = task.get();
|
||||
} catch (Exception e) {
|
||||
logger.error("getMonitoredConsumerList@ConsumeServiceImpl, ", e);
|
||||
}
|
||||
if (consumer == null) {
|
||||
if (dtoList == null) {
|
||||
continue;
|
||||
}
|
||||
consumerList.add(consumer);
|
||||
consumerList.addAll(dtoList);
|
||||
}
|
||||
return consumerList;
|
||||
}
|
||||
|
||||
private ConsumerDTO getMonitoredConsumer(ClusterDO cluster, ConsumerGroupDTO consumerGroupDTO, Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
|
||||
// 获取当前consumerGroup下的所有的topic的partitionState信息
|
||||
Map<String, List<PartitionState>> topicNamePartitionStateListMap = getConsumerGroupPartitionStateList(cluster, consumerGroupDTO, globalTopicNamePartitionStateListMap);
|
||||
private List<ConsumerDTO> getMonitoredConsumer(ClusterDO clusterDO,
|
||||
ConsumerGroupDTO consumerGroupDTO,
|
||||
Map<TopicPartition, Long> allPartitionOffsetMap) {
|
||||
List<ConsumerDTO> dtoList = new ArrayList<>();
|
||||
|
||||
//将没有对应consumer的partition信息统一放到一个consumer中
|
||||
ConsumerDTO consumerDTO = new ConsumerDTO();
|
||||
consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
|
||||
consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().name());
|
||||
consumerDTO.setTopicPartitionMap(topicNamePartitionStateListMap);
|
||||
return consumerDTO;
|
||||
List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(
|
||||
clusterDO.getId(),
|
||||
consumerGroupDTO.getOffsetStoreLocation().getLocation(),
|
||||
consumerGroupDTO.getConsumerGroup()
|
||||
);
|
||||
for (String topicName : topicNameList) {
|
||||
TopicMetadata metadata = ClusterMetadataManager.getTopicMetaData(clusterDO.getId(), topicName);
|
||||
if (metadata == null || metadata.getPartitionNum() <= 0) {
|
||||
continue;
|
||||
}
|
||||
if (!allPartitionOffsetMap.containsKey(new TopicPartition(topicName, 0))) {
|
||||
Map<TopicPartition, Long> offsetMap = topicService.getTopicPartitionOffset(clusterDO, topicName);
|
||||
if (offsetMap == null) {
|
||||
offsetMap = new HashMap<>();
|
||||
}
|
||||
allPartitionOffsetMap.putAll(offsetMap);
|
||||
}
|
||||
|
||||
Map<Integer, Long> consumerOffsetMap = null;
|
||||
if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
|
||||
consumerOffsetMap = getTopicConsumerOffsetInZK(clusterDO, metadata, consumerGroupDTO);
|
||||
} else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
|
||||
consumerOffsetMap = getTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO);
|
||||
}
|
||||
|
||||
Map<Integer, Long> partitionOffsetMap = new HashMap<>();
|
||||
for (int partitionId = 0; partitionId < metadata.getPartitionNum(); ++partitionId) {
|
||||
Long offset = allPartitionOffsetMap.get(new TopicPartition(topicName, partitionId));
|
||||
if (offset == null) {
|
||||
continue;
|
||||
}
|
||||
partitionOffsetMap.put(partitionId, offset);
|
||||
}
|
||||
|
||||
ConsumerDTO consumerDTO = new ConsumerDTO();
|
||||
consumerDTO.setClusterId(clusterDO.getId());
|
||||
consumerDTO.setTopicName(topicName);
|
||||
consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
|
||||
consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().getLocation());
|
||||
consumerDTO.setPartitionOffsetMap(partitionOffsetMap);
|
||||
consumerDTO.setConsumerOffsetMap(consumerOffsetMap);
|
||||
dtoList.add(consumerDTO);
|
||||
}
|
||||
return dtoList;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -264,52 +298,15 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
kafkaConsumer.commitSync();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取属于该集群和consumerGroup下的所有topic的信息
|
||||
*/
|
||||
private Map<String, List<PartitionState>> getConsumerGroupPartitionStateList(ClusterDO clusterDO,
|
||||
ConsumerGroupDTO consumerGroupDTO,
|
||||
Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
|
||||
Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>(2);
|
||||
private Map<Integer, Long> getTopicConsumerOffsetInZK(ClusterDO clusterDO,
|
||||
TopicMetadata topicMetadata,
|
||||
ConsumerGroupDTO consumerGroupDTO) {
|
||||
Map<Integer, Long> offsetMap = new HashMap<>();
|
||||
|
||||
List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(clusterDO.getId(),consumerGroupDTO.getOffsetStoreLocation().getLocation(), consumerGroupDTO.getConsumerGroup());
|
||||
for (String topicName : topicNameList) {
|
||||
if (!ClusterMetadataManager.isTopicExist(clusterDO.getId(), topicName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<PartitionState> partitionStateList = globalTopicNamePartitionStateListMap.get(topicName);
|
||||
if (partitionStateList == null) {
|
||||
try {
|
||||
partitionStateList = zkService.getTopicPartitionState(clusterDO.getId(), topicName);
|
||||
} catch (Exception e) {
|
||||
logger.error("get topic partition state failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
|
||||
}
|
||||
if (partitionStateList == null) {
|
||||
continue;
|
||||
}
|
||||
globalTopicNamePartitionStateListMap.put(topicName, partitionStateList);
|
||||
}
|
||||
List<PartitionState> consumerGroupPartitionStateList = new ArrayList<>();
|
||||
for (PartitionState partitionState: partitionStateList) {
|
||||
consumerGroupPartitionStateList.add((PartitionState) partitionState.clone());
|
||||
}
|
||||
|
||||
if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
|
||||
updateTopicConsumerOffsetInZK(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
|
||||
} else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
|
||||
updateTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
|
||||
}
|
||||
topicNamePartitionStateListMap.put(topicName, consumerGroupPartitionStateList);
|
||||
}
|
||||
return topicNamePartitionStateListMap;
|
||||
}
|
||||
|
||||
private void updateTopicConsumerOffsetInZK(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
|
||||
ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(cluster.getId());
|
||||
for (PartitionState partitionState : partitionStateList) {
|
||||
ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(clusterDO.getId());
|
||||
for (int partitionId = 0; partitionId < topicMetadata.getPartitionNum(); ++partitionId) {
|
||||
//offset存储于zk中
|
||||
String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId());
|
||||
String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicMetadata.getTopic(), partitionId);
|
||||
String offset = null;
|
||||
try {
|
||||
Stat stat = zkConfig.getNodeStat(consumerGroupOffsetLocation);
|
||||
@@ -317,39 +314,32 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
continue;
|
||||
}
|
||||
offset = zkConfig.get(consumerGroupOffsetLocation);
|
||||
offsetMap.put(partitionId, Long.valueOf(offset));
|
||||
} catch (ConfigException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
String consumerId = null;
|
||||
try {
|
||||
consumerId = zkConfig.get(ZkPathUtil.getConsumerGroupOwnersTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId()));
|
||||
} catch (ConfigException e) {
|
||||
// logger.error("get consumerId error in updateTopicConsumerOffsetInZK cluster:{} topic:{} consumerGroup:{}", cluster.getClusterName(), topicName, consumerGroupDTO.getConsumerGroup());
|
||||
}
|
||||
partitionState.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
|
||||
updatePartitionStateOffset(partitionState, offset, consumerId);
|
||||
}
|
||||
return offsetMap;
|
||||
}
|
||||
|
||||
private void updateTopicConsumerOffsetInBroker(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
|
||||
Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(cluster, consumerGroupDTO.getConsumerGroup(), topicName);
|
||||
private Map<Integer, Long> getTopicConsumerOffsetInBroker(ClusterDO clusterDO,
|
||||
String topicName,
|
||||
ConsumerGroupDTO consumerGroupDTO) {
|
||||
Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroupDTO.getConsumerGroup(), topicName);
|
||||
if (offsetsFromBroker == null || offsetsFromBroker.isEmpty()) {
|
||||
return;
|
||||
return new HashMap<>(0);
|
||||
}
|
||||
|
||||
for (PartitionState partitionState : partitionStateList) {
|
||||
int partitionId = partitionState.getPartitionId();
|
||||
updatePartitionStateOffset(partitionState, offsetsFromBroker.get(partitionId), null);
|
||||
Map<Integer, Long> offsetMap = new HashMap<>(offsetsFromBroker.size());
|
||||
for (Map.Entry<Integer, String> entry: offsetsFromBroker.entrySet()) {
|
||||
try {
|
||||
offsetMap.put(entry.getKey(), Long.valueOf(entry.getValue()));
|
||||
} catch (Exception e) {
|
||||
logger.error("get topic consumer offset failed, clusterId:{} topicName:{} consumerGroup:{}."
|
||||
, clusterDO.getId(), topicName, consumerGroupDTO.getConsumerGroup());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updatePartitionStateOffset(PartitionState partitionState, String offset, String consumerId) {
|
||||
partitionState.setConsumeOffset(0);
|
||||
if (!StringUtils.isEmpty(offset)) {
|
||||
partitionState.setConsumeOffset(Long.parseLong(offset));
|
||||
}
|
||||
partitionState.setConsumerGroup(consumerId);
|
||||
return offsetMap;
|
||||
}
|
||||
|
||||
private Map<Integer, String> getConsumeIdMap(Long clusterId, String topicName, String consumerGroup) {
|
||||
|
||||
@@ -72,6 +72,9 @@ public class JmxServiceImpl implements JmxService {
|
||||
List<Attribute> attributeValueList = null;
|
||||
try {
|
||||
attributeValueList = connection.getAttributes(new ObjectName(mbean.getObjectName()), properties).asList();
|
||||
} catch (InstanceNotFoundException e) {
|
||||
logger.warn("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e);
|
||||
continue;
|
||||
} catch (Exception e) {
|
||||
logger.error("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e);
|
||||
continue;
|
||||
|
||||
@@ -51,7 +51,7 @@ public class OrderServiceImpl implements OrderService {
|
||||
if (orderPartitionDO != null) {
|
||||
orderPartitionDO.setOrderStatus(OrderStatusEnum.CANCELLED.getCode());
|
||||
}
|
||||
return modifyOrderPartition(orderPartitionDO, operator);
|
||||
return modifyOrderPartition(orderPartitionDO, operator, false);
|
||||
}
|
||||
return new Result(StatusCode.PARAM_ERROR, "order type illegal");
|
||||
}
|
||||
@@ -74,10 +74,10 @@ public class OrderServiceImpl implements OrderService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator) {
|
||||
public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator, boolean admin) {
|
||||
if (newOrderPartitionDO == null) {
|
||||
return new Result(StatusCode.PARAM_ERROR, "param illegal, order not exist");
|
||||
} else if (!newOrderPartitionDO.getApplicant().equals(operator)) {
|
||||
} else if (!admin && !newOrderPartitionDO.getApplicant().equals(operator)) {
|
||||
return new Result(StatusCode.PARAM_ERROR, "without authority to cancel the order");
|
||||
}
|
||||
OrderPartitionDO oldOrderPartitionDO = orderPartitionDao.getById(newOrderPartitionDO.getId());
|
||||
|
||||
@@ -345,7 +345,7 @@ public class TopicServiceImpl implements TopicService {
|
||||
} else {
|
||||
topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true);
|
||||
topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec());
|
||||
topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec());
|
||||
topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getBytesOutPerSec());
|
||||
}
|
||||
return topicOverviewDTO;
|
||||
}
|
||||
|
||||
16
web/bin/shutdown.sh
Normal file
16
web/bin/shutdown.sh
Normal file
@@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
cd `dirname $0`/../lib
|
||||
lib_dir=`pwd`
|
||||
|
||||
pid=`ps ax | grep -i 'kafka-manager-web' | grep ${lib_dir} | grep java | grep -v grep | awk '{print $1}'`
|
||||
if [ -z "$pid" ] ; then
|
||||
echo "No kafka-manager-web running."
|
||||
exit -1;
|
||||
fi
|
||||
|
||||
echo "The kafka-manager-web(${pid}) is running..."
|
||||
|
||||
kill ${pid}
|
||||
|
||||
echo "Send shutdown request to kafka-manager-web(${pid}) OK"
|
||||
46
web/bin/startup.sh
Normal file
46
web/bin/startup.sh
Normal file
@@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
error_exit ()
|
||||
{
|
||||
echo "ERROR: $1 !!"
|
||||
exit 1
|
||||
}
|
||||
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!"
|
||||
|
||||
fi
|
||||
|
||||
export WEB_SERVER="kafka-manager-web-*"
|
||||
export JAVA_HOME
|
||||
export JAVA="$JAVA_HOME/bin/java"
|
||||
export BASE_DIR=`cd $(dirname $0)/..; pwd`
|
||||
export DEFAULT_SEARCH_LOCATIONS="classpath:/,classpath:/config/,file:./,file:./config/"
|
||||
export CUSTOM_SEARCH_LOCATIONS=${DEFAULT_SEARCH_LOCATIONS},file:${BASE_DIR}/conf/
|
||||
|
||||
#===========================================================================================
|
||||
# JVM Configuration
|
||||
#===========================================================================================
|
||||
|
||||
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
|
||||
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof"
|
||||
JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
|
||||
JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/lib/${WEB_SERVER}.jar"
|
||||
JAVA_OPT="${JAVA_OPT} --spring.config.location=${CUSTOM_SEARCH_LOCATIONS}"
|
||||
JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/logback-spring.xml"
|
||||
JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288"
|
||||
|
||||
if [ ! -d "${BASE_DIR}/logs" ]; then
|
||||
mkdir ${BASE_DIR}/logs
|
||||
fi
|
||||
|
||||
echo "$JAVA ${JAVA_OPT}"
|
||||
|
||||
# check the start.out log output file
|
||||
if [ ! -f "${BASE_DIR}/logs/start.out" ]; then
|
||||
touch "${BASE_DIR}/logs/start.out"
|
||||
fi
|
||||
# start
|
||||
echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 &
|
||||
nohup $JAVA ${JAVA_OPT} >> ${BASE_DIR}/logs/start.out 2>&1 &
|
||||
echo "kafka-manager is starting,you can check the ${BASE_DIR}/logs/start.out"
|
||||
32
web/conf/application.yml
Normal file
32
web/conf/application.yml
Normal file
@@ -0,0 +1,32 @@
|
||||
server:
|
||||
port: 8080
|
||||
tomcat:
|
||||
accept-count: 100
|
||||
max-connections: 1000
|
||||
max-threads: 20
|
||||
min-spare-threads: 20
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: kafkamanager
|
||||
datasource:
|
||||
kafka-manager:
|
||||
jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
|
||||
username: admin
|
||||
password: admin
|
||||
driver-class-name: org.mariadb.jdbc.Driver
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
profiles:
|
||||
active: dev
|
||||
|
||||
logging:
|
||||
config: classpath:logback-spring.xml
|
||||
|
||||
# kafka监控
|
||||
kafka-monitor:
|
||||
enabled: true
|
||||
notify-kafka:
|
||||
cluster-id: 95
|
||||
topic-name: kmo_monitor
|
||||
24
web/pom.xml
24
web/pom.xml
@@ -4,13 +4,13 @@
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>kafka-manager-web</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<artifactId>kafka-manager</artifactId>
|
||||
<groupId>com.xiaojukeji.kafka</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
@@ -110,6 +110,26 @@
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>make-assembly</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<finalName>kafka-manager-${project.version}</finalName>
|
||||
<descriptors>
|
||||
<descriptor>./src/main/resources/assembly.xml</descriptor>
|
||||
</descriptors>
|
||||
<tarLongFileMode>posix</tarLongFileMode>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
||||
@@ -176,7 +176,7 @@ public class OrderController {
|
||||
TopicDO topicInfoDO = OrderConverter.convert2TopicInfoDO(orderTopicDO);
|
||||
List<Integer> brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList());
|
||||
Properties topicConfig = new Properties();
|
||||
topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime()));
|
||||
topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime() * 60 * 60 * 1000));
|
||||
try {
|
||||
TopicMetadata topicMetadata = new TopicMetadata();
|
||||
topicMetadata.setTopic(orderTopicDO.getTopicName());
|
||||
@@ -325,14 +325,16 @@ public class OrderController {
|
||||
orderPartitionDO.setApprover(username);
|
||||
orderPartitionDO.setOpinion(reqObj.getApprovalOpinions());
|
||||
orderPartitionDO.setOrderStatus(reqObj.getOrderStatus());
|
||||
result = orderService.modifyOrderPartition(orderPartitionDO, username);
|
||||
result = orderService.modifyOrderPartition(orderPartitionDO, username, true);
|
||||
if (!StatusCode.SUCCESS.equals(result.getCode())) {
|
||||
return new Result(StatusCode.OPERATION_ERROR, "create topic success, but update order status failed, err:" + result.getMessage());
|
||||
return new Result(StatusCode.OPERATION_ERROR, "expand topic success, but update order status failed, err:" + result.getMessage());
|
||||
}
|
||||
return new Result();
|
||||
}
|
||||
|
||||
private Result expandTopic(ClusterDO clusterDO, OrderPartitionExecModel reqObj, OrderPartitionDO orderPartitionDO) {
|
||||
private Result expandTopic(ClusterDO clusterDO,
|
||||
OrderPartitionExecModel reqObj,
|
||||
OrderPartitionDO orderPartitionDO) {
|
||||
List<Integer> brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList());
|
||||
try {
|
||||
TopicMetadata topicMetadata = new TopicMetadata();
|
||||
@@ -343,6 +345,8 @@ public class OrderController {
|
||||
if (!AdminTopicStatusEnum.SUCCESS.equals(adminTopicStatusEnum)) {
|
||||
return new Result(StatusCode.OPERATION_ERROR, adminTopicStatusEnum.getMessage());
|
||||
}
|
||||
orderPartitionDO.setPartitionNum(reqObj.getPartitionNum());
|
||||
orderPartitionDO.setBrokerList(ListUtils.intList2String(brokerIdList));
|
||||
} catch (Exception e) {
|
||||
logger.error("expandTopic@OrderController, create failed, req:{}.", reqObj);
|
||||
return new Result(StatusCode.OPERATION_ERROR, Constant.KAFKA_MANAGER_INNER_ERROR);
|
||||
|
||||
@@ -78,6 +78,7 @@ public class BrokerModelConverter {
|
||||
Double bytesInPerSec = brokerOverallDTO.getBytesInPerSec() / 1024.0 / 1024.0;
|
||||
brokerOverviewVO.setBytesInPerSec(Math.round(bytesInPerSec * 100) / 100.0);
|
||||
}
|
||||
brokerOverviewVO.setUnderReplicatedPartitionCount(brokerOverallDTO.getUnderReplicatedPartitions());
|
||||
brokerOverviewVO.setLeaderCount(brokerOverallDTO.getLeaderCount());
|
||||
if (brokerOverallDTO.getPartitionCount() != null && brokerOverallDTO.getUnderReplicatedPartitions() != null) {
|
||||
brokerOverviewVO.setNotUnderReplicatedPartitionCount(brokerOverallDTO.getPartitionCount() - brokerOverallDTO.getUnderReplicatedPartitions());
|
||||
|
||||
@@ -86,7 +86,8 @@ public class OrderConverter {
|
||||
|
||||
public static OrderPartitionVO convert2OrderPartitionVO(OrderPartitionDO orderPartitionDO,
|
||||
TopicMetadata topicMetadata,
|
||||
Long maxAvgBytes, List<RegionDO> regionDOList) {
|
||||
Long maxAvgBytes,
|
||||
List<RegionDO> regionDOList) {
|
||||
if (orderPartitionDO == null) {
|
||||
return null;
|
||||
}
|
||||
@@ -100,8 +101,12 @@ public class OrderConverter {
|
||||
if (topicMetadata == null) {
|
||||
return orderPartitionVO;
|
||||
}
|
||||
orderPartitionVO.setPartitionNum(topicMetadata.getPartitionNum());
|
||||
|
||||
orderPartitionVO.setPartitionNum(null);
|
||||
orderPartitionVO.setBrokerIdList(new ArrayList<>(topicMetadata.getBrokerIdSet()));
|
||||
if (OrderStatusEnum.PASSED.getCode().equals(orderPartitionDO.getOrderStatus())) {
|
||||
orderPartitionVO.setPartitionNum(orderPartitionDO.getPartitionNum());
|
||||
}
|
||||
|
||||
if (regionDOList == null || regionDOList.isEmpty()) {
|
||||
orderPartitionVO.setRegionNameList(new ArrayList<>());
|
||||
|
||||
@@ -30,9 +30,13 @@ public class BrokerOverallVO {
|
||||
@ApiModelProperty(value = "分区数")
|
||||
private Integer partitionCount;
|
||||
|
||||
@ApiModelProperty(value = "未同步分区数")
|
||||
@Deprecated
|
||||
@ApiModelProperty(value = "同步分区数")
|
||||
private Integer notUnderReplicatedPartitionCount;
|
||||
|
||||
@ApiModelProperty(value = "未同步分区数")
|
||||
private Integer underReplicatedPartitionCount;
|
||||
|
||||
@ApiModelProperty(value = "leader数")
|
||||
private Integer leaderCount;
|
||||
|
||||
@@ -103,6 +107,14 @@ public class BrokerOverallVO {
|
||||
this.notUnderReplicatedPartitionCount = notUnderReplicatedPartitionCount;
|
||||
}
|
||||
|
||||
public Integer getUnderReplicatedPartitionCount() {
|
||||
return underReplicatedPartitionCount;
|
||||
}
|
||||
|
||||
public void setUnderReplicatedPartitionCount(Integer underReplicatedPartitionCount) {
|
||||
this.underReplicatedPartitionCount = underReplicatedPartitionCount;
|
||||
}
|
||||
|
||||
public Integer getLeaderCount() {
|
||||
return leaderCount;
|
||||
}
|
||||
@@ -130,6 +142,7 @@ public class BrokerOverallVO {
|
||||
", bytesInPerSec=" + bytesInPerSec +
|
||||
", partitionCount=" + partitionCount +
|
||||
", notUnderReplicatedPartitionCount=" + notUnderReplicatedPartitionCount +
|
||||
", underReplicatedPartitionCount=" + underReplicatedPartitionCount +
|
||||
", leaderCount=" + leaderCount +
|
||||
", regionName='" + regionName + '\'' +
|
||||
'}';
|
||||
|
||||
33
web/src/main/resources/application-pg.yml
Normal file
33
web/src/main/resources/application-pg.yml
Normal file
@@ -0,0 +1,33 @@
|
||||
server:
|
||||
port: 8080
|
||||
tomcat:
|
||||
accept-count: 100
|
||||
max-connections: 1000
|
||||
max-threads: 20
|
||||
min-spare-threads: 20
|
||||
|
||||
spring:
|
||||
application:
|
||||
name: kafkamanager
|
||||
datasource:
|
||||
kafka-manager:
|
||||
driver-class-name: org.postgresql.Driver
|
||||
jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true
|
||||
username: admin
|
||||
password: admin
|
||||
type: com.zaxxer.hikari.HikariDataSource
|
||||
hikari:
|
||||
connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';"
|
||||
connection-test-query: "select 1;"
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
logging:
|
||||
config: classpath:logback-spring.xml
|
||||
|
||||
# kafka监控
|
||||
kafka-monitor:
|
||||
enabled: true
|
||||
notify-kafka:
|
||||
cluster-id: 95
|
||||
topic-name: kmo_monitor
|
||||
@@ -11,16 +11,13 @@ spring:
|
||||
name: kafkamanager
|
||||
datasource:
|
||||
kafka-manager:
|
||||
driver-class-name: org.mariadb.jdbc.Driver
|
||||
jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
|
||||
username: admin
|
||||
password: admin
|
||||
driver-class-name: org.mariadb.jdbc.Driver
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
profiles:
|
||||
active: dev
|
||||
|
||||
logging:
|
||||
config: classpath:logback-spring.xml
|
||||
|
||||
|
||||
34
web/src/main/resources/assembly.xml
Normal file
34
web/src/main/resources/assembly.xml
Normal file
@@ -0,0 +1,34 @@
|
||||
<assembly
|
||||
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
|
||||
<id>bin</id>
|
||||
<formats>
|
||||
<format>dir</format>
|
||||
<format>tar.gz</format>
|
||||
</formats>
|
||||
<fileSets>
|
||||
<fileSet>
|
||||
<includes>
|
||||
<include>./bin/*</include>
|
||||
</includes>
|
||||
<fileMode>0755</fileMode>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>./src/main/resources/</directory>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
<includes>
|
||||
<include>application.yml</include>
|
||||
<include>logback-spring.xml</include>
|
||||
</includes>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${project.build.directory}</directory>
|
||||
<outputDirectory>lib</outputDirectory>
|
||||
<includes>
|
||||
<include>*.jar</include>
|
||||
</includes>
|
||||
</fileSet>
|
||||
|
||||
</fileSets>
|
||||
</assembly>
|
||||
Reference in New Issue
Block a user