Mirror of https://github.com/didi/KnowStreaming.git (synced 2026-01-07 06:02:07 +08:00)
Merge branch 'master' into dev
Dockerfile (new file, 7 lines)
@@ -0,0 +1,7 @@
FROM fabric8/java-alpine-openjdk8-jdk
MAINTAINER xuzhengxi
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ADD ./web/target/kafka-manager-web-1.0.0-SNAPSHOT.jar kafka-manager-web.jar
ADD ./docker/kafka-manager/application-standalone.yml application.yml
ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"]
EXPOSE 8080
README.md (22 lines changed)
@@ -45,12 +45,14 @@
 - `Maven 3.5.0+` (backend build dependency)
 - `node v8.12.0+` (frontend build dependency)
 - `Java 8+` (runtime requirement)
-- `MySQL` (data storage)
+- `MySQL` or `PostgreSQL` (data storage)
 
 ---
 
 ### Environment Initialization
 
+**MySQL**
+
 Run the SQL statements in [create_mysql_table.sql](doc/create_mysql_table.sql) to create the required MySQL database and tables. The default database name is `kafka_manager`.
 
 ```
@@ -58,6 +60,24 @@
 mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql
 ```
 
+**PostgreSQL**
+
+Run the SQL statements in [create_postgresql_table.sql](doc/create_postgresql_table.sql) to create the required PostgreSQL tables.
+
+```
+############# Example:
+psql -h XXX.XXX.XXX.XXX -U XXXX -d kafka_manager -f ./create_postgresql_table.sql
+```
+
+*Creating the PostgreSQL user and database*
+
+```sql
+create user admin encrypted password 'admin';
+create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8';
+```
+
+***The default configuration uses MySQL. To use PostgreSQL instead, pass `-Dspring.profiles.active=pg` to select the `application-pg.yml` configuration file.***
+
 ---
 
 
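With the pg profile active, Spring Boot loads `application-pg.yml` in place of the default MySQL settings. A minimal launch sketch (the jar name follows the build output referenced in the Dockerfile above; adjust it to your actual build):

```
java -Dspring.profiles.active=pg -jar kafka-manager-web-1.0.0-SNAPSHOT.jar
```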
ConsumerDTO.java (modified)
@@ -1,8 +1,5 @@
 package com.xiaojukeji.kafka.manager.common.entity.dto.consumer;
 
-import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
-
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -11,20 +8,33 @@ import java.util.Map;
  * @date 2015/11/12
  */
 public class ConsumerDTO {
-    /**
-     * consumer group name
-     */
+    private Long clusterId;
+
+    private String topicName;
+
     private String consumerGroup;
 
-    /**
-     * consume type, usually static
-     */
     private String location;
 
-    /**
-     * partition state list of each subscribed topic
-     */
-    private Map<String, List<PartitionState>> topicPartitionMap;
+    private Map<Integer, Long> partitionOffsetMap;
+
+    private Map<Integer, Long> consumerOffsetMap;
+
+    public Long getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(Long clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
 
     public String getConsumerGroup() {
         return consumerGroup;
@@ -42,20 +52,31 @@ public class ConsumerDTO {
         this.location = location;
     }
 
-    public Map<String, List<PartitionState>> getTopicPartitionMap() {
-        return topicPartitionMap;
+    public Map<Integer, Long> getPartitionOffsetMap() {
+        return partitionOffsetMap;
     }
 
-    public void setTopicPartitionMap(Map<String, List<PartitionState>> topicPartitionMap) {
-        this.topicPartitionMap = topicPartitionMap;
+    public void setPartitionOffsetMap(Map<Integer, Long> partitionOffsetMap) {
+        this.partitionOffsetMap = partitionOffsetMap;
+    }
+
+    public Map<Integer, Long> getConsumerOffsetMap() {
+        return consumerOffsetMap;
+    }
+
+    public void setConsumerOffsetMap(Map<Integer, Long> consumerOffsetMap) {
+        this.consumerOffsetMap = consumerOffsetMap;
     }
 
     @Override
     public String toString() {
-        return "Consumer{" +
-                "consumerGroup='" + consumerGroup + '\'' +
+        return "ConsumerDTO{" +
+                "clusterId=" + clusterId +
+                ", topicName='" + topicName + '\'' +
+                ", consumerGroup='" + consumerGroup + '\'' +
                 ", location='" + location + '\'' +
-                ", topicPartitionMap=" + topicPartitionMap +
+                ", partitionOffsetMap=" + partitionOffsetMap +
+                ", consumerOffsetMap=" + consumerOffsetMap +
                 '}';
     }
 }
OrderPartitionDO.java (modified)
@@ -9,6 +9,10 @@ public class OrderPartitionDO extends BaseDO{
 
     private String applicant;
 
+    private Integer partitionNum;
+
+    private String brokerList;
+
     private Long peakBytesIn;
 
     private String description;
@@ -51,6 +55,22 @@ public class OrderPartitionDO extends BaseDO{
         this.applicant = applicant;
     }
 
+    public Integer getPartitionNum() {
+        return partitionNum;
+    }
+
+    public void setPartitionNum(Integer partitionNum) {
+        this.partitionNum = partitionNum;
+    }
+
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public void setBrokerList(String brokerList) {
+        this.brokerList = brokerList;
+    }
+
     public Long getPeakBytesIn() {
         return peakBytesIn;
     }
@@ -98,6 +118,8 @@ public class OrderPartitionDO extends BaseDO{
                 ", clusterName='" + clusterName + '\'' +
                 ", topicName='" + topicName + '\'' +
                 ", applicant='" + applicant + '\'' +
+                ", partitionNum=" + partitionNum +
+                ", brokerList='" + brokerList + '\'' +
                 ", peakBytesIn=" + peakBytesIn +
                 ", description='" + description + '\'' +
                 ", orderStatus=" + orderStatus +
Cluster list component (.tsx, modified)
@@ -10,7 +10,7 @@ import { IClusterData } from 'types/base-type';
 
 const TabPane = Tabs.TabPane;
 
-const detailUrl ='/admin/cluster_detail?clusterId=';
+const detailUrl = '/admin/cluster_detail?clusterId=';
 
 const collectionColumns: Array<ColumnProps<IClusterData>> = [
   {
@@ -24,7 +24,8 @@ const collectionColumns: Array<ColumnProps<IClusterData>> = [
     key: 'clusterName',
     sorter: (a: IClusterData, b: IClusterData) => a.clusterName.charCodeAt(0) - b.clusterName.charCodeAt(0),
     render: (text, record) => {
-      return <a href={`${detailUrl}${record.clusterId}`}>{record.clusterName}</a>;
+      const url = `${detailUrl}${record.clusterId}&clusterName=${record.clusterName}`;
+      return <a href={encodeURI(url)}>{record.clusterName}</a>;
     },
   },
   {
UserManage component (.tsx, modified)
@@ -39,10 +39,10 @@ export class UserManage extends SearchAndFilter {
   public renderColumns = () => {
     const role = Object.assign({
       title: '角色',
-      key: 'role',
-      dataIndex: 'role',
+      key: 'roleName',
+      dataIndex: 'roleName',
       filters: users.filterRole,
-      onFilter: (value: string, record: any) => record.role.indexOf(value) === 0,
+      onFilter: (value: string, record: any) => record.roleName.indexOf(value) === 0,
     }, this.renderColumnsFilter('filterVisible'));
 
     return [
BrokerList component (.tsx, modified)
@@ -1,6 +1,6 @@
 import * as React from 'react';
 import './index.less';
-import { Table, Modal, notification, PaginationConfig, Button } from 'component/antd';
+import { Table, Modal, notification, PaginationConfig, Button, Spin } from 'component/antd';
 import { broker, IBroker, IBrokerNetworkInfo, IBrokerPartition } from 'store/broker';
 import { observer } from 'mobx-react';
 import { StatusGraghCom } from 'component/flow-table';
@@ -49,10 +49,19 @@ export class BrokerList extends SearchAndFilter {
 
     const status = Object.assign({
       title: '已同步',
-      dataIndex: 'status',
-      key: 'status',
-      filters: [{ text: '是', value: '是' }, { text: '否', value: '否' }],
-      onFilter: (value: string, record: IBrokerPartition) => record.status === value,
+      dataIndex: 'underReplicatedPartitionCount',
+      key: 'underReplicatedPartitionCount',
+      filters: [{ text: '是', value: '1' }, { text: '否', value: '0' }],
+      onFilter: (value: string, record: IBrokerPartition) => {
+        // underReplicatedPartitionCount > 0 means replication has not caught up yet
+        const syncStatus = record.underReplicatedPartitionCount ? '0' : '1';
+        return syncStatus === value;
+      },
+      render: (text: number) => (
+        <>
+          <span style={{ marginRight: 8 }}>{text ? '否' : '是'}</span>
+        </>
+      ),
     }, this.renderColumnsFilter('filterVisible'));
 
     return [{
@@ -80,7 +89,8 @@ export class BrokerList extends SearchAndFilter {
       title: '未同步副本数量',
       dataIndex: 'notUnderReplicatedPartitionCount',
       key: 'notUnderReplicatedPartitionCount',
-      sorter: (a: IBrokerPartition, b: IBrokerPartition) => a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount,
+      sorter: (a: IBrokerPartition, b: IBrokerPartition) =>
+        a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount,
     },
     status,
     region,
@@ -205,7 +215,7 @@ export class BrokerList extends SearchAndFilter {
     const dataPartitions = this.state.searchId !== '' ?
       broker.partitions.filter((d) => d.brokerId === +this.state.searchId) : broker.partitions;
     return (
-      <>
+      <Spin spinning={broker.loading}>
         <div className="k-row">
           <ul className="k-tab">
             <li>Broker概览</li>
@@ -239,7 +249,7 @@ export class BrokerList extends SearchAndFilter {
             pagination={pagination}
           />
         </div>
-      </>
+      </Spin>
     );
   }
 }
LeaderRebalance component (.tsx, modified)
@@ -45,14 +45,19 @@ class LeaderRebalance extends React.Component<any> {
   constructor(props: any) {
     super(props);
     const url = Url();
-    this.clusterName = decodeURI(atob(url.search.clusterName));
+    if (url.search.clusterName) {
+      this.clusterName = decodeURI(url.search.clusterName);
+    }
     this.clusterId = Number(url.search.clusterId);
   }
 
   public handleSubmit = (e: React.MouseEvent<any, MouseEvent>) => {
     e.preventDefault();
-    this.setState({ loading: true });
     this.props.form.validateFieldsAndScroll((err: any, values: any) => {
+      if (err) {
+        return;
+      }
+      this.setState({ loading: true });
       this.brokerId = Number(values.brokerId);
       addRebalance({ brokerId: this.brokerId, clusterId: this.clusterId, dimension: 0 }).then(() => {
         cluster.getRebalance(this.clusterId).then(() => {
store/broker.ts (modified)
@@ -35,6 +35,7 @@ export interface IBrokerPartition extends IBroker {
   leaderCount: number;
   partitionCount: number;
   notUnderReplicatedPartitionCount: number;
+  underReplicatedPartitionCount?: number;
   regionName: string;
   bytesInPerSec: number;
 }
@@ -74,6 +75,9 @@ interface IBrokerOption {
 }
 
 class Broker {
+  @observable
+  public loading: boolean = false;
+
   @observable
   public brokerBaseInfo: IBrokerBaseInfo = {} as IBrokerBaseInfo;
 
@@ -119,6 +123,11 @@ class Broker {
   @observable
   public BrokerOptions: IValueLabel[] = [{ value: null, label: '请选择Broker' }];
 
+  @action.bound
+  public setLoading(value: boolean) {
+    this.loading = value;
+  }
+
   @action.bound
   public setBrokerBaseInfo(data: IBrokerBaseInfo) {
     data.startTime = moment(data.startTime).format('YYYY-MM-DD HH:mm:ss'),
@@ -216,7 +225,8 @@ class Broker {
   }
 
   public getBrokerList(clusterId: number) {
-    getBrokerList(clusterId).then(this.setBrokerList);
+    this.setLoading(true);
+    getBrokerList(clusterId).then(this.setBrokerList).finally(() => this.setLoading(false));
   }
 
   public getBrokerNetwork(clusterId: number) {
@@ -224,7 +234,8 @@ class Broker {
   }
 
   public getBrokerPartition(clusterId: number) {
-    getBrokerPartition(clusterId).then(this.setBrokerPartition);
+    this.setLoading(true);
+    getBrokerPartition(clusterId).then(this.setBrokerPartition).finally(() => this.setLoading(false));
   }
 
   public getOneBrokerNetwork(clusterId: number, brokerId: number) {
Users store (modified)
@@ -17,7 +17,7 @@ export class Users {
   @action.bound
   public setUserData(data: []) {
     this.userData = data.map((d: any) => {
-      d.role = this.roleMap[d.role];
+      d.roleName = this.roleMap[d.role];
       return d;
     });
   }
pom.xml (modified)
@@ -43,5 +43,9 @@
         <artifactId>mariadb-java-client</artifactId>
         <version>2.5.4</version>
     </dependency>
+    <dependency>
+        <groupId>org.postgresql</groupId>
+        <artifactId>postgresql</artifactId>
+    </dependency>
 </dependencies>
</project>
AccountDaoImpl.java (modified)
@@ -18,6 +18,9 @@ public class AccountDaoImpl implements AccountDao {
    @Autowired
    private SqlSessionTemplate sqlSession;

+    @Autowired
+    private KafkaManagerProperties kafkaManagerProperties;
+
    public void setSqlSession(SqlSessionTemplate sqlSession) {
        this.sqlSession = sqlSession;
    }
@@ -25,7 +28,7 @@ public class AccountDaoImpl implements AccountDao {
    @Override
    public int addNewAccount(AccountDO accountDO) {
        accountDO.setStatus(DBStatusEnum.NORMAL.getStatus());
-        return sqlSession.insert("AccountDao.insert", accountDO);
+        return updateAccount(accountDO);
    }

    @Override
@@ -35,6 +38,9 @@ public class AccountDaoImpl implements AccountDao {

    @Override
    public int updateAccount(AccountDO accountDO) {
+        if (kafkaManagerProperties.hasPG()) {
+            return sqlSession.insert("AccountDao.insertOnPG", accountDO);
+        }
        return sqlSession.insert("AccountDao.insert", accountDO);
    }

BrokerDaoImpl.java (modified)
@@ -19,12 +19,18 @@ public class BrokerDaoImpl implements BrokerDao {
    @Autowired
    private SqlSessionTemplate sqlSession;

+    @Autowired
+    private KafkaManagerProperties kafkaManagerProperties;
+
    public void setSqlSession(SqlSessionTemplate sqlSession) {
        this.sqlSession = sqlSession;
    }

    @Override
    public int replace(BrokerDO brokerInfoDO) {
+        if (kafkaManagerProperties.hasPG()) {
+            return sqlSession.insert("BrokerDao.replaceOnPG", brokerInfoDO);
+        }
        return sqlSession.insert("BrokerDao.replace", brokerInfoDO);
    }

KafkaManagerProperties.java (new file, 22 lines)
@@ -0,0 +1,22 @@
package com.xiaojukeji.kafka.manager.dao.impl;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties("spring.datasource.kafka-manager")
public class KafkaManagerProperties {
    private String jdbcUrl;

    public String getJdbcUrl() {
        return jdbcUrl;
    }

    public void setJdbcUrl(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    public boolean hasPG() {
        return jdbcUrl.startsWith("jdbc:postgres");
    }
}
RegionDaoImpl.java (modified)
@@ -17,12 +17,18 @@ public class RegionDaoImpl implements RegionDao {
    @Autowired
    private SqlSessionTemplate sqlSession;

+    @Autowired
+    private KafkaManagerProperties kafkaManagerProperties;
+
    public void setSqlSession(SqlSessionTemplate sqlSession) {
        this.sqlSession = sqlSession;
    }

    @Override
    public int insert(RegionDO regionDO) {
+        if (kafkaManagerProperties.hasPG()) {
+            return sqlSession.insert("RegionDao.insertOnPG", regionDO);
+        }
        return sqlSession.insert("RegionDao.insert", regionDO);
    }

TopicDaoImpl.java (modified)
@@ -19,12 +19,18 @@ public class TopicDaoImpl implements TopicDao {
    @Autowired
    private SqlSessionTemplate sqlSession;

+    @Autowired
+    private KafkaManagerProperties kafkaManagerProperties;
+
    public void setSqlSession(SqlSessionTemplate sqlSession) {
        this.sqlSession = sqlSession;
    }

    @Override
    public int replace(TopicDO topicDO) {
+        if (kafkaManagerProperties.hasPG()) {
+            return sqlSession.insert("TopicDao.replaceOnPG", topicDO);
+        }
        return sqlSession.insert("TopicDao.replace", topicDO);
    }

TopicFavoriteDaoImpl.java (modified)
@@ -25,12 +25,18 @@ public class TopicFavoriteDaoImpl implements TopicFavoriteDao {
    @Autowired
    private TransactionTemplate transactionTemplate;

+    @Autowired
+    private KafkaManagerProperties kafkaManagerProperties;
+
    public void setSqlSession(SqlSessionTemplate sqlSession) {
        this.sqlSession = sqlSession;
    }

    @Override
    public int batchAdd(List<TopicFavoriteDO> topicFavoriteDOList) {
+        if (kafkaManagerProperties.hasPG()) {
+            return sqlSession.insert("TopicFavoriteDao.batchAddOnPG", topicFavoriteDOList);
+        }
        return sqlSession.insert("TopicFavoriteDao.batchAdd", topicFavoriteDOList);
    }

AccountDao mapper XML (modified)
@@ -22,6 +22,18 @@
        ]]>
    </insert>

+    <insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.AccountDO">
+        <![CDATA[
+        insert into account
+        (username, password, role, status)
+        values
+        (#{username}, #{password}, #{role}, #{status})
+        on conflict (username) do update set password = excluded.password,
+                                             role = excluded.role,
+                                             status = excluded.status
+        ]]>
+    </insert>
+
    <delete id="deleteByName" parameterType="java.lang.String">
        DELETE FROM account WHERE username = #{username}
    </delete>
BrokerDao mapper XML (modified)
@@ -20,6 +20,16 @@
        (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
    </insert>

+    <insert id="replaceOnPG" parameterType="BrokerDO">
+        insert into broker
+        (cluster_id, broker_id, host, port, timestamp, status)
+        values (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
+        on conflict (cluster_id, broker_id) do update set host = excluded.host,
+                                                          port = excluded.port,
+                                                          timestamp = excluded.timestamp,
+                                                          status = excluded.status
+    </insert>
+
    <delete id="deleteById" parameterType="java.util.Map">
        DELETE FROM broker WHERE cluster_id = #{clusterId} AND broker_id = #{brokerId}
    </delete>
OrderPartitionDao mapper XML (modified)
@@ -11,6 +11,8 @@
        <result column="cluster_name" property="clusterName" />
        <result column="topic_name" property="topicName" />
        <result column="applicant" property="applicant" />
+        <result column="partition_num" property="partitionNum" />
+        <result column="broker_list" property="brokerList" />
        <result column="peak_bytes_in" property="peakBytesIn" />
        <result column="description" property="description" />
        <result column="order_status" property="orderStatus" />
@@ -38,6 +40,16 @@
        cluster_name=#{clusterName},
        topic_name=#{topicName},
        applicant=#{applicant},
+        <trim>
+            <if test="partitionNum!=null">
+                partition_num=#{partitionNum},
+            </if>
+        </trim>
+        <trim>
+            <if test="brokerList!=null">
+                broker_list=#{brokerList},
+            </if>
+        </trim>
        peak_bytes_in=#{peakBytesIn},
        description=#{description},
        order_status=#{orderStatus},
RegionDao mapper XML (modified)
@@ -21,6 +21,15 @@
        (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
    </insert>

+    <insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.RegionDO">
+        insert into region
+        (region_name, cluster_id, broker_list, description, operator)
+        values (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
+        on conflict (region_name, cluster_id) do update set broker_list = excluded.broker_list,
+                                                            description = excluded.description,
+                                                            operator = excluded.operator
+    </insert>
+
    <delete id="deleteById" parameterType="java.lang.Long">
        DELETE FROM region WHERE id = #{id}
    </delete>
TopicDao mapper XML (modified)
@@ -19,6 +19,15 @@
        (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
    </insert>

+    <insert id="replaceOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.TopicDO">
+        insert into topic
+        (cluster_id, topic_name, principals, description, status)
+        values (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
+        on conflict (cluster_id, topic_name) do update set principals = excluded.principals,
+                                                           description = excluded.description,
+                                                           status = excluded.status
+    </insert>
+
    <delete id="deleteById" parameterType="java.lang.Long">
        DELETE FROM topic WHERE id = #{id}
    </delete>
TopicFavoriteDao mapper XML (modified)
@@ -21,6 +21,15 @@
        </foreach>
    </insert>

+    <insert id="batchAddOnPG" parameterType="java.util.List">
+        insert into topic_favorite (cluster_id, topic_name, username)
+        values
+        <foreach item="TopicFavoriteDO" index="index" collection="list" separator=",">
+            (#{TopicFavoriteDO.clusterId}, #{TopicFavoriteDO.topicName}, #{TopicFavoriteDO.username})
+        </foreach>
+        on conflict (cluster_id, topic_name, username) do update set gmt_modify = now();
+    </insert>
+
    <delete id="deleteById" parameterType="java.lang.Long">
        DELETE FROM topic_favorite WHERE id=#{id}
    </delete>
create_mysql_table.sql (modified)
@@ -149,6 +149,8 @@ CREATE TABLE `order_partition` (
  `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'cluster ID',
  `cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'cluster name',
  `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'topic name',
+  `broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'broker list, comma separated',
+  `partition_num` int(11) NOT NULL DEFAULT 0 COMMENT 'number of partitions to add',
  `applicant` varchar(128) NOT NULL DEFAULT '' COMMENT 'applicant',
  `peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT 'peak bytes-in',
  `description` text COMMENT 'remarks',
doc/create_postgresql_table.sql (new file, 325 lines)
@@ -0,0 +1,325 @@
-- CREATE DATABASE kafka_manager;
-- \c kafka_manager;
SET TIME ZONE 'Asia/Chongqing';
SET CLIENT_ENCODING TO 'UTF-8';

CREATE OR REPLACE FUNCTION on_update_timestamp() RETURNS TRIGGER AS
$$
BEGIN
    new.gmt_modify = current_timestamp;
    return new;
END;
$$ LANGUAGE plpgsql;

-- Account table
CREATE TABLE account
(
    id         bigserial NOT NULL,                           -- ID
    username   varchar(64) NOT NULL UNIQUE DEFAULT '',       -- username
    password   varchar(128) NOT NULL DEFAULT '',             -- password
    role       int NOT NULL DEFAULT 0,                       -- role type, 0: ordinary user
    status     int NOT NULL DEFAULT 0,                       -- 0: in use, -1: deprecated
    gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT account_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX account_uniq_username ON account (username);
INSERT INTO account(username, password, role)
VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2);
CREATE TRIGGER account_trig_gmt_modify
    BEFORE UPDATE
    ON account
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- Alarm rule table
CREATE TABLE alarm_rule
(
    id                   bigserial,                                    -- auto-increment ID
    alarm_name           varchar(128) NOT NULL DEFAULT '',             -- alarm name
    strategy_expressions text,                                         -- expressions
    strategy_filters     text,                                         -- filters
    strategy_actions     text,                                         -- actions
    principals           varchar(512) NOT NULL DEFAULT '',             -- owners
    status               int2 NOT NULL DEFAULT 1,                      -- -1: logically deleted, 0: disabled, 1: normal
    gmt_create           timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify           timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT alarm_rule_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX alarm_rule_uniq_alarm_name ON alarm_rule (alarm_name);
CREATE TRIGGER alarm_rule_trig_gmt_modify
    BEFORE UPDATE
    ON alarm_rule
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- Broker info table
CREATE TABLE broker
(
    id         bigserial,                                    -- id
    cluster_id bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    broker_id  int NOT NULL DEFAULT '-1',                    -- broker ID
    host       varchar(128) NOT NULL DEFAULT '',             -- broker host
    port       int NOT NULL DEFAULT '-1',                    -- broker port
    timestamp  bigint NOT NULL DEFAULT '-1',                 -- start time
    status     int NOT NULL DEFAULT '0',                     -- status: 0 valid, -1 invalid
    gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT broker_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX broker_uniq_cluster_id_broker_id ON broker (cluster_id, broker_id);
CREATE TRIGGER broker_trig_gmt_modify
    BEFORE UPDATE
    ON broker
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- BrokerMetric table
CREATE TABLE broker_metrics
(
    id                             bigserial,                                    -- auto-increment id
    cluster_id                     bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    broker_id                      int NOT NULL DEFAULT '-1',                    -- broker ID
    bytes_in                       decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes in per second
    bytes_out                      decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes out per second
    bytes_rejected                 decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes rejected per second
    messages_in                    decimal(53, 2) NOT NULL DEFAULT '0.00',       -- messages in per second
    fail_fetch_request             decimal(53, 2) NOT NULL DEFAULT '0.00',       -- failed fetch requests per second
    fail_produce_request           decimal(53, 2) NOT NULL DEFAULT '0.00',       -- failed produce requests per second
    fetch_consumer_request         decimal(53, 2) NOT NULL DEFAULT '0.00',       -- consumer fetch requests per second
    produce_request                decimal(53, 2) NOT NULL DEFAULT '0.00',       -- produce requests per second
    request_handler_idl_percent    decimal(53, 2) NOT NULL DEFAULT '0.00',       -- request handler busy percentage
    network_processor_idl_percent  decimal(53, 2) NOT NULL DEFAULT '0.00',       -- network processor busy percentage
    request_queue_size             bigint NOT NULL DEFAULT '0',                  -- request queue size
    response_queue_size            bigint NOT NULL DEFAULT '0',                  -- response queue size
    log_flush_time                 decimal(53, 2) NOT NULL DEFAULT '0.00',       -- log flush time
    total_time_produce_mean        decimal(53, 2) NOT NULL DEFAULT '0.00',       -- total produce request time, mean
    total_time_produce_99th        decimal(53, 2) NOT NULL DEFAULT '0.00',       -- total produce request time, 99th percentile
    total_time_fetch_consumer_mean decimal(53, 2) NOT NULL DEFAULT '0.00',       -- total fetch request time, mean
    total_time_fetch_consumer_99th decimal(53, 2) NOT NULL DEFAULT '0.00',       -- total fetch request time, 99th percentile
    gmt_create                     timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    CONSTRAINT broker_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX broker_metrics_idx_cluster_id_broker_id_gmt_create ON broker_metrics (cluster_id, broker_id, gmt_create);

-- Cluster table
CREATE TABLE cluster
(
    id                bigserial,                                    -- cluster ID
    cluster_name      varchar(128) NOT NULL DEFAULT '',             -- cluster name
    zookeeper         varchar(512) NOT NULL DEFAULT '',             -- ZooKeeper address
    bootstrap_servers varchar(512) NOT NULL DEFAULT '',             -- bootstrap server address
    kafka_version     varchar(32) NOT NULL DEFAULT '',              -- Kafka version
    alarm_flag        int2 NOT NULL DEFAULT '0',                    -- 0: alarms disabled, 1: alarms enabled
    security_protocol varchar(512) NOT NULL DEFAULT '',             -- security protocol
    sasl_mechanism    varchar(512) NOT NULL DEFAULT '',             -- SASL mechanism
    sasl_jaas_config  varchar(512) NOT NULL DEFAULT '',             -- JAAS config
    status            int2 NOT NULL DEFAULT '0',                    -- delete flag: 0 not deleted, -1 deleted
    gmt_create        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT cluster_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX cluster_uniq_cluster_name ON cluster (cluster_name);
CREATE TRIGGER cluster_trig_gmt_modify
    BEFORE UPDATE
    ON cluster
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- ClusterMetrics table
CREATE TABLE cluster_metrics
(
    id             bigserial,                                    -- auto-increment id
    cluster_id     bigint NOT NULL DEFAULT '0',                  -- cluster ID
    topic_num      int NOT NULL DEFAULT '0',                     -- topic count
    partition_num  int NOT NULL DEFAULT '0',                     -- partition count
    broker_num     int NOT NULL DEFAULT '0',                     -- broker count
    bytes_in       decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes in per second (B)
    bytes_out      decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes out per second (B)
    bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes rejected per second (B)
    messages_in    decimal(53, 2) NOT NULL DEFAULT '0.00',       -- messages in per second
    gmt_create     timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    CONSTRAINT cluster_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX cluster_metrics_idx_cluster_id_gmt_create ON cluster_metrics (cluster_id, gmt_create);

-- Controller change history table
CREATE TABLE controller
(
    id         bigserial,                                    -- auto-increment id
    cluster_id bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    broker_id  int NOT NULL DEFAULT '-1',                    -- broker ID
    host       varchar(256) NOT NULL DEFAULT '',             -- host
    timestamp  bigint NOT NULL DEFAULT '-1',                 -- controller change time
    version    int NOT NULL DEFAULT '-1',                    -- controller format version
    gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    CONSTRAINT controller_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX controller_uniq_cluster_id_broker_id_timestamp ON controller (cluster_id, broker_id, timestamp);

-- Topic migration table
CREATE TABLE migration_task
(
    id                bigserial,                                    -- auto-increment id
    cluster_id        bigint NOT NULL DEFAULT '0',                  -- cluster ID
    topic_name        varchar(192) NOT NULL DEFAULT '',             -- topic name
    reassignment_json text,                                         -- task parameters
    real_throttle     bigint NOT NULL DEFAULT '0',                  -- actual throttle (B/s)
    operator          varchar(128) NOT NULL DEFAULT '',             -- operator
    description       varchar(256) NOT NULL DEFAULT '',             -- remarks
    status            int NOT NULL DEFAULT '0',                     -- task status
    gmt_create        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- task create time
    gmt_modify        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- task modify time
    CONSTRAINT migration_task_pk PRIMARY KEY (id)
);
CREATE TRIGGER migration_task_trig_gmt_modify
    BEFORE UPDATE
    ON migration_task
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- Operation history table
CREATE TABLE operation_history
(
    id         bigserial,                                    -- id
    cluster_id bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    topic_name varchar(192) NOT NULL DEFAULT '',             -- topic name
    operator   varchar(128) NOT NULL DEFAULT '',             -- operator
    operation  varchar(256) NOT NULL DEFAULT '',             -- operation description
    gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    PRIMARY KEY (id)
);

-- Partition application order table
CREATE TABLE order_partition
(
    id            bigserial,                                    -- id
    cluster_id    bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    cluster_name  varchar(128) NOT NULL DEFAULT '',             -- cluster name
    topic_name    varchar(192) NOT NULL DEFAULT '',             -- topic name
    applicant     varchar(128) NOT NULL DEFAULT '',             -- applicant
    partition_num int NOT NULL DEFAULT '0',                     -- partition count
    broker_list   varchar(128) NOT NULL DEFAULT '',             -- broker list
    peak_bytes_in bigint NOT NULL DEFAULT '0',                  -- peak bytes-in
    description   text,                                         -- remarks
    order_status  int NOT NULL DEFAULT '0',                     -- order status
    approver      varchar(128) NOT NULL DEFAULT '',             -- approver
    opinion       varchar(256) NOT NULL DEFAULT '',             -- approval opinion
    status        int NOT NULL DEFAULT '0',                     -- status: 0 valid, -1 invalid
    gmt_create    timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify    timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT order_partition_pk PRIMARY KEY (id)
);
CREATE TRIGGER order_partition_trig_gmt_modify
    BEFORE UPDATE
    ON order_partition
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- Topic application order table
CREATE TABLE order_topic
(
    id             bigserial,                                    -- ID
    cluster_id     bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    cluster_name   varchar(128) NOT NULL DEFAULT '',             -- cluster name
    topic_name     varchar(192) NOT NULL DEFAULT '',             -- topic name
    retention_time bigint NOT NULL DEFAULT '-1',                 -- retention time (ms)
    partition_num  int NOT NULL DEFAULT '-1',                    -- partition count
    replica_num    int NOT NULL DEFAULT '-1',                    -- replica count
    regions        varchar(128) NOT NULL DEFAULT '',             -- region ID list
    brokers        varchar(128) NOT NULL DEFAULT '',             -- broker list
    peak_bytes_in  bigint NOT NULL DEFAULT '0',                  -- peak bytes-in (KB)
    applicant      varchar(128) NOT NULL DEFAULT '',             -- applicant
    principals     varchar(256) NOT NULL DEFAULT '',             -- owners
    description    text,                                         -- remarks
    order_status   int NOT NULL DEFAULT '0',                     -- order status
    approver       varchar(128) NOT NULL DEFAULT '',             -- approver
    opinion        varchar(256) NOT NULL DEFAULT '',             -- approval opinion
    status         int NOT NULL DEFAULT '0',                     -- status: 0 valid, -1 invalid
    gmt_create     timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify     timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT order_topic_pk PRIMARY KEY (id)
);
CREATE TRIGGER order_topic_trig_gmt_modify
    BEFORE UPDATE
    ON order_topic
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- Region info table
CREATE TABLE region
(
    id          bigserial,                                    -- id
    cluster_id  bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    region_name varchar(128) NOT NULL DEFAULT '',             -- region name
    broker_list varchar(256) NOT NULL DEFAULT '',             -- broker list
    level       int NOT NULL DEFAULT '0',                     -- region importance: 0 normal, 1 important, 2 extremely important
    operator    varchar(45) NOT NULL DEFAULT '',              -- operator
    description text,                                         -- remarks
    status      int NOT NULL DEFAULT '0',                     -- status: 0 normal, -1 deprecated, 1 capacity full
    gmt_create  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT region_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX region_uniq_cluster_id_region_name ON region (cluster_id, region_name);
CREATE TRIGGER region_trig_gmt_modify
    BEFORE UPDATE
    ON region
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- Topic info table
CREATE TABLE topic
(
    id          bigserial,                                    -- ID
    cluster_id  bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    topic_name  varchar(192) NOT NULL DEFAULT '',             -- topic name
    applicant   varchar(256) NOT NULL DEFAULT '',             -- applicant
    principals  varchar(256) NOT NULL DEFAULT '',             -- owners
    description text,                                         -- remarks
    status      int NOT NULL DEFAULT '0',                     -- 0: in use, -1: deprecated
    gmt_create  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT topic_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX topic_uniq_cluster_id_topic_name ON topic (cluster_id, topic_name);
CREATE TRIGGER topic_trig_gmt_modify
    BEFORE UPDATE
    ON topic
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- User favorite topics table
CREATE TABLE topic_favorite
(
    id         bigserial,                                    -- auto-increment id
    username   varchar(64) NOT NULL DEFAULT '',              -- username
    cluster_id bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    topic_name varchar(192) NOT NULL DEFAULT '',             -- topic name
    status     int NOT NULL DEFAULT '0',                     -- delete flag: 0 not deleted, -1 deleted
    gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- modify time
    CONSTRAINT topic_favorite_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX topic_favorite_uniq_username_cluster_id_topic_name ON topic_favorite (username, cluster_id, topic_name);
CREATE TRIGGER topic_favorite_trig_gmt_modify
    BEFORE UPDATE
    ON topic_favorite
    FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();

-- TopicMetrics table
CREATE TABLE topic_metrics
(
    id                     bigserial,                                    -- auto-increment id
    cluster_id             bigint NOT NULL DEFAULT '-1',                 -- cluster ID
    topic_name             varchar(192) NOT NULL DEFAULT '',             -- topic name
    messages_in            decimal(53, 2) NOT NULL DEFAULT '0.00',       -- messages in per second
    bytes_in               decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes in per second
    bytes_out              decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes out per second
    bytes_rejected         decimal(53, 2) NOT NULL DEFAULT '0.00',       -- bytes rejected per second
    total_produce_requests decimal(53, 2) NOT NULL DEFAULT '0.00',       -- produce requests per second
    gmt_create             timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- create time
    CONSTRAINT topic_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX topic_metrics_idx_cluster_id_topic_name_gmt_create ON topic_metrics (cluster_id, topic_name, gmt_create);
docker-compose.yml (new file, 32 lines)
@@ -0,0 +1,32 @@
version: '2'
services:
  mysqldbserver:
    container_name: mysqldbserver
    image: mysql:5.7
    build:
      context: .
      dockerfile: docker/mysql/Dockerfile
    ports:
      - "3306:3306"
    command: [
      'mysqld',
      '--character-set-server=utf8',
      '--collation-server=utf8_general_ci',
      '--default-time-zone=+8:00'
    ]
    environment:
      MYSQL_ROOT_PASSWORD: 12345678
      MYSQL_DATABASE: kafka_manager
      MYSQL_USER: admin
      MYSQL_PASSWORD: 12345678
    restart: always
  kafka-manager-web:
    container_name: kafka-manager-web
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8080:8080"
    links:
      - mysqldbserver
    restart: always
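With both services defined, the whole stack can be built and started in one step. A sketch (it assumes the web module jar already exists under ./web/target, since the root Dockerfile ADDs it into the image):

```
# Build the MySQL and kafka-manager-web images, then start both containers in the background
docker-compose up -d --build

# Tail the application log once the containers are up
docker logs -f kafka-manager-web
```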
docker/kafka-manager/Dockerfile (new file, 7 lines)
@@ -0,0 +1,7 @@
FROM java:8
MAINTAINER xuzhengxi
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ADD ../../web/target/kafka-manager-web-1.0.0-SNAPSHOT.jar kafka-manager-web.jar
ADD ./application.yml application.yml
ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"]
EXPOSE 8080
docker/kafka-manager/application-standalone.yml (new file, 32 lines)
@@ -0,0 +1,32 @@
server:
  port: 8080
  tomcat:
    accept-count: 100
    max-connections: 1000
    max-threads: 20
    min-spare-threads: 20

spring:
  application:
    name: kafkamanager
  datasource:
    kafka-manager:
      jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
      username: admin
      password: 12345678
      driver-class-name: org.mariadb.jdbc.Driver
  main:
    allow-bean-definition-overriding: true

  profiles:
    active: dev

logging:
  config: classpath:logback-spring.xml

# Kafka monitoring
kafka-monitor:
  enabled: true
  notify-kafka:
    cluster-id: 95
    topic-name: kmo_monitor
docker/kafka-manager/application.yml (new file, 32 lines; contents match application-standalone.yml)
@@ -0,0 +1,32 @@
server:
  port: 8080
  tomcat:
    accept-count: 100
    max-connections: 1000
    max-threads: 20
    min-spare-threads: 20

spring:
  application:
    name: kafkamanager
  datasource:
    kafka-manager:
      jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
      username: admin
      password: 12345678
      driver-class-name: org.mariadb.jdbc.Driver
  main:
    allow-bean-definition-overriding: true

  profiles:
    active: dev

logging:
  config: classpath:logback-spring.xml

# Kafka monitoring
kafka-monitor:
  enabled: true
  notify-kafka:
    cluster-id: 95
    topic-name: kmo_monitor
docker/mysql/Dockerfile (new file, 3 lines)
@@ -0,0 +1,3 @@
FROM mysql:5.7
MAINTAINER xuzhengxi
COPY ./docker/mysql/create_mysql_table.sql /docker-entrypoint-initdb.d/
docker/mysql/create_mysql_table.sql (new file, 243 lines; the listing below is cut off partway through migration_task)
@@ -0,0 +1,243 @@
CREATE DATABASE IF NOT EXISTS `kafka_manager`;

USE `kafka_manager`;

CREATE TABLE `account` (
  `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `username` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'username',
  `password` varchar(128) NOT NULL DEFAULT '' COMMENT 'password',
  `role` int(16) NOT NULL DEFAULT '0' COMMENT 'role type, 0: ordinary user',
  `status` int(16) NOT NULL DEFAULT '0' COMMENT '0: in use, -1: deprecated',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
  `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='account table';
INSERT INTO account(username, password, role) VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2);


CREATE TABLE `alarm_rule` (
  `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment ID',
  `alarm_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'alarm name',
  `strategy_expressions` text COMMENT 'expressions',
  `strategy_filters` text COMMENT 'filters',
  `strategy_actions` text COMMENT 'actions',
  `principals` varchar(512) NOT NULL DEFAULT '' COMMENT 'owners',
  `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '-1: logically deleted, 0: disabled, 1: normal',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
  `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_alarm_name` (`alarm_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='alarm rule table';


CREATE TABLE `broker` (
  `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'cluster ID',
  `broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'broker ID',
  `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'broker host',
  `port` int(32) NOT NULL DEFAULT '-1' COMMENT 'broker port',
  `timestamp` bigint(128) NOT NULL DEFAULT '-1' COMMENT 'start time',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT 'status: 0 valid, -1 invalid',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
  `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_cluster_id_broker_id` (`cluster_id`,`broker_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='broker info table';


CREATE TABLE `broker_metrics` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'cluster ID',
  `broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'broker ID',
  `bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'bytes in per second',
  `bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'bytes out per second',
  `bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'bytes rejected per second',
  `messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'messages in per second',
  `fail_fetch_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'failed fetch requests per second',
  `fail_produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'failed produce requests per second',
  `fetch_consumer_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'consumer fetch requests per second',
  `produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'produce requests per second',
  `request_handler_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'request handler busy percentage',
  `network_processor_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'network processor busy percentage',
  `request_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT 'request queue size',
  `response_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT 'response queue size',
  `log_flush_time` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'log flush time',
  `total_time_produce_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'total produce request time, mean',
  `total_time_produce_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'total produce request time, 99th percentile',
  `total_time_fetch_consumer_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'total fetch request time, mean',
  `total_time_fetch_consumer_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'total fetch request time, 99th percentile',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
  PRIMARY KEY (`id`),
  KEY `idx_cluster_id_broker_id_gmt_create` (`cluster_id`,`broker_id`,`gmt_create`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='BrokerMetric table';


CREATE TABLE `cluster` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'cluster ID',
  `cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'cluster name',
  `zookeeper` varchar(512) NOT NULL DEFAULT '' COMMENT 'ZooKeeper address',
  `bootstrap_servers` varchar(512) NOT NULL DEFAULT '' COMMENT 'bootstrap server address',
  `kafka_version` varchar(32) NOT NULL DEFAULT '' COMMENT 'Kafka version',
  `alarm_flag` int(4) NOT NULL DEFAULT '0' COMMENT '0: alarms disabled, 1: alarms enabled',
  `security_protocol` varchar(512) NOT NULL DEFAULT '' COMMENT 'security protocol',
  `sasl_mechanism` varchar(512) NOT NULL DEFAULT '' COMMENT 'SASL mechanism',
  `sasl_jaas_config` varchar(512) NOT NULL DEFAULT '' COMMENT 'JAAS config',
  `status` int(4) NOT NULL DEFAULT '0' COMMENT 'delete flag: 0 not deleted, -1 deleted',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
  `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_cluster_name` (`cluster_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='cluster table';


CREATE TABLE `cluster_metrics` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'cluster ID',
  `topic_num` int(11) NOT NULL DEFAULT '0' COMMENT 'topic count',
  `partition_num` int(11) NOT NULL DEFAULT '0' COMMENT 'partition count',
  `broker_num` int(11) NOT NULL DEFAULT '0' COMMENT 'broker count',
  `bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'bytes in per second (B)',
  `bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'bytes out per second (B)',
  `bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'bytes rejected per second (B)',
  `messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'messages in per second',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
  PRIMARY KEY (`id`),
  KEY `idx_cluster_id_gmt_create` (`cluster_id`,`gmt_create`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='ClusterMetrics table';


CREATE TABLE `controller` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'cluster ID',
  `broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'broker ID',
  `host` varchar(256) NOT NULL DEFAULT '' COMMENT 'host',
  `timestamp` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'controller change time',
  `version` int(11) NOT NULL DEFAULT '-1' COMMENT 'controller format version',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_cluster_id_broker_id_timestamp` (`cluster_id`,`broker_id`,`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='controller change history table';

CREATE TABLE `migration_task` (
  `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
  `cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'cluster ID',
  `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'topic name',
  `reassignment_json` text COMMENT 'task parameters',
  `real_throttle` bigint(20) NOT NULL DEFAULT '0' COMMENT 'actual throttle (B/s)',
  `operator` varchar(128) NOT NULL DEFAULT '' COMMENT 'operator',
  `description` varchar(256) NOT NULL DEFAULT '' COMMENT 'remarks',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT 'task status',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'task create time',
  `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'task modify time',
|
PRIMARY KEY (`id`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic迁移信息';
|
||||||
|
|
||||||
|
CREATE TABLE `operation_history` (
|
||||||
|
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||||
|
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||||
|
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||||
|
`operator` varchar(128) NOT NULL DEFAULT '' COMMENT '操作人',
|
||||||
|
`operation` varchar(256) NOT NULL DEFAULT '' COMMENT '操作描述',
|
||||||
|
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
|
PRIMARY KEY (`id`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='操作记录表';
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE `order_partition` (
|
||||||
|
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||||
|
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||||
|
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称',
|
||||||
|
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||||
|
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表, 逗号分割',
|
||||||
|
`partition_num` int(11) NOT NULL DEFAULT 0 COMMENT '新增分区数',
|
||||||
|
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人',
|
||||||
|
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流量',
|
||||||
|
`description` text COMMENT '备注信息',
|
||||||
|
`order_status` int(16) NOT NULL DEFAULT '0' COMMENT '工单状态',
|
||||||
|
`approver` varchar(128) NOT NULL DEFAULT '' COMMENT '审批人',
|
||||||
|
`opinion` varchar(256) NOT NULL DEFAULT '' COMMENT '审批意见',
|
||||||
|
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0标识有效,-1无效',
|
||||||
|
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
|
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
PRIMARY KEY (`id`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='分区申请工单';
|
||||||
|
|
||||||
|
CREATE TABLE `order_topic` (
|
||||||
|
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
|
||||||
|
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||||
|
`cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称',
|
||||||
|
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||||
|
`retention_time` bigint(20) NOT NULL DEFAULT '-1' COMMENT '保留时间(ms)',
|
||||||
|
`partition_num` int(16) NOT NULL DEFAULT '-1' COMMENT '分区数',
|
||||||
|
`replica_num` int(16) NOT NULL DEFAULT '-1' COMMENT '副本数',
|
||||||
|
`regions` varchar(128) NOT NULL DEFAULT '' COMMENT 'RegionId列表',
|
||||||
|
`brokers` varchar(128) NOT NULL DEFAULT '' COMMENT 'Broker列表',
|
||||||
|
`peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流入流量(KB)',
|
||||||
|
`applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人',
|
||||||
|
`principals` varchar(256) NOT NULL DEFAULT '' COMMENT '负责人',
|
||||||
|
`description` text COMMENT '备注信息',
|
||||||
|
`order_status` int(16) NOT NULL DEFAULT '0' COMMENT '工单状态',
|
||||||
|
`approver` varchar(128) NOT NULL DEFAULT '' COMMENT '审批人',
|
||||||
|
`opinion` varchar(256) NOT NULL DEFAULT '' COMMENT '审批意见',
|
||||||
|
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态,0标识有效,-1无效',
|
||||||
|
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
|
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
PRIMARY KEY (`id`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic申请工单';
|
||||||
|
|
||||||
|
CREATE TABLE `region` (
|
||||||
|
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
|
||||||
|
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||||
|
`region_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Region名称',
|
||||||
|
`broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表',
|
||||||
|
`level` int(16) NOT NULL DEFAULT '0' COMMENT 'Region重要等级, 0级普通, 1极重要,2级极重要',
|
||||||
|
`operator` varchar(45) NOT NULL DEFAULT '' COMMENT '操作人',
|
||||||
|
`description` text COMMENT '备注说明',
|
||||||
|
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0正常,-1废弃,1容量已满',
|
||||||
|
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
|
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
PRIMARY KEY (`id`),
|
||||||
|
UNIQUE KEY `uniq_cluster_id_region_name` (`cluster_id`,`region_name`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Region信息表';
|
||||||
|
|
||||||
|
CREATE TABLE `topic` (
|
||||||
|
`id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
|
||||||
|
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||||
|
`topic_name` varchar(192) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||||
|
`applicant` varchar(256) NOT NULL DEFAULT '' COMMENT '申请人',
|
||||||
|
`principals` varchar(256) NOT NULL DEFAULT '' COMMENT '负责人',
|
||||||
|
`description` text COMMENT '备注信息',
|
||||||
|
`status` int(16) NOT NULL DEFAULT '0' COMMENT '0标识使用中,-1标识已废弃',
|
||||||
|
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
|
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
PRIMARY KEY (`id`),
|
||||||
|
UNIQUE KEY `uniq_cluster_id_topic_name` (`cluster_id`,`topic_name`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic信息表';
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE `topic_favorite` (
|
||||||
|
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增Id',
|
||||||
|
`username` varchar(64) NOT NULL DEFAULT '' COMMENT '用户名',
|
||||||
|
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||||
|
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||||
|
`status` int(16) NOT NULL DEFAULT '0' COMMENT '删除标记, 0表示未删除, -1表示删除',
|
||||||
|
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
|
`gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
PRIMARY KEY (`id`),
|
||||||
|
UNIQUE KEY `uniq_username_cluster_id_topic_name` (`username`,`cluster_id`,`topic_name`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户收藏的Topic表';
|
||||||
|
|
||||||
|
CREATE TABLE `topic_metrics` (
|
||||||
|
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
|
||||||
|
`cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID',
|
||||||
|
`topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称',
|
||||||
|
`messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒进入消息条数',
|
||||||
|
`bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流入',
|
||||||
|
`bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流出',
|
||||||
|
`bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒拒绝字节数',
|
||||||
|
`total_produce_requests` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒请求数',
|
||||||
|
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
|
PRIMARY KEY (`id`),
|
||||||
|
KEY `idx_cluster_id_topic_name_gmt_create` (`cluster_id`,`topic_name`,`gmt_create`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='TopicMetrics表';
|
||||||
2
pom.xml
@@ -82,7 +82,7 @@
         <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
-            <version>3.4.6</version>
+            <version>3.4.14</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
@@ -2,12 +2,12 @@ package com.xiaojukeji.kafka.manager.service.collector;

 import com.xiaojukeji.kafka.manager.common.constant.Constant;
 import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetrics;
-import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
 import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerDTO;
 import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
 import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
 import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
 import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -34,8 +34,8 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
         if (clusterDO == null) {
             return;
         }
-        Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>();
-        List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, topicNamePartitionStateListMap);
+        Map<TopicPartition, Long> allPartitionOffsetMap = new HashMap<>();
+        List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, allPartitionOffsetMap);

         List<ConsumerMetrics> consumerMetricsList = convert2ConsumerMetrics(consumerDTOList);
         KafkaMetricsCache.putConsumerMetricsToCache(clusterId, consumerMetricsList);
@@ -47,23 +47,27 @@ public class CollectConsumerMetricsTask extends BaseCollectTask {
     private List<ConsumerMetrics> convert2ConsumerMetrics(List<ConsumerDTO> consumerDTOList) {
         List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
         for (ConsumerDTO consumerDTO : consumerDTOList) {
-            Map<String, List<PartitionState>> topicNamePartitionStateListMap = consumerDTO.getTopicPartitionMap();
-            for (Map.Entry<String, List<PartitionState>> entry : topicNamePartitionStateListMap.entrySet()) {
-                String topicName = entry.getKey();
-                List<PartitionState> partitionStateList = entry.getValue();
-                ConsumerMetrics consumerMetrics = new ConsumerMetrics();
-                consumerMetrics.setClusterId(clusterId);
-                consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
-                consumerMetrics.setLocation(consumerDTO.getLocation());
-                consumerMetrics.setTopicName(topicName);
-                long sumLag = 0;
-                for (PartitionState partitionState : partitionStateList) {
-                    Map.Entry<Long, Long> offsetEntry = new AbstractMap.SimpleEntry<>(partitionState.getOffset(), partitionState.getConsumeOffset());
-                    sumLag += (offsetEntry.getKey() - offsetEntry.getValue() > 0 ? offsetEntry.getKey() - offsetEntry.getValue() : 0);
-                }
-                consumerMetrics.setSumLag(sumLag);
-                consumerMetricsList.add(consumerMetrics);
+            if (consumerDTO.getPartitionOffsetMap() == null || consumerDTO.getConsumerOffsetMap() == null) {
+                continue;
             }
+
+            ConsumerMetrics consumerMetrics = new ConsumerMetrics();
+            consumerMetrics.setClusterId(consumerDTO.getClusterId());
+            consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
+            consumerMetrics.setLocation(consumerDTO.getLocation());
+            consumerMetrics.setTopicName(consumerDTO.getTopicName());
+
+            long sumLag = 0;
+            for (Map.Entry<Integer, Long> entry : consumerDTO.getPartitionOffsetMap().entrySet()) {
+                Long partitionOffset = entry.getValue();
+                Long consumerOffset = consumerDTO.getConsumerOffsetMap().get(entry.getKey());
+                if (partitionOffset == null || consumerOffset == null) {
+                    continue;
+                }
+                sumLag += Math.max(partitionOffset - consumerOffset, 0);
+            }
+            consumerMetrics.setSumLag(sumLag);
+            consumerMetricsList.add(consumerMetrics);
         }
         return consumerMetricsList;
     }
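For reference, the lag formula the refactored task applies is: per partition, lag = max(partitionEndOffset - committedConsumerOffset, 0), summed over the topic. A minimal, self-contained sketch of just that computation, with illustrative names rather than the project's API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the lag formula used above: per-partition lag is clamped to 0
// (a committed offset ahead of the cached end offset yields no negative lag),
// then summed across partitions.
public class LagSketch {
    static long sumLag(Map<Integer, Long> partitionEndOffsets,
                       Map<Integer, Long> consumerOffsets) {
        long sumLag = 0;
        for (Map.Entry<Integer, Long> e : partitionEndOffsets.entrySet()) {
            Long end = e.getValue();
            Long committed = consumerOffsets.get(e.getKey());
            if (end == null || committed == null) {
                continue; // skip partitions without a committed offset, as the task does
            }
            sumLag += Math.max(end - committed, 0);
        }
        return sumLag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new HashMap<>();
        Map<Integer, Long> committed = new HashMap<>();
        end.put(0, 120L); committed.put(0, 100L); // lag 20
        end.put(1, 50L);  committed.put(1, 60L);  // clamped to 0
        System.out.println(sumLag(end, committed)); // prints 20
    }
}
```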
@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerGroupDTO;
 import com.xiaojukeji.kafka.manager.common.entity.dto.PartitionOffsetDTO;
 import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
 import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
+import org.apache.kafka.common.TopicPartition;

 import java.util.List;
 import java.util.Map;
@@ -57,7 +58,7 @@ public interface ConsumerService {
      * @return
      */
     List<ConsumerDTO> getMonitoredConsumerList(ClusterDO clusterDO,
-                                               Map<String, List<PartitionState>> topicNamePartitionStateListMap);
+                                               Map<TopicPartition, Long> partitionOffsetMap);

     /**
      * reset offset
@@ -45,7 +45,7 @@ public interface OrderService {
      * @date 19/6/23
      * @return Result
      */
-    Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator);
+    Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator, boolean admin);

     /**
      * query topic orders
@@ -76,6 +76,7 @@ public class AdminTopicServiceImpl implements AdminTopicService {
             OperationHistoryDO operationHistoryDO = OperationHistoryDO.newInstance(topicDO.getClusterId(), topicDO.getTopicName(), operator, OperationEnum.CREATE_TOPIC.message);
             operationHistoryDao.insert(operationHistoryDO);
             topicDao.replace(topicDO);
+
         } catch (Exception e) {
             return AdminTopicStatusEnum.REPLACE_DB_FAILED;
         }
@@ -188,4 +189,5 @@ public class AdminTopicServiceImpl implements AdminTopicService {
         }
         return AdminTopicStatusEnum.SUCCESS;
     }
+
 }
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl;

 import com.xiaojukeji.kafka.manager.common.constant.OffsetStoreLocation;
 import com.xiaojukeji.kafka.manager.common.constant.StatusCode;
-import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
 import com.xiaojukeji.kafka.manager.common.entity.Result;
 import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
 import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
@@ -18,7 +17,6 @@ import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache;
 import com.xiaojukeji.kafka.manager.service.cache.KafkaClientCache;
 import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
 import com.xiaojukeji.kafka.manager.service.service.TopicService;
-import com.xiaojukeji.kafka.manager.service.service.ZookeeperService;
 import com.xiaojukeji.kafka.manager.common.utils.zk.ZkPathUtil;
 import kafka.admin.AdminClient;
 import org.apache.commons.lang.StringUtils;
@@ -49,9 +47,6 @@ public class ConsumerServiceImpl implements ConsumerService {
     @Autowired
     private TopicService topicService;

-    @Autowired
-    private ZookeeperService zkService;
-
     private final ExecutorService consumerListThreadPool = Executors.newFixedThreadPool(50, new DefaultThreadFactory("ConsumerPool"));

     @Override
@@ -135,20 +130,20 @@ public class ConsumerServiceImpl implements ConsumerService {

     @Override
     public List<ConsumerDTO> getMonitoredConsumerList(final ClusterDO clusterDO,
-                                                      final Map<String, List<PartitionState>> partitionStateListMap) {
+                                                      final Map<TopicPartition, Long> allPartitionOffsetMap) {
         List<ConsumerGroupDTO> consumerGroupDTOList = getConsumerGroupList(clusterDO.getId());
         if (consumerGroupDTOList == null || consumerGroupDTOList.isEmpty()) {
             return new ArrayList<>();
         }

-        FutureTask<ConsumerDTO>[] taskList = new FutureTask[consumerGroupDTOList.size()];
+        FutureTask<List<ConsumerDTO>>[] taskList = new FutureTask[consumerGroupDTOList.size()];
         for (int i = 0; i < consumerGroupDTOList.size(); i++) {
             final ConsumerGroupDTO consumerGroupDTO = consumerGroupDTOList.get(i);
-            taskList[i] = new FutureTask<>(new Callable<ConsumerDTO>() {
+            taskList[i] = new FutureTask<>(new Callable<List<ConsumerDTO>>() {
                 @Override
-                public ConsumerDTO call() throws Exception {
+                public List<ConsumerDTO> call() throws Exception {
                     try {
-                        return getMonitoredConsumer(clusterDO, consumerGroupDTO, partitionStateListMap);
+                        return getMonitoredConsumer(clusterDO, consumerGroupDTO, allPartitionOffsetMap);
                     } catch (Exception e) {
                         logger.error("get monitored consumer error, group:{}", consumerGroupDTO.getConsumerGroup(), e);
                     }
@@ -159,31 +154,70 @@ public class ConsumerServiceImpl implements ConsumerService {
         }

         List<ConsumerDTO> consumerList = new ArrayList<>();
-        for (FutureTask<ConsumerDTO> task : taskList) {
-            ConsumerDTO consumer = null;
+        for (FutureTask<List<ConsumerDTO>> task : taskList) {
+            List<ConsumerDTO> dtoList = null;
             try {
-                consumer = task.get();
+                dtoList = task.get();
             } catch (Exception e) {
                 logger.error("getMonitoredConsumerList@ConsumeServiceImpl, ", e);
             }
-            if (consumer == null) {
+            if (dtoList == null) {
                 continue;
             }
-            consumerList.add(consumer);
+            consumerList.addAll(dtoList);
         }
         return consumerList;
     }

-    private ConsumerDTO getMonitoredConsumer(ClusterDO cluster, ConsumerGroupDTO consumerGroupDTO, Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
-        // get the partitionState info of every topic consumed by the current consumerGroup
-        Map<String, List<PartitionState>> topicNamePartitionStateListMap = getConsumerGroupPartitionStateList(cluster, consumerGroupDTO, globalTopicNamePartitionStateListMap);
-
-        // put partition info that has no matching consumer into a single consumer
-        ConsumerDTO consumerDTO = new ConsumerDTO();
-        consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
-        consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().name());
-        consumerDTO.setTopicPartitionMap(topicNamePartitionStateListMap);
-        return consumerDTO;
+    private List<ConsumerDTO> getMonitoredConsumer(ClusterDO clusterDO,
+                                                   ConsumerGroupDTO consumerGroupDTO,
+                                                   Map<TopicPartition, Long> allPartitionOffsetMap) {
+        List<ConsumerDTO> dtoList = new ArrayList<>();
+
+        List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(
+                clusterDO.getId(),
+                consumerGroupDTO.getOffsetStoreLocation().getLocation(),
+                consumerGroupDTO.getConsumerGroup()
+        );
+        for (String topicName : topicNameList) {
+            TopicMetadata metadata = ClusterMetadataManager.getTopicMetaData(clusterDO.getId(), topicName);
+            if (metadata == null || metadata.getPartitionNum() <= 0) {
+                continue;
+            }
+            if (!allPartitionOffsetMap.containsKey(new TopicPartition(topicName, 0))) {
+                Map<TopicPartition, Long> offsetMap = topicService.getTopicPartitionOffset(clusterDO, topicName);
+                if (offsetMap == null) {
+                    offsetMap = new HashMap<>();
+                }
+                allPartitionOffsetMap.putAll(offsetMap);
+            }
+
+            Map<Integer, Long> consumerOffsetMap = null;
+            if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
+                consumerOffsetMap = getTopicConsumerOffsetInZK(clusterDO, metadata, consumerGroupDTO);
+            } else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
+                consumerOffsetMap = getTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO);
+            }
+
+            Map<Integer, Long> partitionOffsetMap = new HashMap<>();
+            for (int partitionId = 0; partitionId < metadata.getPartitionNum(); ++partitionId) {
+                Long offset = allPartitionOffsetMap.get(new TopicPartition(topicName, partitionId));
+                if (offset == null) {
+                    continue;
+                }
+                partitionOffsetMap.put(partitionId, offset);
+            }
+
+            ConsumerDTO consumerDTO = new ConsumerDTO();
+            consumerDTO.setClusterId(clusterDO.getId());
+            consumerDTO.setTopicName(topicName);
+            consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
+            consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().getLocation());
+            consumerDTO.setPartitionOffsetMap(partitionOffsetMap);
+            consumerDTO.setConsumerOffsetMap(consumerOffsetMap);
+            dtoList.add(consumerDTO);
+        }
+        return dtoList;
     }

     @Override
@@ -264,52 +298,15 @@ public class ConsumerServiceImpl implements ConsumerService {
         kafkaConsumer.commitSync();
     }

-    /**
-     * get the info of all topics under this cluster and consumerGroup
-     */
-    private Map<String, List<PartitionState>> getConsumerGroupPartitionStateList(ClusterDO clusterDO,
-                                                                                 ConsumerGroupDTO consumerGroupDTO,
-                                                                                 Map<String, List<PartitionState>> globalTopicNamePartitionStateListMap) {
-        Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>(2);
-
-        List<String> topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(clusterDO.getId(), consumerGroupDTO.getOffsetStoreLocation().getLocation(), consumerGroupDTO.getConsumerGroup());
-        for (String topicName : topicNameList) {
-            if (!ClusterMetadataManager.isTopicExist(clusterDO.getId(), topicName)) {
-                continue;
-            }
-
-            List<PartitionState> partitionStateList = globalTopicNamePartitionStateListMap.get(topicName);
-            if (partitionStateList == null) {
-                try {
-                    partitionStateList = zkService.getTopicPartitionState(clusterDO.getId(), topicName);
-                } catch (Exception e) {
-                    logger.error("get topic partition state failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
-                }
-                if (partitionStateList == null) {
-                    continue;
-                }
-                globalTopicNamePartitionStateListMap.put(topicName, partitionStateList);
-            }
-            List<PartitionState> consumerGroupPartitionStateList = new ArrayList<>();
-            for (PartitionState partitionState : partitionStateList) {
-                consumerGroupPartitionStateList.add((PartitionState) partitionState.clone());
-            }
-
-            if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) {
-                updateTopicConsumerOffsetInZK(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
-            } else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) {
-                updateTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList);
-            }
-            topicNamePartitionStateListMap.put(topicName, consumerGroupPartitionStateList);
-        }
-        return topicNamePartitionStateListMap;
-    }
-
-    private void updateTopicConsumerOffsetInZK(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
-        ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(cluster.getId());
-        for (PartitionState partitionState : partitionStateList) {
+    private Map<Integer, Long> getTopicConsumerOffsetInZK(ClusterDO clusterDO,
+                                                          TopicMetadata topicMetadata,
+                                                          ConsumerGroupDTO consumerGroupDTO) {
+        Map<Integer, Long> offsetMap = new HashMap<>();
+        ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(clusterDO.getId());
+        for (int partitionId = 0; partitionId < topicMetadata.getPartitionNum(); ++partitionId) {
             // the offset is stored in zk
-            String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId());
+            String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicMetadata.getTopic(), partitionId);
             String offset = null;
             try {
                 Stat stat = zkConfig.getNodeStat(consumerGroupOffsetLocation);
@@ -317,39 +314,32 @@ public class ConsumerServiceImpl implements ConsumerService {
                     continue;
                 }
                 offset = zkConfig.get(consumerGroupOffsetLocation);
+                offsetMap.put(partitionId, Long.valueOf(offset));
             } catch (ConfigException e) {
                 e.printStackTrace();
             }
-
-            String consumerId = null;
-            try {
-                consumerId = zkConfig.get(ZkPathUtil.getConsumerGroupOwnersTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId()));
-            } catch (ConfigException e) {
-                // logger.error("get consumerId error in updateTopicConsumerOffsetInZK cluster:{} topic:{} consumerGroup:{}", cluster.getClusterName(), topicName, consumerGroupDTO.getConsumerGroup());
-            }
-            partitionState.setConsumerGroup(consumerGroupDTO.getConsumerGroup());
-            updatePartitionStateOffset(partitionState, offset, consumerId);
         }
+        return offsetMap;
     }

-    private void updateTopicConsumerOffsetInBroker(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List<PartitionState> partitionStateList) {
-        Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(cluster, consumerGroupDTO.getConsumerGroup(), topicName);
+    private Map<Integer, Long> getTopicConsumerOffsetInBroker(ClusterDO clusterDO,
+                                                              String topicName,
+                                                              ConsumerGroupDTO consumerGroupDTO) {
+        Map<Integer, String> offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroupDTO.getConsumerGroup(), topicName);
         if (offsetsFromBroker == null || offsetsFromBroker.isEmpty()) {
-            return;
+            return new HashMap<>(0);
         }

-        for (PartitionState partitionState : partitionStateList) {
-            int partitionId = partitionState.getPartitionId();
-            updatePartitionStateOffset(partitionState, offsetsFromBroker.get(partitionId), null);
+        Map<Integer, Long> offsetMap = new HashMap<>(offsetsFromBroker.size());
+        for (Map.Entry<Integer, String> entry : offsetsFromBroker.entrySet()) {
+            try {
+                offsetMap.put(entry.getKey(), Long.valueOf(entry.getValue()));
+            } catch (Exception e) {
+                logger.error("get topic consumer offset failed, clusterId:{} topicName:{} consumerGroup:{}."
+                        , clusterDO.getId(), topicName, consumerGroupDTO.getConsumerGroup());
+            }
         }
-    }
+        return offsetMap;

-    private void updatePartitionStateOffset(PartitionState partitionState, String offset, String consumerId) {
-        partitionState.setConsumeOffset(0);
-        if (!StringUtils.isEmpty(offset)) {
-            partitionState.setConsumeOffset(Long.parseLong(offset));
-        }
-        partitionState.setConsumerGroup(consumerId);
     }

     private Map<Integer, String> getConsumeIdMap(Long clusterId, String topicName, String consumerGroup) {
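The shared `allPartitionOffsetMap` above works as a cross-group cache because `org.apache.kafka.common.TopicPartition` defines value-based `equals`/`hashCode`, so the `containsKey(new TopicPartition(topicName, 0))` probe detects topics whose end offsets were already fetched for an earlier group. A small sketch of that property (requires kafka-clients on the classpath):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Two TopicPartition instances with the same topic and partition are
// interchangeable HashMap keys, which is what makes the "already fetched?"
// probe in the refactored method valid.
public class OffsetCacheSketch {
    public static void main(String[] args) {
        Map<TopicPartition, Long> cache = new HashMap<>();
        cache.put(new TopicPartition("orders", 0), 42L);

        boolean alreadyFetched = cache.containsKey(new TopicPartition("orders", 0));
        System.out.println(alreadyFetched);                             // true
        System.out.println(cache.get(new TopicPartition("orders", 0))); // 42
    }
}
```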
@@ -72,6 +72,9 @@ public class JmxServiceImpl implements JmxService {
         List<Attribute> attributeValueList = null;
         try {
             attributeValueList = connection.getAttributes(new ObjectName(mbean.getObjectName()), properties).asList();
+        } catch (InstanceNotFoundException e) {
+            logger.warn("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e);
+            continue;
         } catch (Exception e) {
             logger.error("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e);
             continue;
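The added catch separates an MBean that simply is not registered (logged at warn and skipped) from genuine JMX failures (logged at error). A standalone sketch of the same pattern against the standard `javax.management` API; the Kafka object name is illustrative and will not exist in a plain JVM:

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import javax.management.Attribute;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Treat a missing MBean as an expected, skippable condition and reserve
// error-level handling for unexpected JMX failures.
public class JmxReadSketch {
    public static void main(String[] args) throws Exception {
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
        try {
            List<Attribute> attrs =
                    conn.getAttributes(name, new String[]{"OneMinuteRate"}).asList();
            System.out.println(attrs);
        } catch (InstanceNotFoundException e) {
            System.out.println("MBean not registered, skipping: " + name); // warn-level condition
        } catch (Exception e) {
            e.printStackTrace(); // unexpected JMX failure
        }
    }
}
```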
@@ -51,7 +51,7 @@ public class OrderServiceImpl implements OrderService {
             if (orderPartitionDO != null) {
                 orderPartitionDO.setOrderStatus(OrderStatusEnum.CANCELLED.getCode());
             }
-            return modifyOrderPartition(orderPartitionDO, operator);
+            return modifyOrderPartition(orderPartitionDO, operator, false);
         }
         return new Result(StatusCode.PARAM_ERROR, "order type illegal");
     }
@@ -74,10 +74,10 @@ public class OrderServiceImpl implements OrderService {
     }

     @Override
-    public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator) {
+    public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator, boolean admin) {
         if (newOrderPartitionDO == null) {
             return new Result(StatusCode.PARAM_ERROR, "param illegal, order not exist");
-        } else if (!newOrderPartitionDO.getApplicant().equals(operator)) {
+        } else if (!admin && !newOrderPartitionDO.getApplicant().equals(operator)) {
             return new Result(StatusCode.PARAM_ERROR, "without authority to cancel the order");
         }
         OrderPartitionDO oldOrderPartitionDO = orderPartitionDao.getById(newOrderPartitionDO.getId());
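The new `admin` flag lets an approver update an order that someone else filed, while ordinary callers must still be the applicant. A compact sketch of that guard, with hypothetical names:

```java
// Admins bypass the applicant-must-equal-operator check; everyone else is rejected.
public class OrderGuardSketch {
    static boolean mayModify(String applicant, String operator, boolean admin) {
        return admin || applicant.equals(operator);
    }

    public static void main(String[] args) {
        System.out.println(mayModify("alice", "alice", false)); // true: own order
        System.out.println(mayModify("alice", "bob", false));   // false: not the applicant
        System.out.println(mayModify("alice", "bob", true));    // true: admin override
    }
}
```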
@@ -345,7 +345,7 @@ public class TopicServiceImpl implements TopicService {
         } else {
             topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true);
             topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec());
-            topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec());
+            topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getBytesOutPerSec());
         }
         return topicOverviewDTO;
     }
@@ -176,7 +176,7 @@
         TopicDO topicInfoDO = OrderConverter.convert2TopicInfoDO(orderTopicDO);
         List<Integer> brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList());
         Properties topicConfig = new Properties();
-        topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime()));
+        topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime() * 60 * 60 * 1000));
         try {
             TopicMetadata topicMetadata = new TopicMetadata();
             topicMetadata.setTopic(orderTopicDO.getTopicName());
@@ -325,14 +325,16 @@
         orderPartitionDO.setApprover(username);
         orderPartitionDO.setOpinion(reqObj.getApprovalOpinions());
         orderPartitionDO.setOrderStatus(reqObj.getOrderStatus());
-        result = orderService.modifyOrderPartition(orderPartitionDO, username);
+        result = orderService.modifyOrderPartition(orderPartitionDO, username, true);
         if (!StatusCode.SUCCESS.equals(result.getCode())) {
-            return new Result(StatusCode.OPERATION_ERROR, "create topic success, but update order status failed, err:" + result.getMessage());
+            return new Result(StatusCode.OPERATION_ERROR, "expand topic success, but update order status failed, err:" + result.getMessage());
         }
         return new Result();
     }

-    private Result expandTopic(ClusterDO clusterDO, OrderPartitionExecModel reqObj, OrderPartitionDO orderPartitionDO) {
+    private Result expandTopic(ClusterDO clusterDO,
+                               OrderPartitionExecModel reqObj,
+                               OrderPartitionDO orderPartitionDO) {
         List<Integer> brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList());
         try {
             TopicMetadata topicMetadata = new TopicMetadata();
@@ -343,6 +345,8 @@
             if (!AdminTopicStatusEnum.SUCCESS.equals(adminTopicStatusEnum)) {
                 return new Result(StatusCode.OPERATION_ERROR, adminTopicStatusEnum.getMessage());
             }
+            orderPartitionDO.setPartitionNum(reqObj.getPartitionNum());
+            orderPartitionDO.setBrokerList(ListUtils.intList2String(brokerIdList));
         } catch (Exception e) {
             logger.error("expandTopic@OrderController, create failed, req:{}.", reqObj);
             return new Result(StatusCode.OPERATION_ERROR, Constant.KAFKA_MANAGER_INNER_ERROR);
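Kafka's `retention.ms` topic config is expressed in milliseconds; the `* 60 * 60 * 1000` factor suggests the order form collects retention in hours (an assumption inferred from the hunk, not stated in it). A sketch of the same conversion via `TimeUnit`, which reads more clearly than magic numbers and avoids int overflow if the field were an `int`:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hours-to-milliseconds conversion for Kafka's retention.ms topic config.
// Assumes, as the hunk above implies, that the request carries hours.
public class RetentionSketch {
    public static void main(String[] args) {
        long retentionHours = 72; // e.g. three days
        Properties topicConfig = new Properties();
        topicConfig.setProperty("retention.ms",
                String.valueOf(TimeUnit.HOURS.toMillis(retentionHours)));
        System.out.println(topicConfig.getProperty("retention.ms")); // 259200000
    }
}
```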
@@ -86,7 +86,8 @@ public class OrderConverter {

     public static OrderPartitionVO convert2OrderPartitionVO(OrderPartitionDO orderPartitionDO,
                                                             TopicMetadata topicMetadata,
-                                                            Long maxAvgBytes, List<RegionDO> regionDOList) {
+                                                            Long maxAvgBytes,
+                                                            List<RegionDO> regionDOList) {
         if (orderPartitionDO == null) {
             return null;
         }
@@ -100,8 +101,12 @@
         if (topicMetadata == null) {
             return orderPartitionVO;
         }
-        orderPartitionVO.setPartitionNum(topicMetadata.getPartitionNum());
+        orderPartitionVO.setPartitionNum(null);
         orderPartitionVO.setBrokerIdList(new ArrayList<>(topicMetadata.getBrokerIdSet()));
+        if (OrderStatusEnum.PASSED.getCode().equals(orderPartitionDO.getOrderStatus())) {
+            orderPartitionVO.setPartitionNum(orderPartitionDO.getPartitionNum());
+        }
+
         if (regionDOList == null || regionDOList.isEmpty()) {
             orderPartitionVO.setRegionNameList(new ArrayList<>());
33
web/src/main/resources/application-pg.yml
Normal file
@@ -0,0 +1,33 @@
server:
  port: 8080
  tomcat:
    accept-count: 100
    max-connections: 1000
    max-threads: 20
    min-spare-threads: 20

spring:
  application:
    name: kafkamanager
  datasource:
    kafka-manager:
      driver-class-name: org.postgresql.Driver
      jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true
      username: admin
      password: admin
      type: com.zaxxer.hikari.HikariDataSource
      hikari:
        connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';"
        connection-test-query: "select 1;"
  main:
    allow-bean-definition-overriding: true

logging:
  config: classpath:logback-spring.xml

# kafka monitoring
kafka-monitor:
  enabled: true
  notify-kafka:
    cluster-id: 95
    topic-name: kmo_monitor
@@ -11,16 +11,13 @@ spring:
     name: kafkamanager
   datasource:
     kafka-manager:
+      driver-class-name: org.mariadb.jdbc.Driver
       jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
       username: admin
       password: admin
-      driver-class-name: org.mariadb.jdbc.Driver
   main:
     allow-bean-definition-overriding: true

-  profiles:
-    active: dev
-
 logging:
   config: classpath:logback-spring.xml