Compare commits

...

34 Commits

Author SHA1 Message Date
ZQKC
28d985aaf1 Merge pull request #58 from didi/dev
Bump version to 1.1.0
2020-09-25 12:00:38 +08:00
zengqiao
2397cbf80b Bump version to 1.1 2020-09-25 11:56:31 +08:00
zengqiao
a13d9daae3 Merge branch 'master' into dev 2020-09-25 11:44:23 +08:00
zengqiao
c23870e020 bugfix, fix topic overview page 2020-09-17 19:04:33 +08:00
zengqiao
dd2e29dd40 bugfix, fix collect consumer metrics task 2020-09-16 21:04:47 +08:00
ZQKC
74b5700573 Merge pull request #48 from ZQKC/master
Improve the partition-expansion order detail page
2020-07-30 21:00:37 +08:00
zengqiao
ba6abea6d8 Improve the partition-expansion order 2020-07-30 20:56:45 +08:00
ZQKC
33b231d512 Merge pull request #3 from didi/master
MR
2020-07-30 20:45:58 +08:00
ZQKC
61f0b67a92 Merge pull request #47 from ZQKC/master
Fix the partition-expansion order
2020-07-30 19:34:56 +08:00
zengqiao
4b679be310 fix execute order partition 2020-07-30 19:30:31 +08:00
ZQKC
a969795677 Merge pull request #40 from hyper-xx/master
Add docker and docker-compose deployment options
2020-07-06 14:00:24 +08:00
xuzhengxi
4f4e7e80fc Add docker and docker-compose deployment options 2020-07-06 13:19:58 +08:00
ZQKC
2f72cbb627 Merge pull request #38 from yangbajing/feature/postgresql
Add PostgreSQL database support for management-side storage.
2020-07-05 16:10:16 +08:00
Yang Jing
a460e169ab Change the default Spring configuration to use MySQL. 2020-07-05 16:07:33 +08:00
Yang Jing
27ce4d6a0d Provide a PostgreSQL AccountDao.insertOnPG counterpart to AccountDao.insert. 2020-07-05 00:55:55 +08:00
Yang Jing
ac86f8aded When running on PostgreSQL, use insert ... on conflict instead of MySQL's REPLACE statement. 2020-07-05 00:42:55 +08:00
Yang Jing
93eca239cb Select the backend database configuration via spring.profiles.active. 2020-07-02 21:39:02 +08:00
Yang Jing
dc5949d497 Add PostgreSQL database support for management-side storage. 2020-07-02 16:21:34 +08:00
ZQKC
5e24f6b044 Merge pull request #37 from ZQKC/master
fix retention.ms when execute topic order
2020-07-01 11:01:38 +08:00
zengqiao
0cd31e0545 fix retentionTime when create topic 2020-07-01 10:56:55 +08:00
ZQKC
d4dc4b9d0a Merge pull request #1 from didi/dependabot/maven/org.apache.zookeeper-zookeeper-3.4.14
Bump zookeeper from 3.4.6 to 3.4.14
2020-06-08 16:46:33 +08:00
ZQKC
8c6fe40de1 Merge pull request #17 from pierre94/pierre94-add-assembly
Add assembly and operation scripts to simplify deployment
2020-06-05 18:45:41 +08:00
ZQKC
e4dc4bae30 Merge pull request #2 from didi/master
pull
2020-06-05 18:28:28 +08:00
potaaato
d99c21f4d7 Merge pull request #31 from Candieslove/bugfix
fix user password reset && broker status update && leader rebalance dialog
2020-06-05 18:10:31 +08:00
eilenexuzhe
8ef549de80 feat bugfix 2020-06-05 18:03:16 +08:00
ZQKC
1b57758102 Merge pull request #30 from didi/dev
fix underReplicatedPartition and add cluster
2020-06-05 17:08:49 +08:00
ZQKC
553fe30662 Merge pull request #29 from ZQKC/master
fix underReplicatedPartitionCount and add cluster
2020-06-05 17:01:06 +08:00
zengqiao
b6138afe8b fix add cluster 2020-06-05 16:56:00 +08:00
zengqiao
64d64fe6fe bugfix, fix underReplicatedPartitionCount 2020-06-05 15:23:56 +08:00
ZQKC
f29b356b74 Merge pull request #1 from didi/master
pull code
2020-06-05 14:04:11 +08:00
pierrexiong
b5621d1ffd add assembly and operation scripts to simplify deployment
xiongyongxin@hotmail
2020-05-22 20:17:38 +08:00
pierre xiong
66f0da934d Merge pull request #1 from didi/master
pull from didi/kafka-manager
2020-05-22 19:48:35 +08:00
zengqiao
13a90fdd57 add dingding notes 2020-05-19 21:37:15 +08:00
dependabot[bot]
47265bb8d3 Bump zookeeper from 3.4.6 to 3.4.14
Bumps zookeeper from 3.4.6 to 3.4.14.

Signed-off-by: dependabot[bot] <support@github.com>
2020-03-24 06:19:53 +00:00
56 changed files with 1306 additions and 235 deletions

Dockerfile (new file, 7 lines added)

@@ -0,0 +1,7 @@
FROM fabric8/java-alpine-openjdk8-jdk
MAINTAINER xuzhengxi
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ADD ./web/target/kafka-manager-web-1.1.0-SNAPSHOT.jar kafka-manager-web.jar
ADD ./docker/kafka-manager/application-standalone.yml application.yml
ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"]
EXPOSE 8080
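A hedged build-and-run sketch for this Dockerfile (hypothetical image tag and flags; it assumes `mvn install` has already produced `web/target/kafka-manager-web-1.1.0-SNAPSHOT.jar`, and that the bundled `application-standalone.yml` points at a database reachable from the container):
```
############# build the jar, then the image, then run it
mvn install
docker build -t kafka-manager-web .
docker run -d -p 8080:8080 kafka-manager-web
```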


@@ -28,15 +28,15 @@
- Separate views for admin and ordinary users;
- Separate permissions for admin and ordinary users;
---
## kafka-manager Architecture
![kafka-manager-arch](doc/assets/images/common/arch.png)
---
---
## Installation Guide
@@ -45,12 +45,14 @@
- `Maven 3.5.0+` (backend build dependency)
- `node v8.12.0+` (frontend build dependency)
- `Java 8+` (runtime requirement)
- `MySQL` (data storage)
- `MySQL` / `PostgreSQL` (data storage)
---
### Environment Initialization
**MySQL**
Run the SQL in [create_mysql_table.sql](doc/create_mysql_table.sql) to create the required MySQL database and tables; the database created by default is named `kafka_manager`.
```
@@ -58,32 +60,50 @@
mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql
```
**PostgreSQL**
Run the SQL in [create_postgresql_table.sql](doc/create_postgresql_table.sql) to create the required PostgreSQL tables.
```
############# Example:
psql -h XXX.XXX.XXX.XXX -U XXXX -d kafka_manager -f ./create_postgresql_table.sql
```
*Creating the PostgreSQL user and database*
```sql
create user admin encrypted password 'admin';
create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8';
```
***The default configuration uses MySQL. To use PostgreSQL instead, pass `-Dspring.profiles.active=pg` to select the `application-pg.yml` configuration file.***
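For example, a hypothetical startup sketch under the pg profile (jar name taken from the startup section below; it assumes `application-pg.yml` is on the classpath or next to the jar):
```
############# Example: start with the PostgreSQL profile
cd web/target/
nohup java -Dspring.profiles.active=pg -jar kafka-manager-web-1.1.0-SNAPSHOT.jar > /dev/null 2>&1 &
```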
---
### Packaging
Run `mvn install`.
Note: every `mvn install` run regenerates the latest frontend assets under `web/src/main/resources/templates`. If nothing in the `console` module has changed, you can edit `./pom.xml` to skip packaging the `console` module (a command-line alternative is sketched below).
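A hedged command-line alternative to editing `./pom.xml`: Maven 3.2.1+ can exclude a module from the reactor by artifactId (`kafka-manager-console`, per the console pom in this diff). This assumes previously generated frontend assets are still present under `web/src/main/resources/templates`:
```
############# skip the console module in this build only
mvn install -pl '!kafka-manager-console'
```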
---
### Startup
```
############# application.yml is the configuration file
cp web/src/main/resources/application.yml web/target/
cd web/target/
nohup java -jar kafka-manager-web-1.0.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 &
nohup java -jar kafka-manager-web-1.1.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 &
```
### Usage
For a local start, visit `http://localhost:8080` and log in with your username and password. For more, see the [kafka-manager user guide](doc/user_cn_guide.md).
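A quick reachability check after startup (hypothetical; any HTTP client works):
```
curl -I http://localhost:8080
```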
---
---
## Related Documents
@@ -91,9 +111,9 @@ nohup java -jar kafka-manager-web-1.0.0-SNAPSHOT.jar --spring.config.location=./
## DingTalk Group
Search group number `32821440`, or scan the QR code to join.
Search group number `32821440`, or scan the QR code to join. Note: search `32821440` in the DingTalk search box, then in the results click "Find phone/email/DingTalk ID" to see our DingTalk group, the Didi KafkaManager open-source user group.
![dingding_group](doc/assets/images/common/dingding_group.jpg)


@@ -5,13 +5,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<artifactId>kafka-manager</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</parent>
<properties>


@@ -1,8 +1,5 @@
package com.xiaojukeji.kafka.manager.common.entity.dto.consumer;
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
import java.util.List;
import java.util.Map;
/**
@@ -11,20 +8,33 @@ import java.util.Map;
* @date 2015/11/12
*/
public class ConsumerDTO {
/**
* Consumer group name
*/
private Long clusterId;
private String topicName;
private String consumerGroup;
/**
* Consumption type, usually static
*/
private String location;
/**
* Partition state list for each subscribed topic
*/
private Map<String, List<PartitionState>> topicPartitionMap;
private Map<Integer, Long> partitionOffsetMap;
private Map<Integer, Long> consumerOffsetMap;
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getConsumerGroup() {
return consumerGroup;
@@ -42,20 +52,31 @@ public class ConsumerDTO {
this.location = location;
}
public Map<String, List<PartitionState>> getTopicPartitionMap() {
return topicPartitionMap;
public Map<Integer, Long> getPartitionOffsetMap() {
return partitionOffsetMap;
}
public void setTopicPartitionMap(Map<String, List<PartitionState>> topicPartitionMap) {
this.topicPartitionMap = topicPartitionMap;
public void setPartitionOffsetMap(Map<Integer, Long> partitionOffsetMap) {
this.partitionOffsetMap = partitionOffsetMap;
}
public Map<Integer, Long> getConsumerOffsetMap() {
return consumerOffsetMap;
}
public void setConsumerOffsetMap(Map<Integer, Long> consumerOffsetMap) {
this.consumerOffsetMap = consumerOffsetMap;
}
@Override
public String toString() {
return "Consumer{" +
"consumerGroup='" + consumerGroup + '\'' +
return "ConsumerDTO{" +
"clusterId=" + clusterId +
", topicName='" + topicName + '\'' +
", consumerGroup='" + consumerGroup + '\'' +
", location='" + location + '\'' +
", topicPartitionMap=" + topicPartitionMap +
", partitionOffsetMap=" + partitionOffsetMap +
", consumerOffsetMap=" + consumerOffsetMap +
'}';
}
}


@@ -9,6 +9,10 @@ public class OrderPartitionDO extends BaseDO{
private String applicant;
private Integer partitionNum;
private String brokerList;
private Long peakBytesIn;
private String description;
@@ -51,6 +55,22 @@ public class OrderPartitionDO extends BaseDO{
this.applicant = applicant;
}
public Integer getPartitionNum() {
return partitionNum;
}
public void setPartitionNum(Integer partitionNum) {
this.partitionNum = partitionNum;
}
public String getBrokerList() {
return brokerList;
}
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
public Long getPeakBytesIn() {
return peakBytesIn;
}
@@ -98,6 +118,8 @@ public class OrderPartitionDO extends BaseDO{
", clusterName='" + clusterName + '\'' +
", topicName='" + topicName + '\'' +
", applicant='" + applicant + '\'' +
", partitionNum=" + partitionNum +
", brokerList='" + brokerList + '\'' +
", peakBytesIn=" + peakBytesIn +
", description='" + description + '\'' +
", orderStatus=" + orderStatus +


@@ -751,8 +751,7 @@
"version": "1.0.0",
"resolved": "http://registry.npm.taobao.org/assert-plus/download/assert-plus-1.0.0.tgz",
"integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=",
"dev": true,
"optional": true
"dev": true
},
"assign-symbols": {
"version": "1.0.0",
@@ -1505,7 +1504,6 @@
"resolved": "http://registry.npm.taobao.org/combined-stream/download/combined-stream-1.0.7.tgz",
"integrity": "sha1-LR0kMXr7ir6V1tLAsHtXgTU52Cg=",
"dev": true,
"optional": true,
"requires": {
"delayed-stream": "~1.0.0"
}
@@ -2222,8 +2220,7 @@
"version": "1.0.0",
"resolved": "http://registry.npm.taobao.org/delayed-stream/download/delayed-stream-1.0.0.tgz",
"integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=",
"dev": true,
"optional": true
"dev": true
},
"depd": {
"version": "1.1.2",
@@ -2941,8 +2938,7 @@
"version": "1.3.0",
"resolved": "http://registry.npm.taobao.org/extsprintf/download/extsprintf-1.3.0.tgz",
"integrity": "sha1-lpGEQOMEGnpBT4xS48V06zw+HgU=",
"dev": true,
"optional": true
"dev": true
},
"fast-deep-equal": {
"version": "2.0.1",
@@ -3358,8 +3354,7 @@
"ansi-regex": {
"version": "2.1.1",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"aproba": {
"version": "1.2.0",
@@ -3380,14 +3375,12 @@
"balanced-match": {
"version": "1.0.0",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"brace-expansion": {
"version": "1.1.11",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"balanced-match": "^1.0.0",
"concat-map": "0.0.1"
@@ -3402,20 +3395,17 @@
"code-point-at": {
"version": "1.1.0",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"concat-map": {
"version": "0.0.1",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"console-control-strings": {
"version": "1.1.0",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"core-util-is": {
"version": "1.0.2",
@@ -3532,8 +3522,7 @@
"inherits": {
"version": "2.0.3",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"ini": {
"version": "1.3.5",
@@ -3545,7 +3534,6 @@
"version": "1.0.0",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"number-is-nan": "^1.0.0"
}
@@ -3560,7 +3548,6 @@
"version": "3.0.4",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"brace-expansion": "^1.1.7"
}
@@ -3568,14 +3555,12 @@
"minimist": {
"version": "0.0.8",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"minipass": {
"version": "2.3.5",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"safe-buffer": "^5.1.2",
"yallist": "^3.0.0"
@@ -3594,7 +3579,6 @@
"version": "0.5.1",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"minimist": "0.0.8"
}
@@ -3675,8 +3659,7 @@
"number-is-nan": {
"version": "1.0.1",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"object-assign": {
"version": "4.1.1",
@@ -3688,7 +3671,6 @@
"version": "1.4.0",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"wrappy": "1"
}
@@ -3774,8 +3756,7 @@
"safe-buffer": {
"version": "5.1.2",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"safer-buffer": {
"version": "2.1.2",
@@ -3811,7 +3792,6 @@
"version": "1.0.2",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"code-point-at": "^1.0.0",
"is-fullwidth-code-point": "^1.0.0",
@@ -3831,7 +3811,6 @@
"version": "3.0.1",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"ansi-regex": "^2.0.0"
}
@@ -3875,14 +3854,12 @@
"wrappy": {
"version": "1.0.2",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"yallist": {
"version": "3.0.3",
"bundled": true,
"dev": true,
"optional": true
"dev": true
}
}
},
@@ -4864,8 +4841,7 @@
"version": "0.1.1",
"resolved": "http://registry.npm.taobao.org/jsbn/download/jsbn-0.1.1.tgz",
"integrity": "sha1-peZUwuWi3rXyAdls77yoDA7y9RM=",
"dev": true,
"optional": true
"dev": true
},
"json-parse-better-errors": {
"version": "1.0.2",
@@ -8883,8 +8859,7 @@
"version": "0.14.5",
"resolved": "http://registry.npm.taobao.org/tweetnacl/download/tweetnacl-0.14.5.tgz",
"integrity": "sha1-WuaBd/GS1EViadEIr6k/+HQ/T2Q=",
"dev": true,
"optional": true
"dev": true
},
"type-is": {
"version": "1.6.16",


@@ -5,7 +5,7 @@
<parent>
<artifactId>kafka-manager</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-manager-console</artifactId>


@@ -10,7 +10,7 @@ import { IClusterData } from 'types/base-type';
const TabPane = Tabs.TabPane;
const detailUrl ='/admin/cluster_detail?clusterId=';
const detailUrl = '/admin/cluster_detail?clusterId=';
const collectionColumns: Array<ColumnProps<IClusterData>> = [
{
@@ -24,7 +24,8 @@ const collectionColumns: Array<ColumnProps<IClusterData>> = [
key: 'clusterName',
sorter: (a: IClusterData, b: IClusterData) => a.clusterName.charCodeAt(0) - b.clusterName.charCodeAt(0),
render: (text, record) => {
return <a href={`${detailUrl}${record.clusterId}`}>{record.clusterName}</a>;
const url = `${detailUrl}${record.clusterId}&clusterName=${record.clusterName}`;
return <a href={encodeURI(url)}>{record.clusterName}</a>;
},
},
{


@@ -39,10 +39,10 @@ export class UserManage extends SearchAndFilter {
public renderColumns = () => {
const role = Object.assign({
title: '角色',
key: 'role',
dataIndex: 'role',
key: 'roleName',
dataIndex: 'roleName',
filters: users.filterRole,
onFilter: (value: string, record: any) => record.role.indexOf(value) === 0,
onFilter: (value: string, record: any) => record.roleName.indexOf(value) === 0,
}, this.renderColumnsFilter('filterVisible'));
return [


@@ -1,6 +1,6 @@
import * as React from 'react';
import './index.less';
import { Table, Modal, notification, PaginationConfig, Button } from 'component/antd';
import { Table, Modal, notification, PaginationConfig, Button, Spin } from 'component/antd';
import { broker, IBroker, IBrokerNetworkInfo, IBrokerPartition } from 'store/broker';
import { observer } from 'mobx-react';
import { StatusGraghCom } from 'component/flow-table';
@@ -49,10 +49,19 @@ export class BrokerList extends SearchAndFilter {
const status = Object.assign({
title: '已同步',
dataIndex: 'status',
key: 'status',
filters: [{ text: '是', value: '' }, { text: '否', value: '' }],
onFilter: (value: string, record: IBrokerPartition) => record.status === value,
dataIndex: 'underReplicatedPartitionCount',
key: 'underReplicatedPartitionCount',
filters: [{ text: '是', value: '1' }, { text: '否', value: '0' }],
onFilter: (value: string, record: IBrokerPartition) => {
// underReplicatedPartitionCount > 0 means replicas are not fully in sync
const syncStatus = record.underReplicatedPartitionCount ? '0' : '1';
return syncStatus === value;
},
render: (text: number) => (
<>
<span style={{ marginRight: 8 }}>{text ? '否' : '是'}</span>
</>
),
}, this.renderColumnsFilter('filterVisible'));
return [{
@@ -80,7 +89,8 @@ export class BrokerList extends SearchAndFilter {
title: '未同步副本数量',
dataIndex: 'notUnderReplicatedPartitionCount',
key: 'notUnderReplicatedPartitionCount',
sorter: (a: IBrokerPartition, b: IBrokerPartition) => a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount,
sorter: (a: IBrokerPartition, b: IBrokerPartition) =>
a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount,
},
status,
region,
@@ -205,7 +215,7 @@ export class BrokerList extends SearchAndFilter {
const dataPartitions = this.state.searchId !== '' ?
broker.partitions.filter((d) => d.brokerId === +this.state.searchId) : broker.partitions;
return (
<>
<Spin spinning={broker.loading}>
<div className="k-row">
<ul className="k-tab">
<li>Broker概览</li>
@@ -239,7 +249,7 @@ export class BrokerList extends SearchAndFilter {
pagination={pagination}
/>
</div>
</>
</Spin>
);
}
}


@@ -45,14 +45,19 @@ class LeaderRebalance extends React.Component<any> {
constructor(props: any) {
super(props);
const url = Url();
this.clusterName = decodeURI(atob(url.search.clusterName));
if (url.search.clusterName) {
this.clusterName = decodeURI(url.search.clusterName);
}
this.clusterId = Number(url.search.clusterId);
}
public handleSubmit = (e: React.MouseEvent<any, MouseEvent>) => {
e.preventDefault();
this.setState({ loading: true });
this.props.form.validateFieldsAndScroll((err: any, values: any) => {
if (err) {
return;
}
this.setState({ loading: true });
this.brokerId = Number(values.brokerId);
addRebalance({ brokerId: this.brokerId, clusterId: this.clusterId, dimension: 0 }).then(() => {
cluster.getRebalance(this.clusterId).then(() => {


@@ -35,6 +35,7 @@ export interface IBrokerPartition extends IBroker {
leaderCount: number;
partitionCount: number;
notUnderReplicatedPartitionCount: number;
underReplicatedPartitionCount?: number;
regionName: string;
bytesInPerSec: number;
}
@@ -74,6 +75,9 @@ interface IBrokerOption {
}
class Broker {
@observable
public loading: boolean = false;
@observable
public brokerBaseInfo: IBrokerBaseInfo = {} as IBrokerBaseInfo;
@@ -119,6 +123,11 @@ class Broker {
@observable
public BrokerOptions: IValueLabel[] = [{ value: null, label: '请选择Broker' }];
@action.bound
public setLoading(value: boolean) {
this.loading = value;
}
@action.bound
public setBrokerBaseInfo(data: IBrokerBaseInfo) {
data.startTime = moment(data.startTime).format('YYYY-MM-DD HH:mm:ss'),
@@ -216,7 +225,8 @@ class Broker {
}
public getBrokerList(clusterId: number) {
getBrokerList(clusterId).then(this.setBrokerList);
this.setLoading(true);
getBrokerList(clusterId).then(this.setBrokerList).finally(() => this.setLoading(false));
}
public getBrokerNetwork(clusterId: number) {
@@ -224,7 +234,8 @@ class Broker {
}
public getBrokerPartition(clusterId: number) {
getBrokerPartition(clusterId).then(this.setBrokerPartition);
this.setLoading(true);
getBrokerPartition(clusterId).then(this.setBrokerPartition).finally(() => this.setLoading(false));
}
public getOneBrokerNetwork(clusterId: number, brokerId: number) {


@@ -17,7 +17,7 @@ export class Users {
@action.bound
public setUserData(data: []) {
this.userData = data.map((d: any) => {
d.role = this.roleMap[d.role];
d.roleName = this.roleMap[d.role];
return d;
});
}


@@ -5,13 +5,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-dao</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<artifactId>kafka-manager</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</parent>
<properties>
@@ -43,5 +43,9 @@
<artifactId>mariadb-java-client</artifactId>
<version>2.5.4</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
</dependencies>
</project>


@@ -18,6 +18,9 @@ public class AccountDaoImpl implements AccountDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@@ -25,7 +28,7 @@ public class AccountDaoImpl implements AccountDao {
@Override
public int addNewAccount(AccountDO accountDO) {
accountDO.setStatus(DBStatusEnum.NORMAL.getStatus());
return sqlSession.insert("AccountDao.insert", accountDO);
return updateAccount(accountDO);
}
@Override
@@ -35,6 +38,9 @@ public class AccountDaoImpl implements AccountDao {
@Override
public int updateAccount(AccountDO accountDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("AccountDao.insertOnPG", accountDO);
}
return sqlSession.insert("AccountDao.insert", accountDO);
}


@@ -19,12 +19,18 @@ public class BrokerDaoImpl implements BrokerDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int replace(BrokerDO brokerInfoDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("BrokerDao.replaceOnPG", brokerInfoDO);
}
return sqlSession.insert("BrokerDao.replace", brokerInfoDO);
}


@@ -0,0 +1,22 @@
package com.xiaojukeji.kafka.manager.dao.impl;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("spring.datasource.kafka-manager")
public class KafkaManagerProperties {
private String jdbcUrl;
public String getJdbcUrl() {
return jdbcUrl;
}
public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
public boolean hasPG() {
return jdbcUrl.startsWith("jdbc:postgres");
}
}


@@ -17,12 +17,18 @@ public class RegionDaoImpl implements RegionDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int insert(RegionDO regionDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("RegionDao.insertOnPG", regionDO);
}
return sqlSession.insert("RegionDao.insert", regionDO);
}


@@ -19,12 +19,18 @@ public class TopicDaoImpl implements TopicDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int replace(TopicDO topicDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("TopicDao.replaceOnPG", topicDO);
}
return sqlSession.insert("TopicDao.replace", topicDO);
}


@@ -25,12 +25,18 @@ public class TopicFavoriteDaoImpl implements TopicFavoriteDao {
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int batchAdd(List<TopicFavoriteDO> topicFavoriteDOList) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("TopicFavoriteDao.batchAddOnPG", topicFavoriteDOList);
}
return sqlSession.insert("TopicFavoriteDao.batchAdd", topicFavoriteDOList);
}


@@ -22,6 +22,18 @@
]]>
</insert>
<insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.AccountDO">
<![CDATA[
insert into account
(username, password, role, status)
values
(#{username}, #{password}, #{role}, #{status})
on conflict (username) do update set password = excluded.password,
role = excluded.role,
status = excluded.status
]]>
</insert>
<delete id="deleteByName" parameterType="java.lang.String">
DELETE FROM account WHERE username = #{username}
</delete>


@@ -20,6 +20,16 @@
(#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
</insert>
<insert id="replaceOnPG" parameterType="BrokerDO">
insert into broker
(cluster_id, broker_id, host, port, timestamp, status)
values (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
on conflict (cluster_id, broker_id) do update set host = excluded.host,
port = excluded.port,
timestamp = excluded.timestamp,
status = excluded.status
</insert>
<delete id="deleteById" parameterType="java.util.Map">
DELETE FROM broker WHERE cluster_id = #{clusterId} AND broker_id = #{brokerId}
</delete>


@@ -11,6 +11,8 @@
<result column="cluster_name" property="clusterName" />
<result column="topic_name" property="topicName" />
<result column="applicant" property="applicant" />
<result column="partition_num" property="partitionNum" />
<result column="broker_list" property="brokerList" />
<result column="peak_bytes_in" property="peakBytesIn" />
<result column="description" property="description" />
<result column="order_status" property="orderStatus" />
@@ -38,6 +40,16 @@
cluster_name=#{clusterName},
topic_name=#{topicName},
applicant=#{applicant},
<trim>
<if test="partitionNum!=null">
partition_num=#{partitionNum},
</if>
</trim>
<trim>
<if test="brokerList!=null">
broker_list=#{brokerList},
</if>
</trim>
peak_bytes_in=#{peakBytesIn},
description=#{description},
order_status=#{orderStatus},


@@ -21,6 +21,15 @@
(#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
</insert>
<insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.RegionDO">
insert into region
(region_name, cluster_id, broker_list, description, operator)
values (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
on conflict (region_name, cluster_id) do update set broker_list = excluded.broker_list,
description = excluded.description,
operator = excluded.operator
</insert>
<delete id="deleteById" parameterType="java.lang.Long">
DELETE FROM region WHERE id = #{id}
</delete>


@@ -19,6 +19,15 @@
(#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
</insert>
<insert id="replaceOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.TopicDO">
insert into topic
(cluster_id, topic_name, principals, description, status)
values (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
on conflict (cluster_id, topic_name) do update set principals = excluded.principals,
description = excluded.description,
status = excluded.status
</insert>
<delete id="deleteById" parameterType="java.lang.Long">
DELETE FROM topic WHERE id = #{id}
</delete>


@@ -21,6 +21,15 @@
</foreach>
</insert>
<insert id="batchAddOnPG" parameterType="java.util.List">
insert into topic_favorite (cluster_id, topic_name, username)
values
<foreach item="TopicFavoriteDO" index="index" collection="list" separator=",">
(#{TopicFavoriteDO.clusterId}, #{TopicFavoriteDO.topicName}, #{TopicFavoriteDO.username})
</foreach>
on conflict (cluster_id, topic_name, username) do update set gmt_modify = now();
</insert>
<delete id="deleteById" parameterType="java.lang.Long">
DELETE FROM topic_favorite WHERE id=#{id}
</delete>


@@ -149,6 +149,8 @@ CREATE TABLE `order_partition` (
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Cluster name',
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic name',
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker list, comma-separated',
`partition_num` int(11) NOT NULL DEFAULT 0 COMMENT 'Number of partitions to add',
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT 'Applicant',
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Peak traffic',
`description` text COMMENT 'Remarks',


@@ -0,0 +1,325 @@
-- CREATE DATABASE kafka_manager;
-- \c kafka_manager;
SET TIME ZONE 'Asia/Chongqing';
SET CLIENT_ENCODING TO 'UTF-8';
CREATE OR REPLACE FUNCTION on_update_timestamp() RETURNS TRIGGER AS
$$
BEGIN
new.gmt_modify = current_timestamp;
return new;
END;
$$ LANGUAGE plpgsql;
-- Account table
CREATE TABLE account
(
id bigserial NOT NULL, -- 'ID',
username varchar(64) NOT NULL UNIQUE DEFAULT '', -- 'Username',
password varchar(128) NOT NULL DEFAULT '', -- 'Password',
role int NOT NULL DEFAULT 0, -- 'Role type, 0: ordinary user',
status int NOT NULL DEFAULT 0, -- '0: in use, -1: deprecated',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT account_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX account_uniq_username ON account (username);
INSERT INTO account(username, password, role)
VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2);
CREATE TRIGGER account_trig_gmt_modify
BEFORE UPDATE
ON account
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Alarm rule table
CREATE TABLE alarm_rule
(
id bigserial, -- 'Auto-increment ID',
alarm_name varchar(128) NOT NULL DEFAULT '', -- 'Alarm name',
strategy_expressions text, -- 'Expressions',
strategy_filters text, -- 'Filters',
strategy_actions text, -- 'Actions',
principals varchar(512) NOT NULL DEFAULT '', -- 'Owners',
status int2 NOT NULL DEFAULT 1, -- '-1: logically deleted, 0: disabled, 1: normal',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT alarm_rule_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX alarm_rule_uniq_alarm_name ON alarm_rule (alarm_name);
CREATE TRIGGER alarm_rule_trig_gmt_modify
BEFORE UPDATE
ON alarm_rule
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Broker info table
CREATE TABLE broker
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID',
host varchar(128) NOT NULL DEFAULT '', -- 'Broker hostname',
port int NOT NULL DEFAULT '-1', -- 'Broker port',
timestamp bigint NOT NULL DEFAULT '-1', -- 'Start time',
status int NOT NULL DEFAULT '0', -- 'Status: 0 valid, -1 invalid',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT broker_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX broker_uniq_cluster_id_broker_id ON broker (cluster_id, broker_id);
CREATE TRIGGER broker_trig_gmt_modify
BEFORE UPDATE
ON broker
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- BrokerMetric info table
CREATE TABLE broker_metrics
(
id bigserial, -- 'Auto-increment id',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID',
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes in per second',
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes out per second',
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes rejected per second',
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Messages in per second',
fail_fetch_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Failed fetch requests per second',
fail_produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Failed produce requests per second',
fetch_consumer_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Consumer fetch requests per second',
produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Produce requests per second',
request_handler_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Request handler busy percentage',
network_processor_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Network processor busy percentage',
request_queue_size bigint NOT NULL DEFAULT '0', -- 'Request queue size',
response_queue_size bigint NOT NULL DEFAULT '0', -- 'Response queue size',
log_flush_time decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Log flush time',
total_time_produce_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Total produce request time, mean',
total_time_produce_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Total produce request time, 99th percentile',
total_time_fetch_consumer_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Total fetch request time, mean',
total_time_fetch_consumer_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Total fetch request time, 99th percentile',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
CONSTRAINT broker_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX broker_metrics_idx_cluster_id_broker_id_gmt_create ON broker_metrics (cluster_id, broker_id, gmt_create);
-- Cluster table
CREATE TABLE cluster
(
id bigserial, -- 'Cluster ID',
cluster_name varchar(128) NOT NULL DEFAULT '', -- 'Cluster name',
zookeeper varchar(512) NOT NULL DEFAULT '', -- 'ZK address',
bootstrap_servers varchar(512) NOT NULL DEFAULT '', -- 'Server address',
kafka_version varchar(32) NOT NULL DEFAULT '', -- 'Kafka version',
alarm_flag int2 NOT NULL DEFAULT '0', -- '0: alarms disabled, 1: alarms enabled',
security_protocol varchar(512) NOT NULL DEFAULT '', -- 'Security protocol',
sasl_mechanism varchar(512) NOT NULL DEFAULT '', -- 'SASL mechanism',
sasl_jaas_config varchar(512) NOT NULL DEFAULT '', -- 'JAAS config',
status int2 NOT NULL DEFAULT '0', -- 'Delete flag, 0: not deleted, -1: deleted',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT cluster_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX cluster_uniq_cluster_name ON cluster (cluster_name);
CREATE TRIGGER cluster_trig_gmt_modify
BEFORE UPDATE
ON cluster
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- ClusterMetrics info
CREATE TABLE cluster_metrics
(
id bigserial, -- 'Auto-increment id',
cluster_id bigint NOT NULL DEFAULT '0', -- 'Cluster ID',
topic_num int NOT NULL DEFAULT '0', -- 'Topic count',
partition_num int NOT NULL DEFAULT '0', -- 'Partition count',
broker_num int NOT NULL DEFAULT '0', -- 'Broker count',
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes in per second (B)',
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes out per second (B)',
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes rejected per second (B)',
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Messages in per second',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
CONSTRAINT cluster_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX cluster_metrics_idx_cluster_id_gmt_create ON cluster_metrics (cluster_id, gmt_create);
-- Controller change history table
CREATE TABLE controller
(
id bigserial, -- 'Auto-increment id',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerId',
host varchar(256) NOT NULL DEFAULT '', -- 'Hostname',
timestamp bigint NOT NULL DEFAULT '-1', -- 'Controller change time',
version int NOT NULL DEFAULT '-1', -- 'Controller format version',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
CONSTRAINT controller_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX controller_uniq_cluster_id_broker_id_timestamp ON controller (cluster_id, broker_id, timestamp);
-- Topic migration info
CREATE TABLE migration_task
(
id bigserial, -- 'Auto-increment id',
cluster_id bigint NOT NULL DEFAULT '0', -- 'Cluster ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic name',
reassignment_json text, -- 'Task parameters',
real_throttle bigint NOT NULL DEFAULT '0', -- 'Actual throttle (B/s)',
operator varchar(128) NOT NULL DEFAULT '', -- 'Operator',
description varchar(256) NOT NULL DEFAULT '', -- 'Remarks',
status int NOT NULL DEFAULT '0', -- 'Task status',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Task create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Task modify time',
CONSTRAINT migration_task_pk PRIMARY KEY (id)
);
CREATE TRIGGER migration_task_trig_gmt_modify
BEFORE UPDATE
ON migration_task
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
CREATE TABLE operation_history
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic name',
operator varchar(128) NOT NULL DEFAULT '', -- 'Operator',
operation varchar(256) NOT NULL DEFAULT '', -- 'Operation description',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
PRIMARY KEY (id)
);
-- Operation history table
-- Partition request order
CREATE TABLE order_partition
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
cluster_name varchar(128) NOT NULL DEFAULT '', -- 'Cluster name',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic name',
applicant varchar(128) NOT NULL DEFAULT '', -- 'Applicant',
partition_num int NOT NULL DEFAULT '0', -- 'Partition count',
broker_list varchar(128) NOT NULL DEFAULT '', -- 'Broker list',
peak_bytes_in bigint NOT NULL DEFAULT '0', -- 'Peak traffic',
description text, -- 'Remarks',
order_status int NOT NULL DEFAULT '0', -- 'Order status',
approver varchar(128) NOT NULL DEFAULT '', -- 'Approver',
opinion varchar(256) NOT NULL DEFAULT '', -- 'Approval opinion',
status int NOT NULL DEFAULT '0', -- 'Status: 0 valid, -1 invalid',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT order_partition_pk PRIMARY KEY (id)
);
CREATE TRIGGER order_partition_trig_gmt_modify
BEFORE UPDATE
ON order_partition
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Topic request order
CREATE TABLE order_topic
(
id bigserial, -- 'ID',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
cluster_name varchar(128) NOT NULL DEFAULT '', -- 'Cluster name',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic name',
retention_time bigint NOT NULL DEFAULT '-1', -- 'Retention time (ms)',
partition_num int NOT NULL DEFAULT '-1', -- 'Partition count',
replica_num int NOT NULL DEFAULT '-1', -- 'Replica count',
regions varchar(128) NOT NULL DEFAULT '', -- 'RegionId list',
brokers varchar(128) NOT NULL DEFAULT '', -- 'Broker list',
peak_bytes_in bigint NOT NULL DEFAULT '0', -- 'Peak bytes-in (KB)',
applicant varchar(128) NOT NULL DEFAULT '', -- 'Applicant',
principals varchar(256) NOT NULL DEFAULT '', -- 'Owners',
description text, -- 'Remarks',
order_status int NOT NULL DEFAULT '0', -- 'Order status',
approver varchar(128) NOT NULL DEFAULT '', -- 'Approver',
opinion varchar(256) NOT NULL DEFAULT '', -- 'Approval opinion',
status int NOT NULL DEFAULT '0', -- 'Status: 0 valid, -1 invalid',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT order_topic_pk PRIMARY KEY (id)
);
CREATE TRIGGER order_topic_trig_gmt_modify
BEFORE UPDATE
ON order_topic
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Region info table
CREATE TABLE region
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
region_name varchar(128) NOT NULL DEFAULT '', -- 'Region name',
broker_list varchar(256) NOT NULL DEFAULT '', -- 'Broker list',
level int NOT NULL DEFAULT '0', -- 'Region importance: 0 normal, 1 important, 2 critical',
operator varchar(45) NOT NULL DEFAULT '', -- 'Operator',
description text, -- 'Remarks',
status int NOT NULL DEFAULT '0', -- 'Status: 0 normal, -1 discarded, 1 at capacity',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT region_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX region_uniq_cluster_id_region_name ON region (cluster_id, region_name);
CREATE TRIGGER region_trig_gmt_modify
BEFORE UPDATE
ON region
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Topic info table
CREATE TABLE topic
(
id bigserial, -- 'ID',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic name',
applicant varchar(256) NOT NULL DEFAULT '', -- 'Applicant',
principals varchar(256) NOT NULL DEFAULT '', -- 'Owners',
description text, -- 'Remarks',
status int NOT NULL DEFAULT '0', -- '0: in use, -1: deprecated',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT topic_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX topic_uniq_cluster_id_topic_name ON topic (cluster_id, topic_name);
CREATE TRIGGER topic_trig_gmt_modify
BEFORE UPDATE
ON topic
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- User-favorited Topic table
CREATE TABLE topic_favorite
(
id bigserial, -- 'Auto-increment Id',
username varchar(64) NOT NULL DEFAULT '', -- 'Username',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic name',
status int NOT NULL DEFAULT '0', -- 'Delete flag, 0: not deleted, -1: deleted',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Modify time',
CONSTRAINT topic_favorite_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX topic_favorite_uniq_username_cluster_id_topic_name ON topic_favorite (username, cluster_id, topic_name);
CREATE TRIGGER topic_favorite_trig_gmt_modify
BEFORE UPDATE
ON topic_favorite
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- TopicMetrics table
CREATE TABLE topic_metrics
(
id bigserial, -- 'Auto-increment id',
cluster_id bigint NOT NULL DEFAULT '-1', -- 'Cluster ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic name',
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Messages in per second',
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes in per second',
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes out per second',
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Bytes rejected per second',
total_produce_requests decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'Requests per second',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 'Create time',
CONSTRAINT topic_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX topic_metrics_idx_cluster_id_topic_name_gmt_create ON topic_metrics (cluster_id, topic_name, gmt_create);

docker-compose.yml (new file, 32 lines added)

@@ -0,0 +1,32 @@
version: '2'
services:
mysqldbserver:
container_name: mysqldbserver
image: mysql:5.7
build:
context: .
dockerfile: docker/mysql/Dockerfile
ports:
- "3306:3306"
command: [
'mysqld',
'--character-set-server=utf8',
'--collation-server=utf8_general_ci',
'--default-time-zone=+8:00'
]
environment:
MYSQL_ROOT_PASSWORD: 12345678
MYSQL_DATABASE: kafka_manager
MYSQL_USER: admin
MYSQL_PASSWORD: 12345678
restart: always
kafka-manager-web:
container_name: kafka-manager-web
build:
context: .
dockerfile: Dockerfile
ports:
- "8080:8080"
links:
- mysqldbserver
restart: always
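A hedged way to bring this stack up (the jar must be built first, because the `kafka-manager-web` service's Dockerfile ADDs it from `web/target/`; `--build` is assumed here to force image rebuilds):
```
############# build the jar, then start both containers
mvn install
docker-compose up -d --build
```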


@@ -0,0 +1,7 @@
FROM java:8
MAINTAINER xuzhengxi
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ADD ../../web/target/kafka-manager-web-1.1.0-SNAPSHOT.jar kafka-manager-web.jar
ADD ./application.yml application.yml
ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"]
EXPOSE 8080


@@ -0,0 +1,32 @@
server:
port: 8080
tomcat:
accept-count: 100
max-connections: 1000
max-threads: 20
min-spare-threads: 20
spring:
application:
name: kafkamanager
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: admin
password: 12345678
driver-class-name: org.mariadb.jdbc.Driver
main:
allow-bean-definition-overriding: true
profiles:
active: dev
logging:
config: classpath:logback-spring.xml
# kafka monitoring
kafka-monitor:
enabled: true
notify-kafka:
cluster-id: 95
topic-name: kmo_monitor


@@ -0,0 +1,32 @@
server:
port: 8080
tomcat:
accept-count: 100
max-connections: 1000
max-threads: 20
min-spare-threads: 20
spring:
application:
name: kafkamanager
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: admin
password: 12345678
driver-class-name: org.mariadb.jdbc.Driver
main:
allow-bean-definition-overriding: true
profiles:
active: dev
logging:
config: classpath:logback-spring.xml
# kafka monitoring
kafka-monitor:
enabled: true
notify-kafka:
cluster-id: 95
topic-name: kmo_monitor

docker/mysql/Dockerfile (new file, 3 lines added)

@@ -0,0 +1,3 @@
FROM mysql:5.7
MAINTAINER xuzhengxi
COPY ./docker/mysql/create_mysql_table.sql /docker-entrypoint-initdb.d/


@@ -0,0 +1,243 @@
CREATE DATABASE IF NOT EXISTS `kafka_manager`;
USE `kafka_manager`;
CREATE TABLE `account` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`username` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'Username',
`password` varchar(128) NOT NULL DEFAULT '' COMMENT 'Password',
`role` int(16) NOT NULL DEFAULT '0' COMMENT 'Role type, 0: ordinary user',
`status` int(16) NOT NULL DEFAULT '0' COMMENT '0: in use, -1: deprecated',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Account table';
INSERT INTO account(username, password, role) VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2);
CREATE TABLE `alarm_rule` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment ID',
`alarm_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Alarm name',
`strategy_expressions` text COMMENT 'Expressions',
`strategy_filters` text COMMENT 'Filters',
`strategy_actions` text COMMENT 'Actions',
`principals` varchar(512) NOT NULL DEFAULT '' COMMENT 'Owners',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '-1: logically deleted, 0: disabled, 1: normal',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_alarm_name` (`alarm_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Alarm rule table';
CREATE TABLE `broker` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerID',
`host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Broker hostname',
`port` int(32) NOT NULL DEFAULT '-1' COMMENT 'Broker port',
`timestamp` bigint(128) NOT NULL DEFAULT '-1' COMMENT 'Start time',
`status` int(11) NOT NULL DEFAULT '0' COMMENT 'Status: 0 valid, -1 invalid',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_id_broker_id` (`cluster_id`,`broker_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker info table';
CREATE TABLE `broker_metrics` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerID',
`bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes in per second',
`bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes out per second',
`bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes rejected per second',
`messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Messages in per second',
`fail_fetch_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Failed fetch requests per second',
`fail_produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Failed produce requests per second',
`fetch_consumer_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Consumer fetch requests per second',
`produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Produce requests per second',
`request_handler_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Request handler busy percentage',
`network_processor_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Network processor busy percentage',
`request_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Request queue size',
`response_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Response queue size',
`log_flush_time` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Log flush time',
`total_time_produce_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Total produce request time, mean',
`total_time_produce_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Total produce request time, 99th percentile',
`total_time_fetch_consumer_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Total fetch request time, mean',
`total_time_fetch_consumer_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Total fetch request time, 99th percentile',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
PRIMARY KEY (`id`),
KEY `idx_cluster_id_broker_id_gmt_create` (`cluster_id`,`broker_id`,`gmt_create`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='BrokerMetric info table';
CREATE TABLE `cluster` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Cluster ID',
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Cluster name',
`zookeeper` varchar(512) NOT NULL DEFAULT '' COMMENT 'ZK address',
`bootstrap_servers` varchar(512) NOT NULL DEFAULT '' COMMENT 'Server address',
`kafka_version` varchar(32) NOT NULL DEFAULT '' COMMENT 'Kafka version',
`alarm_flag` int(4) NOT NULL DEFAULT '0' COMMENT '0: alarms disabled, 1: alarms enabled',
`security_protocol` varchar(512) NOT NULL DEFAULT '' COMMENT 'Security protocol',
`sasl_mechanism` varchar(512) NOT NULL DEFAULT '' COMMENT 'SASL mechanism',
`sasl_jaas_config` varchar(512) NOT NULL DEFAULT '' COMMENT 'JAAS config',
`status` int(4) NOT NULL DEFAULT '0' COMMENT 'Delete flag, 0: not deleted, -1: deleted',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_name` (`cluster_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Cluster table';
CREATE TABLE `cluster_metrics` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
`cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Cluster ID',
`topic_num` int(11) NOT NULL DEFAULT '0' COMMENT 'Topic count',
`partition_num` int(11) NOT NULL DEFAULT '0' COMMENT 'Partition count',
`broker_num` int(11) NOT NULL DEFAULT '0' COMMENT 'Broker count',
`bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes in per second (B)',
`bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes out per second (B)',
`bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes rejected per second (B)',
`messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Messages in per second',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
PRIMARY KEY (`id`),
KEY `idx_cluster_id_gmt_create` (`cluster_id`,`gmt_create`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='ClusterMetrics info';
CREATE TABLE `controller` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerId',
`host` varchar(256) NOT NULL DEFAULT '' COMMENT 'Hostname',
`timestamp` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Controller change time',
`version` int(11) NOT NULL DEFAULT '-1' COMMENT 'Controller format version',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_id_broker_id_timestamp` (`cluster_id`,`broker_id`,`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Controller change history table';
CREATE TABLE `migration_task` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
`cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Cluster ID',
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic name',
`reassignment_json` text COMMENT 'Task parameters',
`real_throttle` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Actual throttle (B/s)',
`operator` varchar(128) NOT NULL DEFAULT '' COMMENT 'Operator',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT 'Remarks',
`status` int(11) NOT NULL DEFAULT '0' COMMENT 'Task status',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Task create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Task modify time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic migration info';
CREATE TABLE `operation_history` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic name',
`operator` varchar(128) NOT NULL DEFAULT '' COMMENT 'Operator',
`operation` varchar(256) NOT NULL DEFAULT '' COMMENT 'Operation description',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Operation history table';
CREATE TABLE `order_partition` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Cluster name',
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic name',
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker list, comma-separated',
`partition_num` int(11) NOT NULL DEFAULT 0 COMMENT 'Number of partitions to add',
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT 'Applicant',
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Peak traffic',
`description` text COMMENT 'Remarks',
`order_status` int(16) NOT NULL DEFAULT '0' COMMENT 'Order status',
`approver` varchar(128) NOT NULL DEFAULT '' COMMENT 'Approver',
`opinion` varchar(256) NOT NULL DEFAULT '' COMMENT 'Approval opinion',
`status` int(11) NOT NULL DEFAULT '0' COMMENT 'Status: 0 valid, -1 invalid',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Partition request order';
CREATE TABLE `order_topic` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Cluster name',
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic name',
`retention_time` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Retention time (ms)',
`partition_num` int(16) NOT NULL DEFAULT '-1' COMMENT 'Partition count',
`replica_num` int(16) NOT NULL DEFAULT '-1' COMMENT 'Replica count',
`regions` varchar(128) NOT NULL DEFAULT '' COMMENT 'RegionId list',
`brokers` varchar(128) NOT NULL DEFAULT '' COMMENT 'Broker list',
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Peak bytes-in (KB)',
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT 'Applicant',
`principals` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owners',
`description` text COMMENT 'Remarks',
`order_status` int(16) NOT NULL DEFAULT '0' COMMENT 'Order status',
`approver` varchar(128) NOT NULL DEFAULT '' COMMENT 'Approver',
`opinion` varchar(256) NOT NULL DEFAULT '' COMMENT 'Approval opinion',
`status` int(16) NOT NULL DEFAULT '0' COMMENT 'Status: 0 valid, -1 invalid',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic request order';
CREATE TABLE `region` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`region_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Region name',
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker list',
`level` int(16) NOT NULL DEFAULT '0' COMMENT 'Region importance: 0 normal, 1 important, 2 critical',
`operator` varchar(45) NOT NULL DEFAULT '' COMMENT 'Operator',
`description` text COMMENT 'Remarks',
`status` int(11) NOT NULL DEFAULT '0' COMMENT 'Status: 0 normal, -1 discarded, 1 at capacity',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_id_region_name` (`cluster_id`,`region_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Region info table';
CREATE TABLE `topic` (
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`topic_name` varchar(192) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'Topic name',
`applicant` varchar(256) NOT NULL DEFAULT '' COMMENT 'Applicant',
`principals` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owners',
`description` text COMMENT 'Remarks',
`status` int(16) NOT NULL DEFAULT '0' COMMENT '0: in use, -1: deprecated',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_cluster_id_topic_name` (`cluster_id`,`topic_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic info table';
CREATE TABLE `topic_favorite` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment Id',
`username` varchar(64) NOT NULL DEFAULT '' COMMENT 'Username',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic name',
`status` int(16) NOT NULL DEFAULT '0' COMMENT 'Delete flag, 0: not deleted, -1: deleted',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_username_cluster_id_topic_name` (`username`,`cluster_id`,`topic_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='User-favorited Topic table';
CREATE TABLE `topic_metrics` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Cluster ID',
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic name',
`messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Messages in per second',
`bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes in per second',
`bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes out per second',
`bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Bytes rejected per second',
`total_produce_requests` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'Requests per second',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
PRIMARY KEY (`id`),
KEY `idx_cluster_id_topic_name_gmt_create` (`cluster_id`,`topic_name`,`gmt_create`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='TopicMetrics table';


@@ -6,7 +6,7 @@
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager</artifactId>
<packaging>pom</packaging>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
@@ -15,7 +15,7 @@
</parent>
<properties>
<kafka-manager.revision>1.0.0-SNAPSHOT</kafka-manager.revision>
<kafka-manager.revision>1.1.0-SNAPSHOT</kafka-manager.revision>
<jackson.version>2.9.0</jackson.version>
<!-- maven properties -->
@@ -77,12 +77,12 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
<version>3.4.14</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>

View File

@@ -5,13 +5,13 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-service</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<artifactId>kafka-manager</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</parent>
<properties>
@@ -31,7 +31,7 @@
<dependency>
<groupId>com.xiaojukeji.kafka</groupId>
<artifactId>kafka-manager-dao</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</dependency>
<!-- spring -->

View File

@@ -2,12 +2,12 @@ package com.xiaojukeji.kafka.manager.service.collector;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetrics;
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerDTO;
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +34,8 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
if (clusterDO == null) {
return;
}
Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>();
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, topicNamePartitionStateListMap);
Map<TopicPartition, Long> allPartitionOffsetMap = new HashMap<>();
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, allPartitionOffsetMap);
List<ConsumerMetrics> consumerMetricsList = convert2ConsumerMetrics(consumerDTOList);
KafkaMetricsCache.putConsumerMetricsToCache(clusterId, consumerMetricsList);
@@ -47,23 +47,27 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
private List<ConsumerMetrics> convert2ConsumerMetrics(List<ConsumerDTO> consumerDTOList) {
List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
for (ConsumerDTO consumerDTO : consumerDTOList) {
Map<String, List<PartitionState>> topicNamePartitionStateListMap = consumerDTO.getTopicPartitionMap();
for(Map.Entry<String, List<PartitionState>> entry : topicNamePartitionStateListMap.entrySet()){
String topicName = entry.getKey();
List<PartitionState> partitionStateList = entry.getValue();
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
consumerMetrics.setClusterId(clusterId);
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
consumerMetrics.setLocation(consumerDTO.getLocation());
consumerMetrics.setTopicName(topicName);
long sumLag = 0;
for (PartitionState partitionState : partitionStateList) {
Map.Entry<Long, Long> offsetEntry = new AbstractMap.SimpleEntry<>(partitionState.getOffset(), partitionState.getConsumeOffset());
sumLag += (offsetEntry.getKey() - offsetEntry.getValue() > 0 ? offsetEntry.getKey() - offsetEntry.getValue(): 0);
}
consumerMetrics.setSumLag(sumLag);
consumerMetricsList.add(consumerMetrics);
if (consumerDTO.getPartitionOffsetMap() == null || consumerDTO.getConsumerOffsetMap() == null) {
continue;
}
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
consumerMetrics.setClusterId(consumerDTO.getClusterId());
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
consumerMetrics.setLocation(consumerDTO.getLocation());
consumerMetrics.setTopicName(consumerDTO.getTopicName());
long sumLag = 0;
for(Map.Entry<Integer, Long> entry : consumerDTO.getPartitionOffsetMap().entrySet()){
Long partitionOffset = entry.getValue();
Long consumerOffset = consumerDTO.getConsumerOffsetMap().get(entry.getKey());
if (partitionOffset == null || consumerOffset == null) {
continue;
}
sumLag += Math.max(partitionOffset - consumerOffset, 0);
}
consumerMetrics.setSumLag(sumLag);
consumerMetricsList.add(consumerMetrics);
}
return consumerMetricsList;
}
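
The rewritten loop computes a group's lag for one topic as the sum over partitions of max(logEndOffset - committedOffset, 0), skipping partitions whose offsets are missing. A self-contained sketch of just that arithmetic, with plain maps standing in for `getPartitionOffsetMap()` and `getConsumerOffsetMap()`:

```java
import java.util.HashMap;
import java.util.Map;

public class LagSketch {
    /** Sum of per-partition lag, clamped at 0, skipping unknown offsets. */
    static long sumLag(Map<Integer, Long> partitionOffsetMap, Map<Integer, Long> consumerOffsetMap) {
        long sumLag = 0;
        for (Map.Entry<Integer, Long> entry : partitionOffsetMap.entrySet()) {
            Long consumerOffset = consumerOffsetMap.get(entry.getKey());
            if (consumerOffset == null) {
                continue; // no committed offset for this partition yet
            }
            sumLag += Math.max(entry.getValue() - consumerOffset, 0);
        }
        return sumLag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new HashMap<>();
        Map<Integer, Long> committed = new HashMap<>();
        end.put(0, 100L); committed.put(0, 90L);  // lag 10
        end.put(1, 50L);  committed.put(1, 60L);  // negative, clamped to 0
        end.put(2, 10L);                          // skipped: no committed offset
        System.out.println(sumLag(end, committed)); // prints 10
    }
}
```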

View File

@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerGroupDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.PartitionOffsetDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Map;
@@ -57,7 +58,7 @@ public interface ConsumerService {
* @return
*/
List<ConsumerDTO> getMonitoredConsumerList(ClusterDO clusterDO,
Map<String, List<PartitionState>> topicNamePartitionStateListMap);
Map<TopicPartition, Long> partitionOffsetMap);
/**
* Reset offset

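The new signature replaces the per-topic `PartitionState` lists with a flat `Map<TopicPartition, Long>` shared across consumer groups, so each topic's log-end offsets are fetched once and reused. That works because Kafka's `TopicPartition` defines `equals`/`hashCode` over (topic, partition); a quick sketch:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class OffsetCacheSketch {
    public static void main(String[] args) {
        Map<TopicPartition, Long> allPartitionOffsetMap = new HashMap<>();
        allPartitionOffsetMap.put(new TopicPartition("kmo_monitor", 0), 100L);

        // A fresh instance with the same (topic, partition) hits the same entry,
        // which is what lets independent lookups share one cache.
        System.out.println(allPartitionOffsetMap.get(new TopicPartition("kmo_monitor", 0)));      // 100
        // The implementation below probes partition 0 the same way to decide
        // whether a topic's offsets are already cached.
        System.out.println(allPartitionOffsetMap.containsKey(new TopicPartition("kmo_monitor", 1))); // false
    }
}
```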
View File

@@ -45,7 +45,7 @@ public interface OrderService {
* @date 19/6/23
* @return Result
*/
Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator);
Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator, boolean admin);
/**
* Query Topic orders

View File

@@ -76,6 +76,7 @@ public class AdminTopicServiceImpl implements AdminTopicService {
OperationHistoryDO operationHistoryDO = OperationHistoryDO.newInstance(topicDO.getClusterId(), topicDO.getTopicName(), operator, OperationEnum.CREATE_TOPIC.message);
operationHistoryDao.insert(operationHistoryDO);
topicDao.replace(topicDO);
} catch (Exception e) {
return AdminTopicStatusEnum.REPLACE_DB_FAILED;
}
@@ -188,4 +189,5 @@ public class AdminTopicServiceImpl implements AdminTopicService {
}
return AdminTopicStatusEnum.SUCCESS;
}
}

View File

@@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.dao.ClusterDao;
import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
import com.xiaojukeji.kafka.manager.dao.ControllerDao;
import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.schedule.ScheduleCollectDataManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -37,6 +38,9 @@ public class ClusterServiceImpl implements ClusterService {
@Autowired
private ClusterMetadataManager clusterMetadataManager;
@Autowired
private ScheduleCollectDataManager scheduleCollectDataManager;
@Autowired
private ControllerDao controllerDao;
@@ -57,6 +61,7 @@ public class ClusterServiceImpl implements ClusterService {
if (!status) {
return new Result(StatusCode.OPERATION_ERROR, "add zookeeper watch failed");
}
scheduleCollectDataManager.start(clusterDO);
if (clusterDO.getAlarmFlag() == null || clusterDO.getAlarmFlag() <= 0) {
return new Result();

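`scheduleCollectDataManager.start(clusterDO)` presumably registers the freshly added cluster with the periodic collection machinery right away instead of waiting for the next scheduling pass. The class itself is not part of this diff, so the following is a hypothetical minimal sketch of such a manager (the class shape, method names, and the one-minute period are all assumptions):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleCollectDataManagerSketch {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
    private final Map<Long, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    /** Start periodic metric collection for one cluster (id + display name). */
    public void start(long clusterId, String clusterName) {
        tasks.computeIfAbsent(clusterId, id -> scheduler.scheduleAtFixedRate(
                () -> collect(id, clusterName), 0, 1, TimeUnit.MINUTES));
    }

    /** Stop collection when a cluster is removed. */
    public void stop(long clusterId) {
        ScheduledFuture<?> f = tasks.remove(clusterId);
        if (f != null) {
            f.cancel(false);
        }
    }

    private void collect(long clusterId, String clusterName) {
        // placeholder for the real collection tasks
        System.out.println("collecting metrics for cluster " + clusterName + " (" + clusterId + ")");
    }
}
```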
View File

@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import com.xiaojukeji.kafka.manager.common.constant.OffsetStoreLocation;
import com.xiaojukeji.kafka.manager.common.constant.StatusCode;
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
@@ -18,7 +17,6 @@ import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache;
import com.xiaojukeji.kafka.manager.service.cache.KafkaClientCache;
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
import com.xiaojukeji.kafka.manager.service.service.TopicService;
import com.xiaojukeji.kafka.manager.service.service.ZookeeperService;
import com.xiaojukeji.kafka.manager.common.utils.zk.ZkPathUtil;
import kafka.admin.AdminClient;
import org.apache.commons.lang.StringUtils;
@@ -49,9 +47,6 @@ public class ConsumerServiceImpl implements ConsumerService {
@Autowired
private TopicService topicService;
@Autowired
private ZookeeperService zkService;
private final ExecutorService consumerListThreadPool = Executors.newFixedThreadPool(50, new DefaultThreadFactory("ConsumerPool"));
@Override
@@ -135,20 +130,20 @@ public class ConsumerServiceImpl implements ConsumerService {
@Override
public List<ConsumerDTO> getMonitoredConsumerList(final ClusterDO clusterDO,
final Map<String, List<PartitionState>> partitionStateListMap) {
final Map<TopicPartition, Long> allPartitionOffsetMap) {
List<ConsumerGroupDTO> consumerGroupDTOList = getConsumerGroupList(clusterDO.getId());
if (consumerGroupDTOList == null || consumerGroupDTOList.isEmpty()) {
return new ArrayList<>();
}
FutureTask<ConsumerDTO>[] taskList = new FutureTask[consumerGroupDTOList.size()];
FutureTask<List<ConsumerDTO>>[] taskList = new FutureTask[consumerGroupDTOList.size()];
for (int i = 0; i < consumerGroupDTOList.size(); i++) {
final ConsumerGroupDTO consumerGroupDTO = consumerGroupDTOList.get(i);
taskList[i] = new FutureTask<>(new Callable<ConsumerDTO>() {
taskList[i] = new FutureTask<>(new Callable<List<ConsumerDTO>>() {
@Override
public ConsumerDTO call() throws Exception {
public List<ConsumerDTO> call() throws Exception {
try {
return getMonitoredConsumer(clusterDO, consumerGroupDTO, partitionStateListMap);
return getMonitoredConsumer(clusterDO, consumerGroupDTO, allPartitionOffsetMap);
} catch (Exception e) {
logger.error("get monitored consumer error, group:{}", consumerGroupDTO.getConsumerGroup(), e);
}
@@ -159,31 +154,70 @@ public class ConsumerServiceImpl implements ConsumerService {
}
List<ConsumerDTO> consumerList = new ArrayList<>();
for (FutureTask<ConsumerDTO> task : taskList) {
ConsumerDTO consumer = null;
for (FutureTask<List<ConsumerDTO>> task : taskList) {
List<ConsumerDTO> dtoList = null;
try {
consumer = task.get();
dtoList = task.get();
} catch (Exception e) {
logger.error("getMonitoredConsumerList@ConsumeServiceImpl, ", e);
}
if (consumer == null) {
if (dtoList == null) {
continue;
}
consumerList.add(consumer);
consumerList.addAll(dtoList);
}
return consumerList;
}
private ConsumerDTO getMonitoredConsumer(ClusterDO cluster, ConsumerGroupDTO consumerGroupDTO, Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
// Fetch the partitionState info of all topics under the current consumerGroup
Map<String, List<PartitionState>> topicNamePartitionStateListMap = getConsumerGroupPartitionStateList(cluster, consumerGroupDTO, globalTopicNamePartitionStateListMap);
private List<ConsumerDTO> getMonitoredConsumer(ClusterDO clusterDO,
ConsumerGroupDTO consumerGroupDTO,
Map<TopicPartition, Long> allPartitionOffsetMap) {
List<ConsumerDTO> dtoList = new ArrayList<>();
// Put partition info that has no matching consumer into a single consumer entry
ConsumerDTO consumerDTO = new ConsumerDTO();
consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().name());
consumerDTO.setTopicPartitionMap(topicNamePartitionStateListMap);
return consumerDTO;
List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(
clusterDO.getId(),
consumerGroupDTO.getOffsetStoreLocation().getLocation(),
consumerGroupDTO.getConsumerGroup()
);
for (String topicName : topicNameList) {
TopicMetadata metadata = ClusterMetadataManager.getTopicMetaData(clusterDO.getId(), topicName);
if (metadata == null || metadata.getPartitionNum() <= 0) {
continue;
}
if (!allPartitionOffsetMap.containsKey(new TopicPartition(topicName, 0))) {
Map<TopicPartition, Long> offsetMap = topicService.getTopicPartitionOffset(clusterDO, topicName);
if (offsetMap == null) {
offsetMap = new HashMap<>();
}
allPartitionOffsetMap.putAll(offsetMap);
}
Map<Integer, Long> consumerOffsetMap = null;
if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
consumerOffsetMap = getTopicConsumerOffsetInZK(clusterDO, metadata, consumerGroupDTO);
} else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
consumerOffsetMap = getTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO);
}
Map<Integer, Long> partitionOffsetMap = new HashMap<>();
for (int partitionId = 0; partitionId < metadata.getPartitionNum(); ++partitionId) {
Long offset = allPartitionOffsetMap.get(new TopicPartition(topicName, partitionId));
if (offset == null) {
continue;
}
partitionOffsetMap.put(partitionId, offset);
}
ConsumerDTO consumerDTO = new ConsumerDTO();
consumerDTO.setClusterId(clusterDO.getId());
consumerDTO.setTopicName(topicName);
consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().getLocation());
consumerDTO.setPartitionOffsetMap(partitionOffsetMap);
consumerDTO.setConsumerOffsetMap(consumerOffsetMap);
dtoList.add(consumerDTO);
}
return dtoList;
}
@Override
@@ -264,52 +298,15 @@ public class ConsumerServiceImpl implements ConsumerService {
kafkaConsumer.commitSync();
}
/**
* Get the info of all topics belonging to this cluster and consumerGroup
*/
private Map<String, List<PartitionState>> getConsumerGroupPartitionStateList(ClusterDO clusterDO,
ConsumerGroupDTO consumerGroupDTO,
Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>(2);
private Map<Integer, Long> getTopicConsumerOffsetInZK(ClusterDO clusterDO,
TopicMetadata topicMetadata,
ConsumerGroupDTO consumerGroupDTO) {
Map<Integer, Long> offsetMap = new HashMap<>();
List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(clusterDO.getId(),consumerGroupDTO.getOffsetStoreLocation().getLocation(), consumerGroupDTO.getConsumerGroup());
for (String topicName : topicNameList) {
if (!ClusterMetadataManager.isTopicExist(clusterDO.getId(), topicName)) {
continue;
}
List<PartitionState> partitionStateList = globalTopicNamePartitionStateListMap.get(topicName);
if (partitionStateList == null) {
try {
partitionStateList = zkService.getTopicPartitionState(clusterDO.getId(), topicName);
} catch (Exception e) {
logger.error("get topic partition state failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
}
if (partitionStateList == null) {
continue;
}
globalTopicNamePartitionStateListMap.put(topicName, partitionStateList);
}
List<PartitionState> consumerGroupPartitionStateList = new ArrayList<>();
for (PartitionState partitionState: partitionStateList) {
consumerGroupPartitionStateList.add((PartitionState) partitionState.clone());
}
if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
updateTopicConsumerOffsetInZK(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
} else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
updateTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
}
topicNamePartitionStateListMap.put(topicName, consumerGroupPartitionStateList);
}
return topicNamePartitionStateListMap;
}
private void updateTopicConsumerOffsetInZK(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(cluster.getId());
for (PartitionState partitionState : partitionStateList) {
ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(clusterDO.getId());
for (int partitionId = 0; partitionId < topicMetadata.getPartitionNum(); ++partitionId) {
// offsets are stored in zk
String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId());
String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicMetadata.getTopic(), partitionId);
String offset = null;
try {
Stat stat = zkConfig.getNodeStat(consumerGroupOffsetLocation);
@@ -317,39 +314,32 @@ public class ConsumerServiceImpl implements ConsumerService {
continue;
}
offset = zkConfig.get(consumerGroupOffsetLocation);
offsetMap.put(partitionId, Long.valueOf(offset));
} catch (ConfigException e) {
e.printStackTrace();
}
String consumerId = null;
try {
consumerId = zkConfig.get(ZkPathUtil.getConsumerGroupOwnersTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId()));
} catch (ConfigException e) {
// logger.error("get consumerId error in updateTopicConsumerOffsetInZK cluster:{} topic:{} consumerGroup:{}", cluster.getClusterName(), topicName, consumerGroupDTO.getConsumerGroup());
}
partitionState.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
updatePartitionStateOffset(partitionState, offset, consumerId);
}
return offsetMap;
}
private void updateTopicConsumerOffsetInBroker(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(cluster, consumerGroupDTO.getConsumerGroup(), topicName);
private Map<Integer, Long> getTopicConsumerOffsetInBroker(ClusterDO clusterDO,
String topicName,
ConsumerGroupDTO consumerGroupDTO) {
Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroupDTO.getConsumerGroup(), topicName);
if (offsetsFromBroker == null || offsetsFromBroker.isEmpty()) {
return;
return new HashMap<>(0);
}
for (PartitionState partitionState : partitionStateList) {
int partitionId = partitionState.getPartitionId();
updatePartitionStateOffset(partitionState, offsetsFromBroker.get(partitionId), null);
Map<Integer, Long> offsetMap = new HashMap<>(offsetsFromBroker.size());
for (Map.Entry<Integer, String> entry: offsetsFromBroker.entrySet()) {
try {
offsetMap.put(entry.getKey(), Long.valueOf(entry.getValue()));
} catch (Exception e) {
logger.error("get topic consumer offset failed, clusterId:{} topicName:{} consumerGroup:{}."
, clusterDO.getId(), topicName, consumerGroupDTO.getConsumerGroup());
}
}
}
private void updatePartitionStateOffset(PartitionState partitionState, String offset, String consumerId) {
partitionState.setConsumeOffset(0);
if (!StringUtils.isEmpty(offset)) {
partitionState.setConsumeOffset(Long.parseLong(offset));
}
partitionState.setConsumerGroup(consumerId);
return offsetMap;
}
private Map<Integer, String> getConsumeIdMap(Long clusterId, String topicName, String consumerGroup) {

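The `FutureTask<List<ConsumerDTO>>[]` pattern above hand-rolls a fan-out/fan-in that `ExecutorService.invokeAll` provides directly; a sketch of the equivalent, with strings standing in for the DTOs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(50);
        List<Callable<List<String>>> tasks = new ArrayList<>();
        for (String group : Arrays.asList("g1", "g2", "g3")) {
            tasks.add(() -> Arrays.asList("consumers of " + group));
        }
        List<String> merged = new ArrayList<>();
        // invokeAll blocks until every task completes, like the manual task.get() loop.
        for (Future<List<String>> future : pool.invokeAll(tasks)) {
            try {
                merged.addAll(future.get());
            } catch (Exception e) {
                // a failed group is skipped, matching the null check in the original
            }
        }
        pool.shutdown();
        System.out.println(merged);
    }
}
```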
View File

@@ -72,6 +72,9 @@ public class JmxServiceImpl implements JmxService {
List<Attribute> attributeValueList = null;
try {
attributeValueList = connection.getAttributes(new ObjectName(mbean.getObjectName()), properties).asList();
} catch (InstanceNotFoundException e) {
logger.warn("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e);
continue;
} catch (Exception e) {
logger.error("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e);
continue;

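Catching `InstanceNotFoundException` separately downgrades the routine case of an MBean that simply is not registered on a broker to a warning, while other JMX failures stay at error level. A standalone sketch of the same read path (the service URL is a placeholder; the object name is a standard Kafka broker metric):

```java
import java.util.List;
import javax.management.Attribute;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxReadSketch {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi"); // placeholder broker JMX port
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            try {
                List<Attribute> values = connection.getAttributes(
                        name, new String[]{"OneMinuteRate"}).asList();
                System.out.println(values);
            } catch (InstanceNotFoundException e) {
                // expected when the MBean is absent on this broker: warn and move on
                System.out.println("mbean not registered: " + name);
            } catch (Exception e) {
                // anything else is a real failure
                e.printStackTrace();
            }
        }
    }
}
```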
View File

@@ -51,7 +51,7 @@ public class OrderServiceImpl implements OrderService {
if (orderPartitionDO != null) {
orderPartitionDO.setOrderStatus(OrderStatusEnum.CANCELLED.getCode());
}
return modifyOrderPartition(orderPartitionDO, operator);
return modifyOrderPartition(orderPartitionDO, operator, false);
}
return new Result(StatusCode.PARAM_ERROR, "order type illegal");
}
@@ -74,10 +74,10 @@ public class OrderServiceImpl implements OrderService {
}
@Override
public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator) {
public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator, boolean admin) {
if (newOrderPartitionDO == null) {
return new Result(StatusCode.PARAM_ERROR, "param illegal, order not exist");
} else if (!newOrderPartitionDO.getApplicant().equals(operator)) {
} else if (!admin && !newOrderPartitionDO.getApplicant().equals(operator)) {
return new Result(StatusCode.PARAM_ERROR, "without authority to cancel the order");
}
OrderPartitionDO oldOrderPartitionDO = orderPartitionDao.getById(newOrderPartitionDO.getId());

View File

@@ -345,7 +345,7 @@ public class TopicServiceImpl implements TopicService {
} else {
topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true);
topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec());
topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec());
topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getBytesOutPerSec());
}
return topicOverviewDTO;
}

16
web/bin/shutdown.sh Normal file
View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
cd `dirname $0`/../lib
lib_dir=`pwd`
pid=`ps ax | grep -i 'kafka-manager-web' | grep ${lib_dir} | grep java | grep -v grep | awk '{print $1}'`
if [ -z "$pid" ] ; then
echo "No kafka-manager-web running."
exit -1;
fi
echo "The kafka-manager-web(${pid}) is running..."
kill ${pid}
echo "Send shutdown request to kafka-manager-web(${pid}) OK"

46
web/bin/startup.sh Normal file
View File

@@ -0,0 +1,46 @@
#!/usr/bin/env bash
error_exit ()
{
echo "ERROR: $1 !!"
exit 1
}
if [ -z "$JAVA_HOME" ]; then
error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!"
fi
export WEB_SERVER="kafka-manager-web-*"
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=`cd $(dirname $0)/..; pwd`
export DEFAULT_SEARCH_LOCATIONS="classpath:/,classpath:/config/,file:./,file:./config/"
export CUSTOM_SEARCH_LOCATIONS=${DEFAULT_SEARCH_LOCATIONS},file:${BASE_DIR}/conf/
#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof"
JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/lib/${WEB_SERVER}.jar"
JAVA_OPT="${JAVA_OPT} --spring.config.location=${CUSTOM_SEARCH_LOCATIONS}"
JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/logback-spring.xml"
JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288"
if [ ! -d "${BASE_DIR}/logs" ]; then
mkdir ${BASE_DIR}/logs
fi
echo "$JAVA ${JAVA_OPT}"
# check the start.out log output file
if [ ! -f "${BASE_DIR}/logs/start.out" ]; then
touch "${BASE_DIR}/logs/start.out"
fi
# start
echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 &
nohup $JAVA ${JAVA_OPT} >> ${BASE_DIR}/logs/start.out 2>&1 &
echo "kafka-manager is startingyou can check the ${BASE_DIR}/logs/start.out"

32
web/conf/application.yml Normal file
View File

@@ -0,0 +1,32 @@
server:
port: 8080
tomcat:
accept-count: 100
max-connections: 1000
max-threads: 20
min-spare-threads: 20
spring:
application:
name: kafkamanager
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: admin
password: admin
driver-class-name: org.mariadb.jdbc.Driver
main:
allow-bean-definition-overriding: true
profiles:
active: dev
logging:
config: classpath:logback-spring.xml
# kafka monitoring
kafka-monitor:
enabled: true
notify-kafka:
cluster-id: 95
topic-name: kmo_monitor

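The custom `spring.datasource.kafka-manager.*` prefix, with `jdbc-url` rather than Spring's default `url` key, suggests the pool is built by binding that prefix onto a HikariCP-backed `DataSource`, roughly as below (the configuration class is an assumption; the prefix and keys come from the YAML above):

```java
import javax.sql.DataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaManagerDataSourceConfig {
    // Binds jdbc-url / username / password / driver-class-name from the
    // spring.datasource.kafka-manager block onto the pool's setters
    // (HikariDataSource exposes setJdbcUrl, hence the "jdbc-url" key).
    @Bean
    @ConfigurationProperties("spring.datasource.kafka-manager")
    public DataSource kafkaManagerDataSource() {
        return DataSourceBuilder.create().build();
    }
}
```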
View File

@@ -4,13 +4,13 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-manager-web</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<artifactId>kafka-manager</artifactId>
<groupId>com.xiaojukeji.kafka</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</parent>
<properties>
@@ -110,6 +110,26 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>kafka-manager-${project.version}</finalName>
<descriptors>
<descriptor>./src/main/resources/assembly.xml</descriptor>
</descriptors>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -176,7 +176,7 @@ public class OrderController {
TopicDO topicInfoDO = OrderConverter.convert2TopicInfoDO(orderTopicDO);
List<Integer> brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList());
Properties topicConfig = new Properties();
topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime()));
topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime() * 60 * 60 * 1000));
try {
TopicMetadata topicMetadata = new TopicMetadata();
topicMetadata.setTopic(orderTopicDO.getTopicName());
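
The fix multiplies the requested retention by 60 * 60 * 1000 because `retention.ms` is a millisecond-valued topic config while the order form evidently collects hours. The same conversion made explicit with `TimeUnit`:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class RetentionSketch {
    public static void main(String[] args) {
        long retentionHours = 72; // e.g. what the order form collects
        Properties topicConfig = new Properties();
        // retention.ms expects milliseconds; 72h -> 259200000
        topicConfig.setProperty("retention.ms",
                String.valueOf(TimeUnit.HOURS.toMillis(retentionHours)));
        System.out.println(topicConfig.getProperty("retention.ms"));
    }
}
```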
@@ -325,14 +325,16 @@ public class OrderController {
orderPartitionDO.setApprover(username);
orderPartitionDO.setOpinion(reqObj.getApprovalOpinions());
orderPartitionDO.setOrderStatus(reqObj.getOrderStatus());
result = orderService.modifyOrderPartition(orderPartitionDO, username);
result = orderService.modifyOrderPartition(orderPartitionDO, username, true);
if (!StatusCode.SUCCESS.equals(result.getCode())) {
return new Result(StatusCode.OPERATION_ERROR, "create topic success, but update order status failed, err:" + result.getMessage());
return new Result(StatusCode.OPERATION_ERROR, "expand topic success, but update order status failed, err:" + result.getMessage());
}
return new Result();
}
private Result expandTopic(ClusterDO clusterDO, OrderPartitionExecModel reqObj, OrderPartitionDO orderPartitionDO) {
private Result expandTopic(ClusterDO clusterDO,
OrderPartitionExecModel reqObj,
OrderPartitionDO orderPartitionDO) {
List<Integer> brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList());
try {
TopicMetadata topicMetadata = new TopicMetadata();
@@ -343,6 +345,8 @@ public class OrderController {
if (!AdminTopicStatusEnum.SUCCESS.equals(adminTopicStatusEnum)) {
return new Result(StatusCode.OPERATION_ERROR, adminTopicStatusEnum.getMessage());
}
orderPartitionDO.setPartitionNum(reqObj.getPartitionNum());
orderPartitionDO.setBrokerList(ListUtils.intList2String(brokerIdList));
} catch (Exception e) {
logger.error("expandTopic@OrderController, create failed, req:{}.", reqObj);
return new Result(StatusCode.OPERATION_ERROR, Constant.KAFKA_MANAGER_INNER_ERROR);

View File

@@ -78,6 +78,7 @@ public class BrokerModelConverter {
Double bytesInPerSec = brokerOverallDTO.getBytesInPerSec() / 1024.0 / 1024.0;
brokerOverviewVO.setBytesInPerSec(Math.round(bytesInPerSec * 100) / 100.0);
}
brokerOverviewVO.setUnderReplicatedPartitionCount(brokerOverallDTO.getUnderReplicatedPartitions());
brokerOverviewVO.setLeaderCount(brokerOverallDTO.getLeaderCount());
if (brokerOverallDTO.getPartitionCount() != null && brokerOverallDTO.getUnderReplicatedPartitions() != null) {
brokerOverviewVO.setNotUnderReplicatedPartitionCount(brokerOverallDTO.getPartitionCount() - brokerOverallDTO.getUnderReplicatedPartitions());

View File

@@ -86,7 +86,8 @@ public class OrderConverter {
public static OrderPartitionVO convert2OrderPartitionVO(OrderPartitionDO orderPartitionDO,
TopicMetadata topicMetadata,
Long maxAvgBytes, List<RegionDO> regionDOList) {
Long maxAvgBytes,
List<RegionDO> regionDOList) {
if (orderPartitionDO == null) {
return null;
}
@@ -100,8 +101,12 @@ public class OrderConverter {
if (topicMetadata == null) {
return orderPartitionVO;
}
orderPartitionVO.setPartitionNum(topicMetadata.getPartitionNum());
orderPartitionVO.setPartitionNum(null);
orderPartitionVO.setBrokerIdList(new ArrayList<>(topicMetadata.getBrokerIdSet()));
if (OrderStatusEnum.PASSED.getCode().equals(orderPartitionDO.getOrderStatus())) {
orderPartitionVO.setPartitionNum(orderPartitionDO.getPartitionNum());
}
if (regionDOList == null || regionDOList.isEmpty()) {
orderPartitionVO.setRegionNameList(new ArrayList<>());

View File

@@ -30,9 +30,13 @@ public class BrokerOverallVO {
@ApiModelProperty(value = "分区数")
private Integer partitionCount;
@ApiModelProperty(value = "未同步分区数")
@Deprecated
@ApiModelProperty(value = "同步分区数")
private Integer notUnderReplicatedPartitionCount;
@ApiModelProperty(value = "未同步分区数")
private Integer underReplicatedPartitionCount;
@ApiModelProperty(value = "leader数")
private Integer leaderCount;
@@ -103,6 +107,14 @@ public class BrokerOverallVO {
this.notUnderReplicatedPartitionCount = notUnderReplicatedPartitionCount;
}
public Integer getUnderReplicatedPartitionCount() {
return underReplicatedPartitionCount;
}
public void setUnderReplicatedPartitionCount(Integer underReplicatedPartitionCount) {
this.underReplicatedPartitionCount = underReplicatedPartitionCount;
}
public Integer getLeaderCount() {
return leaderCount;
}
@@ -130,6 +142,7 @@ public class BrokerOverallVO {
", bytesInPerSec=" + bytesInPerSec +
", partitionCount=" + partitionCount +
", notUnderReplicatedPartitionCount=" + notUnderReplicatedPartitionCount +
", underReplicatedPartitionCount=" + underReplicatedPartitionCount +
", leaderCount=" + leaderCount +
", regionName='" + regionName + '\'' +
'}';

View File

@@ -0,0 +1,33 @@
server:
port: 8080
tomcat:
accept-count: 100
max-connections: 1000
max-threads: 20
min-spare-threads: 20
spring:
application:
name: kafkamanager
datasource:
kafka-manager:
driver-class-name: org.postgresql.Driver
jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true
username: admin
password: admin
type: com.zaxxer.hikari.HikariDataSource
hikari:
connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';"
connection-test-query: "select 1;"
main:
allow-bean-definition-overriding: true
logging:
config: classpath:logback-spring.xml
# kafka monitoring
kafka-monitor:
enabled: true
notify-kafka:
cluster-id: 95
topic-name: kmo_monitor

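In this profile, `connection-init-sql` runs against every new pooled connection, pinning the session time zone and client encoding for PostgreSQL. A minimal programmatic equivalent with HikariCP (values copied from the YAML; the pool size is an assumption):

```java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class PgPoolSketch {
    public static void main(String[] args) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true");
        config.setUsername("admin");
        config.setPassword("admin");
        // Executed against each fresh connection before it enters the pool.
        config.setConnectionInitSql("SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';");
        config.setConnectionTestQuery("select 1;");
        config.setMaximumPoolSize(10); // assumption: not set in the YAML
        try (HikariDataSource ds = new HikariDataSource(config)) {
            System.out.println("pool started: " + ds.getPoolName());
        }
    }
}
```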
View File

@@ -11,16 +11,13 @@ spring:
name: kafkamanager
datasource:
kafka-manager:
driver-class-name: org.mariadb.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: admin
password: admin
driver-class-name: org.mariadb.jdbc.Driver
main:
allow-bean-definition-overriding: true
profiles:
active: dev
logging:
config: classpath:logback-spring.xml

View File

@@ -0,0 +1,34 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>bin</id>
<formats>
<format>dir</format>
<format>tar.gz</format>
</formats>
<fileSets>
<fileSet>
<includes>
<include>./bin/*</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>./src/main/resources/</directory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>application.yml</include>
<include>logback-spring.xml</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>