From 47265bb8d355a0d5d857cfde98776de63addc4ca Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 24 Mar 2020 06:19:53 +0000 Subject: [PATCH 01/13] Bump zookeeper from 3.4.6 to 3.4.14 Bumps zookeeper from 3.4.6 to 3.4.14. Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9fe35d43..7a111df2 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ org.apache.zookeeper zookeeper - 3.4.6 + 3.4.14 org.slf4j From 8ef549de805084e56028f8b762eee8a657328fe9 Mon Sep 17 00:00:00 2001 From: eilenexuzhe Date: Fri, 5 Jun 2020 18:03:16 +0800 Subject: [PATCH 02/13] feat bugfix --- console/src/container/admin-home/index.tsx | 5 ++-- .../src/container/admin-usermanage/index.tsx | 6 ++--- .../src/container/broker-info/base-info.tsx | 26 +++++++++++++------ .../src/container/modal/leader-rebalance.tsx | 9 +++++-- console/src/store/broker.ts | 15 +++++++++-- console/src/store/users.ts | 2 +- 6 files changed, 45 insertions(+), 18 deletions(-) diff --git a/console/src/container/admin-home/index.tsx b/console/src/container/admin-home/index.tsx index 640fdf0a..a53d9df3 100644 --- a/console/src/container/admin-home/index.tsx +++ b/console/src/container/admin-home/index.tsx @@ -10,7 +10,7 @@ import { IClusterData } from 'types/base-type'; const TabPane = Tabs.TabPane; -const detailUrl ='/admin/cluster_detail?clusterId='; +const detailUrl = '/admin/cluster_detail?clusterId='; const collectionColumns: Array> = [ { @@ -24,7 +24,8 @@ const collectionColumns: Array> = [ key: 'clusterName', sorter: (a: IClusterData, b: IClusterData) => a.clusterName.charCodeAt(0) - b.clusterName.charCodeAt(0), render: (text, record) => { - return {record.clusterName}; + const url = `${detailUrl}${record.clusterId}&clusterName=${record.clusterName}`; + return {record.clusterName}; }, }, { diff --git a/console/src/container/admin-usermanage/index.tsx b/console/src/container/admin-usermanage/index.tsx index ee6007ba..7bb4c47c 100644 --- a/console/src/container/admin-usermanage/index.tsx +++ b/console/src/container/admin-usermanage/index.tsx @@ -39,10 +39,10 @@ export class UserManage extends SearchAndFilter { public renderColumns = () => { const role = Object.assign({ title: '角色', - key: 'role', - dataIndex: 'role', + key: 'roleName', + dataIndex: 'roleName', filters: users.filterRole, - onFilter: (value: string, record: any) => record.role.indexOf(value) === 0, + onFilter: (value: string, record: any) => record.roleName.indexOf(value) === 0, }, this.renderColumnsFilter('filterVisible')); return [ diff --git a/console/src/container/broker-info/base-info.tsx b/console/src/container/broker-info/base-info.tsx index 514b1e53..df26b25a 100644 --- a/console/src/container/broker-info/base-info.tsx +++ b/console/src/container/broker-info/base-info.tsx @@ -1,6 +1,6 @@ import * as React from 'react'; import './index.less'; -import { Table, Modal, notification, PaginationConfig, Button } from 'component/antd'; +import { Table, Modal, notification, PaginationConfig, Button, Spin } from 'component/antd'; import { broker, IBroker, IBrokerNetworkInfo, IBrokerPartition } from 'store/broker'; import { observer } from 'mobx-react'; import { StatusGraghCom } from 'component/flow-table'; @@ -49,10 +49,19 @@ export class BrokerList extends SearchAndFilter { const status = Object.assign({ title: '已同步', - dataIndex: 'status', - key: 'status', - filters: [{ text: '是', value: '是' }, { text: '否', value: '否' }], - onFilter: (value: string, record: IBrokerPartition) => record.status === value, + dataIndex: 'underReplicatedPartitionCount', + key: 'underReplicatedPartitionCount', + filters: [{ text: '是', value: '1' }, { text: '否', value: '0' }], + onFilter: (value: string, record: IBrokerPartition) => { + // underReplicatedPartitionCount > 0 表示未同步完成 + const syncStatus = record.underReplicatedPartitionCount ? '0' : '1'; + return syncStatus === value; + }, + render: (text: number) => ( + <> + {text ? '否' : '是'} + + ), }, this.renderColumnsFilter('filterVisible')); return [{ @@ -80,7 +89,8 @@ export class BrokerList extends SearchAndFilter { title: '未同步副本数量', dataIndex: 'notUnderReplicatedPartitionCount', key: 'notUnderReplicatedPartitionCount', - sorter: (a: IBrokerPartition, b: IBrokerPartition) => a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount, + sorter: (a: IBrokerPartition, b: IBrokerPartition) => + a.notUnderReplicatedPartitionCount - b.notUnderReplicatedPartitionCount, }, status, region, @@ -205,7 +215,7 @@ export class BrokerList extends SearchAndFilter { const dataPartitions = this.state.searchId !== '' ? broker.partitions.filter((d) => d.brokerId === +this.state.searchId) : broker.partitions; return ( - <> +
  • Broker概览
  • @@ -239,7 +249,7 @@ export class BrokerList extends SearchAndFilter { pagination={pagination} />
- +
); } } diff --git a/console/src/container/modal/leader-rebalance.tsx b/console/src/container/modal/leader-rebalance.tsx index aefbb978..57d4ff7d 100644 --- a/console/src/container/modal/leader-rebalance.tsx +++ b/console/src/container/modal/leader-rebalance.tsx @@ -45,14 +45,19 @@ class LeaderRebalance extends React.Component { constructor(props: any) { super(props); const url = Url(); - this.clusterName = decodeURI(atob(url.search.clusterName)); + if (url.search.clusterName) { + this.clusterName = decodeURI(url.search.clusterName); + } this.clusterId = Number(url.search.clusterId); } public handleSubmit = (e: React.MouseEvent) => { e.preventDefault(); - this.setState({ loading: true }); this.props.form.validateFieldsAndScroll((err: any, values: any) => { + if (err) { + return; + } + this.setState({ loading: true }); this.brokerId = Number(values.brokerId); addRebalance({ brokerId: this.brokerId, clusterId: this.clusterId, dimension: 0 }).then(() => { cluster.getRebalance(this.clusterId).then(() => { diff --git a/console/src/store/broker.ts b/console/src/store/broker.ts index 863ac487..f09470bf 100644 --- a/console/src/store/broker.ts +++ b/console/src/store/broker.ts @@ -35,6 +35,7 @@ export interface IBrokerPartition extends IBroker { leaderCount: number; partitionCount: number; notUnderReplicatedPartitionCount: number; + underReplicatedPartitionCount?: number; regionName: string; bytesInPerSec: number; } @@ -74,6 +75,9 @@ interface IBrokerOption { } class Broker { + @observable + public loading: boolean = false; + @observable public brokerBaseInfo: IBrokerBaseInfo = {} as IBrokerBaseInfo; @@ -119,6 +123,11 @@ class Broker { @observable public BrokerOptions: IValueLabel[] = [{ value: null, label: '请选择Broker' }]; + @action.bound + public setLoading(value: boolean) { + this.loading = value; + } + @action.bound public setBrokerBaseInfo(data: IBrokerBaseInfo) { data.startTime = moment(data.startTime).format('YYYY-MM-DD HH:mm:ss'), @@ -216,7 +225,8 @@ class Broker { } public getBrokerList(clusterId: number) { - getBrokerList(clusterId).then(this.setBrokerList); + this.setLoading(true); + getBrokerList(clusterId).then(this.setBrokerList).finally(() => this.setLoading(false)); } public getBrokerNetwork(clusterId: number) { @@ -224,7 +234,8 @@ class Broker { } public getBrokerPartition(clusterId: number) { - getBrokerPartition(clusterId).then(this.setBrokerPartition); + this.setLoading(true); + getBrokerPartition(clusterId).then(this.setBrokerPartition).finally(() => this.setLoading(false)); } public getOneBrokerNetwork(clusterId: number, brokerId: number) { diff --git a/console/src/store/users.ts b/console/src/store/users.ts index de4012e6..0a74bd39 100644 --- a/console/src/store/users.ts +++ b/console/src/store/users.ts @@ -17,7 +17,7 @@ export class Users { @action.bound public setUserData(data: []) { this.userData = data.map((d: any) => { - d.role = this.roleMap[d.role]; + d.roleName = this.roleMap[d.role]; return d; }); } From 0cd31e0545a5469ddd1db260ccc9d48b50ed5e66 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 1 Jul 2020 10:56:55 +0800 Subject: [PATCH 03/13] fix retentionTime when create topic --- .../kafka/manager/web/api/versionone/OrderController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java b/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java index 29ed7ea7..be6e66ea 100644 --- a/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java +++ b/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java @@ -176,7 +176,7 @@ public class OrderController { TopicDO topicInfoDO = OrderConverter.convert2TopicInfoDO(orderTopicDO); List brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList()); Properties topicConfig = new Properties(); - topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime())); + topicConfig.setProperty("retention.ms", String.valueOf(reqObj.getRetentionTime() * 60 * 60 * 1000)); try { TopicMetadata topicMetadata = new TopicMetadata(); topicMetadata.setTopic(orderTopicDO.getTopicName()); From dc5949d4971ccefa1defd5052dc7c72625c4ef96 Mon Sep 17 00:00:00 2001 From: Yang Jing Date: Thu, 2 Jul 2020 16:14:23 +0800 Subject: [PATCH 04/13] =?UTF-8?q?=E7=AE=A1=E7=90=86=E7=AB=AF=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E6=B7=BB=E5=8A=A0=20PostgreSQL=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E6=94=AF=E6=8C=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 22 +- console/package-lock.json | 57 ++--- dao/pom.xml | 4 + doc/create_postgresql_table.sql | 323 +++++++++++++++++++++++++ web/src/main/resources/application.yml | 9 +- 5 files changed, 372 insertions(+), 43 deletions(-) create mode 100644 doc/create_postgresql_table.sql diff --git a/README.md b/README.md index 0f3cd6ea..518965a5 100644 --- a/README.md +++ b/README.md @@ -45,12 +45,14 @@ - `Maven 3.5.0+`(后端打包依赖) - `node v8.12.0+`(前端打包依赖) - `Java 8+`(运行环境需要) -- `MySQL`(数据存储) +- `MySQL` 或 `PostgreSQL`(数据存储) --- ### 环境初始化 +**MySQL** + 执行[create_mysql_table.sql](doc/create_mysql_table.sql)中的SQL命令,从而创建所需的MySQL库及表,默认创建的库名是`kafka_manager`。 ``` @@ -58,6 +60,24 @@ mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql ``` +**PostgreSQL** + +执行[create_postgresql_table.sql](doc/create_postgresql_table.sql)中的SQL命令,从而创建所需的PostgreSQL表。 + +``` +############# 示例: +psql -h XXX.XXX.XXX.XXX -U XXXX -d kafka_manager -f ./create_postgresql_table.sql +``` + +*PostgreSQL 用户、数据库创建方式* + +```sql +create user admin encrypted password 'admin'; +create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8'; +``` + +***根据所选择的数据库,请修改 [application.yml](web/src/main/resources/application.yml) 相应配置。*** + --- diff --git a/console/package-lock.json b/console/package-lock.json index 1e6cc369..5b64b1b2 100644 --- a/console/package-lock.json +++ b/console/package-lock.json @@ -751,8 +751,7 @@ "version": "1.0.0", "resolved": "http://registry.npm.taobao.org/assert-plus/download/assert-plus-1.0.0.tgz", "integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=", - "dev": true, - "optional": true + "dev": true }, "assign-symbols": { "version": "1.0.0", @@ -1505,7 +1504,6 @@ "resolved": "http://registry.npm.taobao.org/combined-stream/download/combined-stream-1.0.7.tgz", "integrity": "sha1-LR0kMXr7ir6V1tLAsHtXgTU52Cg=", "dev": true, - "optional": true, "requires": { "delayed-stream": "~1.0.0" } @@ -2222,8 +2220,7 @@ "version": "1.0.0", "resolved": "http://registry.npm.taobao.org/delayed-stream/download/delayed-stream-1.0.0.tgz", "integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=", - "dev": true, - "optional": true + "dev": true }, "depd": { "version": "1.1.2", @@ -2941,8 +2938,7 @@ "version": "1.3.0", "resolved": "http://registry.npm.taobao.org/extsprintf/download/extsprintf-1.3.0.tgz", "integrity": "sha1-lpGEQOMEGnpBT4xS48V06zw+HgU=", - "dev": true, - "optional": true + "dev": true }, "fast-deep-equal": { "version": "2.0.1", @@ -3358,8 +3354,7 @@ "ansi-regex": { "version": "2.1.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "aproba": { "version": "1.2.0", @@ -3380,14 +3375,12 @@ "balanced-match": { "version": "1.0.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "brace-expansion": { "version": "1.1.11", "bundled": true, "dev": true, - "optional": true, "requires": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" @@ -3402,20 +3395,17 @@ "code-point-at": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "concat-map": { "version": "0.0.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "console-control-strings": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "core-util-is": { "version": "1.0.2", @@ -3532,8 +3522,7 @@ "inherits": { "version": "2.0.3", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "ini": { "version": "1.3.5", @@ -3545,7 +3534,6 @@ "version": "1.0.0", "bundled": true, "dev": true, - "optional": true, "requires": { "number-is-nan": "^1.0.0" } @@ -3560,7 +3548,6 @@ "version": "3.0.4", "bundled": true, "dev": true, - "optional": true, "requires": { "brace-expansion": "^1.1.7" } @@ -3568,14 +3555,12 @@ "minimist": { "version": "0.0.8", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "minipass": { "version": "2.3.5", "bundled": true, "dev": true, - "optional": true, "requires": { "safe-buffer": "^5.1.2", "yallist": "^3.0.0" @@ -3594,7 +3579,6 @@ "version": "0.5.1", "bundled": true, "dev": true, - "optional": true, "requires": { "minimist": "0.0.8" } @@ -3675,8 +3659,7 @@ "number-is-nan": { "version": "1.0.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "object-assign": { "version": "4.1.1", @@ -3688,7 +3671,6 @@ "version": "1.4.0", "bundled": true, "dev": true, - "optional": true, "requires": { "wrappy": "1" } @@ -3774,8 +3756,7 @@ "safe-buffer": { "version": "5.1.2", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "safer-buffer": { "version": "2.1.2", @@ -3811,7 +3792,6 @@ "version": "1.0.2", "bundled": true, "dev": true, - "optional": true, "requires": { "code-point-at": "^1.0.0", "is-fullwidth-code-point": "^1.0.0", @@ -3831,7 +3811,6 @@ "version": "3.0.1", "bundled": true, "dev": true, - "optional": true, "requires": { "ansi-regex": "^2.0.0" } @@ -3875,14 +3854,12 @@ "wrappy": { "version": "1.0.2", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "yallist": { "version": "3.0.3", "bundled": true, - "dev": true, - "optional": true + "dev": true } } }, @@ -4864,8 +4841,7 @@ "version": "0.1.1", "resolved": "http://registry.npm.taobao.org/jsbn/download/jsbn-0.1.1.tgz", "integrity": "sha1-peZUwuWi3rXyAdls77yoDA7y9RM=", - "dev": true, - "optional": true + "dev": true }, "json-parse-better-errors": { "version": "1.0.2", @@ -8883,8 +8859,7 @@ "version": "0.14.5", "resolved": "http://registry.npm.taobao.org/tweetnacl/download/tweetnacl-0.14.5.tgz", "integrity": "sha1-WuaBd/GS1EViadEIr6k/+HQ/T2Q=", - "dev": true, - "optional": true + "dev": true }, "type-is": { "version": "1.6.16", diff --git a/dao/pom.xml b/dao/pom.xml index 4b789b3b..0e0cf71f 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -43,5 +43,9 @@ mariadb-java-client 2.5.4
+ + org.postgresql + postgresql + \ No newline at end of file diff --git a/doc/create_postgresql_table.sql b/doc/create_postgresql_table.sql new file mode 100644 index 00000000..cdf04388 --- /dev/null +++ b/doc/create_postgresql_table.sql @@ -0,0 +1,323 @@ +-- CREATE DATABASE kafka_manager; +-- \c kafka_manager; +SET TIME ZONE 'Asia/Chongqing'; +SET CLIENT_ENCODING TO 'UTF-8'; + +CREATE OR REPLACE FUNCTION on_update_timestamp() RETURNS TRIGGER AS +$$ +BEGIN + new.gmt_modify = current_timestamp; + return new; +END; +$$ LANGUAGE plpgsql; + +-- 账号表 +CREATE TABLE account +( + id bigserial NOT NULL, -- 'ID', + username varchar(64) NOT NULL UNIQUE DEFAULT '', -- '用户名', + password varchar(128) NOT NULL DEFAULT '', -- '密码', + role int NOT NULL DEFAULT 0, -- '角色类型, 0:普通用户', + status int NOT NULL DEFAULT 0, -- '0标识使用中,-1标识已废弃', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT account_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX account_uniq_username ON account (username); +INSERT INTO account(username, password, role) +VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2); +CREATE TRIGGER account_trig_gmt_modify + BEFORE UPDATE + ON account + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- 告警规则表 +CREATE TABLE alarm_rule +( + id bigserial, -- '自增ID', + alarm_name varchar(128) NOT NULL DEFAULT '', -- '告警名字', + strategy_expressions text, -- '表达式', + strategy_filters text, -- '过滤条件', + strategy_actions text, -- '响应', + principals varchar(512) NOT NULL DEFAULT '', -- '负责人', + status int2 NOT NULL DEFAULT 1, -- '-1:逻辑删除, 0:关闭, 1:正常', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT alarm_rule_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX alarm_rule_uniq_alarm_name ON alarm_rule (alarm_name); +CREATE TRIGGER alarm_rule_trig_gmt_modify + BEFORE UPDATE + ON alarm_rule + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Broker信息表 +CREATE TABLE broker +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID', + host varchar(128) NOT NULL DEFAULT '', -- 'Broker主机名', + port int NOT NULL DEFAULT '-1', -- 'Broker端口', + timestamp bigint NOT NULL DEFAULT '-1', -- '启动时间', + status int NOT NULL DEFAULT '0', -- '状态0有效,-1无效', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT broker_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX broker_uniq_cluster_id_broker_id ON broker (cluster_id, broker_id); +CREATE TRIGGER broker_trig_gmt_modify + BEFORE UPDATE + ON broker + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- BrokerMetric信息表 +CREATE TABLE broker_metrics +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID', + bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入', + bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出', + bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒被拒绝字节数', + messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数流入', + fail_fetch_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费失败数', + fail_produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒失败生产数', + fetch_consumer_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费请求数', + produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒生产数', + request_handler_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '请求处理器繁忙百分比', + network_processor_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '网络处理器繁忙百分比', + request_queue_size bigint NOT NULL DEFAULT '0', -- '请求列表大小', + response_queue_size bigint NOT NULL DEFAULT '0', -- '响应列表大小', + log_flush_time decimal(53, 2) NOT NULL DEFAULT '0.00', -- '刷日志时间', + total_time_produce_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-平均值', + total_time_produce_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-99分位', + total_time_fetch_consumer_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-平均值', + total_time_fetch_consumer_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-99分位', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT broker_metrics_pk PRIMARY KEY (id) +); +CREATE INDEX broker_metrics_idx_cluster_id_broker_id_gmt_create ON broker_metrics (cluster_id, broker_id, gmt_create); + +-- Cluster表 +CREATE TABLE cluster +( + id bigserial, -- '集群ID', + cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称', + zookeeper varchar(512) NOT NULL DEFAULT '', -- 'ZK地址', + bootstrap_servers varchar(512) NOT NULL DEFAULT '', -- 'Server地址', + kafka_version varchar(32) NOT NULL DEFAULT '', -- 'Kafka版本', + alarm_flag int2 NOT NULL DEFAULT '0', -- '0:不开启告警, 1开启告警', + security_protocol varchar(512) NOT NULL DEFAULT '', -- '安全协议', + sasl_mechanism varchar(512) NOT NULL DEFAULT '', -- '安全机制', + sasl_jaas_config varchar(512) NOT NULL DEFAULT '', -- 'Jaas配置', + status int2 NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT cluster_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX cluster_uniq_cluster_name ON cluster (cluster_name); +CREATE TRIGGER cluster_trig_gmt_modify + BEFORE UPDATE + ON cluster + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- ClusterMetrics信息 +CREATE TABLE cluster_metrics +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID', + topic_num int NOT NULL DEFAULT '0', -- 'Topic数', + partition_num int NOT NULL DEFAULT '0', -- '分区数', + broker_num int NOT NULL DEFAULT '0', -- 'Broker数', + bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流入(B)', + bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流出(B)', + bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝(B)', + messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数(条)', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT cluster_metrics_pk PRIMARY KEY (id) +); +CREATE INDEX cluster_metrics_idx_cluster_id_gmt_create ON cluster_metrics (cluster_id, gmt_create); + +-- Controller历史变更记录表 +CREATE TABLE controller +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + broker_id int NOT NULL DEFAULT '-1', -- 'BrokerId', + host varchar(256) NOT NULL DEFAULT '', -- '主机名', + timestamp bigint NOT NULL DEFAULT '-1', -- 'Controller变更时间', + version int NOT NULL DEFAULT '-1', -- 'Controller格式版本', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT controller_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX controller_uniq_cluster_id_broker_id_timestamp ON controller (cluster_id, broker_id, timestamp); + +-- Topic迁移信息 +CREATE TABLE migration_task +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + reassignment_json text, -- '任务参数', + real_throttle bigint NOT NULL DEFAULT '0', -- '实际限流值(B/s)', + operator varchar(128) NOT NULL DEFAULT '', -- '操作人', + description varchar(256) NOT NULL DEFAULT '', -- '备注说明', + status int NOT NULL DEFAULT '0', -- '任务状态', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务修改时间', + CONSTRAINT migration_task_pk PRIMARY KEY (id) +); +CREATE TRIGGER migration_task_trig_gmt_modify + BEFORE UPDATE + ON migration_task + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +CREATE TABLE operation_history +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + operator varchar(128) NOT NULL DEFAULT '', -- '操作人', + operation varchar(256) NOT NULL DEFAULT '', -- '操作描述', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + PRIMARY KEY (id) +); +--='操作记录表'; + +-- 分区申请工单 +CREATE TABLE order_partition +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + applicant varchar(128) NOT NULL DEFAULT '', -- '申请人', + peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流量', + description text, -- '备注信息', + order_status int NOT NULL DEFAULT '0', -- '工单状态', + approver varchar(128) NOT NULL DEFAULT '', -- '审批人', + opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见', + status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT order_partition_pk PRIMARY KEY (id) +); +CREATE TRIGGER order_partition_trig_gmt_modify + BEFORE UPDATE + ON order_partition + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Topic申请工单 +CREATE TABLE order_topic +( + id bigserial, -- 'ID', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + retention_time bigint NOT NULL DEFAULT '-1', -- '保留时间(ms)', + partition_num int NOT NULL DEFAULT '-1', -- '分区数', + replica_num int NOT NULL DEFAULT '-1', -- '副本数', + regions varchar(128) NOT NULL DEFAULT '', -- 'RegionId列表', + brokers varchar(128) NOT NULL DEFAULT '', -- 'Broker列表', + peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流入流量(KB)', + applicant varchar(128) NOT NULL DEFAULT '', -- '申请人', + principals varchar(256) NOT NULL DEFAULT '', -- '负责人', + description text, -- '备注信息', + order_status int NOT NULL DEFAULT '0', -- '工单状态', + approver varchar(128) NOT NULL DEFAULT '', -- '审批人', + opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见', + status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT order_topic_pk PRIMARY KEY (id) +); +CREATE TRIGGER order_topic_trig_gmt_modify + BEFORE UPDATE + ON order_topic + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Region信息表 +CREATE TABLE region +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + region_name varchar(128) NOT NULL DEFAULT '', -- 'Region名称', + broker_list varchar(256) NOT NULL DEFAULT '', -- 'Broker列表', + level int NOT NULL DEFAULT '0', -- 'Region重要等级, 0级普通, 1极重要,2级极重要', + operator varchar(45) NOT NULL DEFAULT '', -- '操作人', + description text, -- '备注说明', + status int NOT NULL DEFAULT '0', -- '状态,0正常,-1废弃,1容量已满', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT region_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX region_uniq_cluster_id_region_name ON region (cluster_id, region_name); +CREATE TRIGGER region_trig_gmt_modify + BEFORE UPDATE + ON region + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Topic信息表 +CREATE TABLE topic +( + id bigserial, -- 'ID', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + applicant varchar(256) NOT NULL DEFAULT '', -- '申请人', + principals varchar(256) NOT NULL DEFAULT '', -- '负责人', + description text, -- '备注信息', + status int NOT NULL DEFAULT '0', -- '0标识使用中,-1标识已废弃', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT topic_pk PRIMARY KEY (id) +); --=''; +CREATE UNIQUE INDEX topic_uniq_cluster_id_topic_name ON topic (cluster_id, topic_name); +CREATE TRIGGER topic_trig_gmt_modify + BEFORE UPDATE + ON topic + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- 用户收藏的Topic表 +CREATE TABLE topic_favorite +( + id bigserial, -- '自增Id', + username varchar(64) NOT NULL DEFAULT '', -- '用户名', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + status int NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT topic_favorite_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX topic_favorite_uniq_username_cluster_id_topic_name ON topic_favorite (username, cluster_id, topic_name); +CREATE TRIGGER topic_favorite_trig_gmt_modify + BEFORE UPDATE + ON topic_favorite + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- TopicMetrics表 +CREATE TABLE topic_metrics +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒进入消息条数', + bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入', + bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出', + bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝字节数', + total_produce_requests decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒请求数', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT topic_metrics_pk PRIMARY KEY (id) +); +CREATE INDEX topic_metrics_idx_cluster_id_topic_name_gmt_create ON topic_metrics (cluster_id, topic_name, gmt_create); diff --git a/web/src/main/resources/application.yml b/web/src/main/resources/application.yml index b0363881..5b1a5cff 100644 --- a/web/src/main/resources/application.yml +++ b/web/src/main/resources/application.yml @@ -11,10 +11,17 @@ spring: name: kafkamanager datasource: kafka-manager: + #driver-class-name: org.postgresql.Driver + #jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true + driver-class-name: org.mariadb.jdbc.Driver jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 username: admin password: admin - driver-class-name: org.mariadb.jdbc.Driver + type: com.zaxxer.hikari.HikariDataSource + hikari: + # PostgreSQL 使用 + #connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';" + connection-test-query: "select 1;" main: allow-bean-definition-overriding: true From 93eca239cb3d25f12924555d0b13d7cef7a2c030 Mon Sep 17 00:00:00 2001 From: Yang Jing Date: Thu, 2 Jul 2020 21:39:02 +0800 Subject: [PATCH 05/13] =?UTF-8?q?=E9=80=9A=E8=BF=87=20spring.profiles.acti?= =?UTF-8?q?ve=20=E6=8C=87=E5=AE=9A=E9=85=8D=E7=BD=AE=E6=9D=A5=E9=80=89?= =?UTF-8?q?=E6=8B=A9=E5=90=8E=E7=AB=AF=E4=B8=8D=E5=90=8C=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E7=9A=84=E9=85=8D=E7=BD=AE=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- web/src/main/resources/application-mysql.yml | 32 +++++++++++++++++ web/src/main/resources/application-pg.yml | 36 +++++++++++++++++++ web/src/main/resources/application.yml | 38 +------------------- 4 files changed, 70 insertions(+), 38 deletions(-) create mode 100644 web/src/main/resources/application-mysql.yml create mode 100644 web/src/main/resources/application-pg.yml diff --git a/README.md b/README.md index 518965a5..48e45106 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ create user admin encrypted password 'admin'; create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8'; ``` -***根据所选择的数据库,请修改 [application.yml](web/src/main/resources/application.yml) 相应配置。*** +***根据所选择的数据库,使用 `-Dspring.profiles.active=pg` 指定配置文件,默认使用 `mysql`。*** --- diff --git a/web/src/main/resources/application-mysql.yml b/web/src/main/resources/application-mysql.yml new file mode 100644 index 00000000..81c49e28 --- /dev/null +++ b/web/src/main/resources/application-mysql.yml @@ -0,0 +1,32 @@ +server: + port: 8080 + tomcat: + accept-count: 100 + max-connections: 1000 + max-threads: 20 + min-spare-threads: 20 + +spring: + application: + name: kafkamanager + datasource: + kafka-manager: + driver-class-name: org.mariadb.jdbc.Driver + jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 + username: admin + password: admin + main: + allow-bean-definition-overriding: true + + profiles: + active: dev + +logging: + config: classpath:logback-spring.xml + +# kafka监控 +kafka-monitor: + enabled: true + notify-kafka: + cluster-id: 95 + topic-name: kmo_monitor diff --git a/web/src/main/resources/application-pg.yml b/web/src/main/resources/application-pg.yml new file mode 100644 index 00000000..e3530369 --- /dev/null +++ b/web/src/main/resources/application-pg.yml @@ -0,0 +1,36 @@ +server: + port: 8080 + tomcat: + accept-count: 100 + max-connections: 1000 + max-threads: 20 + min-spare-threads: 20 + +spring: + application: + name: kafkamanager + datasource: + kafka-manager: + driver-class-name: org.postgresql.Driver + jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true + username: admin + password: admin + type: com.zaxxer.hikari.HikariDataSource + hikari: + connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';" + connection-test-query: "select 1;" + main: + allow-bean-definition-overriding: true + + profiles: + active: dev + +logging: + config: classpath:logback-spring.xml + +# kafka监控 +kafka-monitor: + enabled: true + notify-kafka: + cluster-id: 95 + topic-name: kmo_monitor diff --git a/web/src/main/resources/application.yml b/web/src/main/resources/application.yml index 5b1a5cff..dc1be31b 100644 --- a/web/src/main/resources/application.yml +++ b/web/src/main/resources/application.yml @@ -1,39 +1,3 @@ -server: - port: 8080 - tomcat: - accept-count: 100 - max-connections: 1000 - max-threads: 20 - min-spare-threads: 20 - spring: - application: - name: kafkamanager - datasource: - kafka-manager: - #driver-class-name: org.postgresql.Driver - #jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true - driver-class-name: org.mariadb.jdbc.Driver - jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 - username: admin - password: admin - type: com.zaxxer.hikari.HikariDataSource - hikari: - # PostgreSQL 使用 - #connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';" - connection-test-query: "select 1;" - main: - allow-bean-definition-overriding: true - profiles: - active: dev - -logging: - config: classpath:logback-spring.xml - -# kafka监控 -kafka-monitor: - enabled: true - notify-kafka: - cluster-id: 95 - topic-name: kmo_monitor + active: mysql \ No newline at end of file From ac86f8aded5d02d61468a8906d950c181121a039 Mon Sep 17 00:00:00 2001 From: Yang Jing Date: Sun, 5 Jul 2020 00:42:55 +0800 Subject: [PATCH 06/13] =?UTF-8?q?=E5=BD=93=E4=BD=BF=E7=94=A8=20PostgreSQL?= =?UTF-8?q?=20=E6=95=B0=E6=8D=AE=E5=BA=93=E6=97=B6=EF=BC=8C=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=20insert=20on=20conflict=20=E6=9B=BF=E4=BB=A3=20MySQL?= =?UTF-8?q?=20=E7=9A=84=20replace=20SQL=E8=AF=AD=E5=8F=A5=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/manager/dao/impl/BrokerDaoImpl.java | 6 +++++ .../dao/impl/KafkaManagerProperties.java | 22 +++++++++++++++++++ .../kafka/manager/dao/impl/RegionDaoImpl.java | 6 +++++ .../kafka/manager/dao/impl/TopicDaoImpl.java | 6 +++++ .../dao/impl/TopicFavoriteDaoImpl.java | 6 +++++ dao/src/main/resources/mapper/BrokerDao.xml | 10 +++++++++ dao/src/main/resources/mapper/RegionDao.xml | 9 ++++++++ dao/src/main/resources/mapper/TopicDao.xml | 9 ++++++++ .../resources/mapper/TopicFavoriteDao.xml | 9 ++++++++ .../service/impl/AdminTopicServiceImpl.java | 2 ++ web/src/main/resources/application-mysql.yml | 3 --- web/src/main/resources/application-pg.yml | 3 --- 12 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/KafkaManagerProperties.java diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java index 397ef54c..16cda81f 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java @@ -19,12 +19,18 @@ public class BrokerDaoImpl implements BrokerDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int replace(BrokerDO brokerInfoDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("BrokerDao.replaceOnPG", brokerInfoDO); + } return sqlSession.insert("BrokerDao.replace", brokerInfoDO); } diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/KafkaManagerProperties.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/KafkaManagerProperties.java new file mode 100644 index 00000000..d762c4ce --- /dev/null +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/KafkaManagerProperties.java @@ -0,0 +1,22 @@ +package com.xiaojukeji.kafka.manager.dao.impl; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Component +@ConfigurationProperties("spring.datasource.kafka-manager") +public class KafkaManagerProperties { + private String jdbcUrl; + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public boolean hasPG() { + return jdbcUrl.startsWith("jdbc:postgres"); + } +} diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java index 1b9b17ad..7f62ed5c 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java @@ -17,12 +17,18 @@ public class RegionDaoImpl implements RegionDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int insert(RegionDO regionDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("RegionDao.insertOnPG", regionDO); + } return sqlSession.insert("RegionDao.insert", regionDO); } diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java index f5a65ac3..7c5bac5f 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java @@ -19,12 +19,18 @@ public class TopicDaoImpl implements TopicDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int replace(TopicDO topicDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("TopicDao.replaceOnPG", topicDO); + } return sqlSession.insert("TopicDao.replace", topicDO); } diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java index 17170028..f8d0c7fc 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java @@ -25,12 +25,18 @@ public class TopicFavoriteDaoImpl implements TopicFavoriteDao { @Autowired private TransactionTemplate transactionTemplate; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int batchAdd(List topicFavoriteDOList) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("TopicFavoriteDao.batchAddOnPG", topicFavoriteDOList); + } return sqlSession.insert("TopicFavoriteDao.batchAdd", topicFavoriteDOList); } diff --git a/dao/src/main/resources/mapper/BrokerDao.xml b/dao/src/main/resources/mapper/BrokerDao.xml index 4ecacd25..7ef90b01 100644 --- a/dao/src/main/resources/mapper/BrokerDao.xml +++ b/dao/src/main/resources/mapper/BrokerDao.xml @@ -20,6 +20,16 @@ (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status}) + + insert into broker + (cluster_id, broker_id, host, port, timestamp, status) + values + on conflict (cluster_id, broker_id) do update set host = excluded.host, + port = excluded.port, + timestamp = excluded.timestamp, + status = excluded.status + + DELETE FROM broker WHERE cluster_id = #{clusterId} AND broker_id = #{brokerId} diff --git a/dao/src/main/resources/mapper/RegionDao.xml b/dao/src/main/resources/mapper/RegionDao.xml index 591decb9..7b1c7daa 100644 --- a/dao/src/main/resources/mapper/RegionDao.xml +++ b/dao/src/main/resources/mapper/RegionDao.xml @@ -21,6 +21,15 @@ (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator}) + + insert into region + (region_name, cluster_id, broker_list, description, operator) + values (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator}) + on conflict (region_name, cluster_id) do update set broker_list = excluded.broker_list, + description = excluded.description, + operator = excluded.operator + + DELETE FROM region WHERE id = #{id} diff --git a/dao/src/main/resources/mapper/TopicDao.xml b/dao/src/main/resources/mapper/TopicDao.xml index 27d99e48..b078d4cf 100644 --- a/dao/src/main/resources/mapper/TopicDao.xml +++ b/dao/src/main/resources/mapper/TopicDao.xml @@ -19,6 +19,15 @@ (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status}) + + insert into topic + (cluster_id, topic_name, principals, description, status) + values (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status}) + on conflict (cluster_id, topic_name) do update set principals = excluded.principals, + description = excluded.description, + status = excluded.status + + DELETE FROM topic WHERE id = #{id} diff --git a/dao/src/main/resources/mapper/TopicFavoriteDao.xml b/dao/src/main/resources/mapper/TopicFavoriteDao.xml index 308a75ea..218a714c 100644 --- a/dao/src/main/resources/mapper/TopicFavoriteDao.xml +++ b/dao/src/main/resources/mapper/TopicFavoriteDao.xml @@ -21,6 +21,15 @@ + + insert into topic_favorite (cluster_id, topic_name, username) + values + + (#{TopicFavoriteDO.clusterId}, #{TopicFavoriteDO.topicName}, #{TopicFavoriteDO.username}) + + on conflict (cluster_id, topic_name, username) do update set gmt_modify = now(); + + DELETE FROM topic_favorite WHERE id=#{id} diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java index f941dc63..3d245e61 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java @@ -76,6 +76,7 @@ public class AdminTopicServiceImpl implements AdminTopicService { OperationHistoryDO operationHistoryDO = OperationHistoryDO.newInstance(topicDO.getClusterId(), topicDO.getTopicName(), operator, OperationEnum.CREATE_TOPIC.message); operationHistoryDao.insert(operationHistoryDO); topicDao.replace(topicDO); + } catch (Exception e) { return AdminTopicStatusEnum.REPLACE_DB_FAILED; } @@ -188,4 +189,5 @@ public class AdminTopicServiceImpl implements AdminTopicService { } return AdminTopicStatusEnum.SUCCESS; } + } diff --git a/web/src/main/resources/application-mysql.yml b/web/src/main/resources/application-mysql.yml index 81c49e28..ea9dcc7c 100644 --- a/web/src/main/resources/application-mysql.yml +++ b/web/src/main/resources/application-mysql.yml @@ -18,9 +18,6 @@ spring: main: allow-bean-definition-overriding: true - profiles: - active: dev - logging: config: classpath:logback-spring.xml diff --git a/web/src/main/resources/application-pg.yml b/web/src/main/resources/application-pg.yml index e3530369..174d2002 100644 --- a/web/src/main/resources/application-pg.yml +++ b/web/src/main/resources/application-pg.yml @@ -22,9 +22,6 @@ spring: main: allow-bean-definition-overriding: true - profiles: - active: dev - logging: config: classpath:logback-spring.xml From 27ce4d6a0d8215d1c0f82b765efe4675546ca0da Mon Sep 17 00:00:00 2001 From: Yang Jing Date: Sun, 5 Jul 2020 00:55:55 +0800 Subject: [PATCH 07/13] =?UTF-8?q?=E4=B8=BA=20AccountDao.insert=20=E4=B9=9F?= =?UTF-8?q?=E6=8F=90=E4=BE=9B=20PostgreSQL=20=E7=9A=84=20AccountDao.insert?= =?UTF-8?q?OnPG=20=E7=89=88=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/manager/dao/impl/AccountDaoImpl.java | 8 +++++++- dao/src/main/resources/mapper/AccountDao.xml | 12 ++++++++++++ dao/src/main/resources/mapper/BrokerDao.xml | 10 +++++----- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java index 315762b7..9283d40b 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java @@ -18,6 +18,9 @@ public class AccountDaoImpl implements AccountDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @@ -25,7 +28,7 @@ public class AccountDaoImpl implements AccountDao { @Override public int addNewAccount(AccountDO accountDO) { accountDO.setStatus(DBStatusEnum.NORMAL.getStatus()); - return sqlSession.insert("AccountDao.insert", accountDO); + return updateAccount(accountDO); } @Override @@ -35,6 +38,9 @@ public class AccountDaoImpl implements AccountDao { @Override public int updateAccount(AccountDO accountDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("AccountDao.insertOnPG", accountDO); + } return sqlSession.insert("AccountDao.insert", accountDO); } diff --git a/dao/src/main/resources/mapper/AccountDao.xml b/dao/src/main/resources/mapper/AccountDao.xml index 5d5e79a8..feed2727 100644 --- a/dao/src/main/resources/mapper/AccountDao.xml +++ b/dao/src/main/resources/mapper/AccountDao.xml @@ -22,6 +22,18 @@ ]]> + + + + DELETE FROM account WHERE username = #{username} diff --git a/dao/src/main/resources/mapper/BrokerDao.xml b/dao/src/main/resources/mapper/BrokerDao.xml index 7ef90b01..50af2ef1 100644 --- a/dao/src/main/resources/mapper/BrokerDao.xml +++ b/dao/src/main/resources/mapper/BrokerDao.xml @@ -22,12 +22,12 @@ insert into broker - (cluster_id, broker_id, host, port, timestamp, status) - values + (cluster_id, broker_id, host, port, timestamp, status) + values (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status}) on conflict (cluster_id, broker_id) do update set host = excluded.host, - port = excluded.port, - timestamp = excluded.timestamp, - status = excluded.status + port = excluded.port, + timestamp = excluded.timestamp, + status = excluded.status From a460e169ab21641a6325888dc4f4e49e3395a65e Mon Sep 17 00:00:00 2001 From: Yang Jing Date: Sun, 5 Jul 2020 16:07:33 +0800 Subject: [PATCH 08/13] =?UTF-8?q?=E4=BF=AE=E6=94=B9=20Spring=20=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E9=85=8D=E7=BD=AE=E4=B8=BA=E4=BD=BF=E7=94=A8=20MySQL?= =?UTF-8?q?=20=E6=95=B0=E6=8D=AE=E5=BA=93=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- web/src/main/resources/application-mysql.yml | 29 ------------------- web/src/main/resources/application.yml | 30 ++++++++++++++++++-- 3 files changed, 29 insertions(+), 32 deletions(-) delete mode 100644 web/src/main/resources/application-mysql.yml diff --git a/README.md b/README.md index 48e45106..02177bdc 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ create user admin encrypted password 'admin'; create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8'; ``` -***根据所选择的数据库,使用 `-Dspring.profiles.active=pg` 指定配置文件,默认使用 `mysql`。*** +***默认配置使用 MySQL 数据库,若要使用 PostgreSQL 数据库,使用 `-Dspring.profiles.active=pg` 指定 `application-pg.yml` 配置文件。*** --- diff --git a/web/src/main/resources/application-mysql.yml b/web/src/main/resources/application-mysql.yml deleted file mode 100644 index ea9dcc7c..00000000 --- a/web/src/main/resources/application-mysql.yml +++ /dev/null @@ -1,29 +0,0 @@ -server: - port: 8080 - tomcat: - accept-count: 100 - max-connections: 1000 - max-threads: 20 - min-spare-threads: 20 - -spring: - application: - name: kafkamanager - datasource: - kafka-manager: - driver-class-name: org.mariadb.jdbc.Driver - jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 - username: admin - password: admin - main: - allow-bean-definition-overriding: true - -logging: - config: classpath:logback-spring.xml - -# kafka监控 -kafka-monitor: - enabled: true - notify-kafka: - cluster-id: 95 - topic-name: kmo_monitor diff --git a/web/src/main/resources/application.yml b/web/src/main/resources/application.yml index dc1be31b..ea9dcc7c 100644 --- a/web/src/main/resources/application.yml +++ b/web/src/main/resources/application.yml @@ -1,3 +1,29 @@ +server: + port: 8080 + tomcat: + accept-count: 100 + max-connections: 1000 + max-threads: 20 + min-spare-threads: 20 + spring: - profiles: - active: mysql \ No newline at end of file + application: + name: kafkamanager + datasource: + kafka-manager: + driver-class-name: org.mariadb.jdbc.Driver + jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 + username: admin + password: admin + main: + allow-bean-definition-overriding: true + +logging: + config: classpath:logback-spring.xml + +# kafka监控 +kafka-monitor: + enabled: true + notify-kafka: + cluster-id: 95 + topic-name: kmo_monitor From 4f4e7e80fc66ee6d2fba5b69048f8a587394d5e7 Mon Sep 17 00:00:00 2001 From: xuzhengxi Date: Mon, 6 Jul 2020 13:19:58 +0800 Subject: [PATCH 09/13] =?UTF-8?q?=E5=A2=9E=E5=8A=A0docker=E5=8F=8Adocker-c?= =?UTF-8?q?ompose=E9=83=A8=E7=BD=B2=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 7 + docker-compose.yml | 32 +++ docker/kafka-manager/Dockerfile | 7 + .../kafka-manager/application-standalone.yml | 32 +++ docker/kafka-manager/application.yml | 32 +++ docker/mysql/Dockerfile | 3 + docker/mysql/create_mysql_table.sql | 241 ++++++++++++++++++ 7 files changed, 354 insertions(+) create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 docker/kafka-manager/Dockerfile create mode 100644 docker/kafka-manager/application-standalone.yml create mode 100644 docker/kafka-manager/application.yml create mode 100644 docker/mysql/Dockerfile create mode 100644 docker/mysql/create_mysql_table.sql diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..71f0412f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM fabric8/java-alpine-openjdk8-jdk +MAINTAINER xuzhengxi +ENV LANG=C.UTF-8 LC_ALL=C.UTF-8 +ADD ./web/target/kafka-manager-web-1.0.0-SNAPSHOT.jar kafka-manager-web.jar +ADD ./docker/kafka-manager/application-standalone.yml application.yml +ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"] +EXPOSE 8080 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..10c69adf --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,32 @@ +version: '2' +services: + mysqldbserver: + container_name: mysqldbserver + image: mysql:5.7 + build: + context: . + dockerfile: docker/mysql/Dockerfile + ports: + - "3306:3306" + command: [ + 'mysqld', + '--character-set-server=utf8', + '--collation-server=utf8_general_ci', + '--default-time-zone=+8:00' + ] + environment: + MYSQL_ROOT_PASSWORD: 12345678 + MYSQL_DATABASE: kafka_manager + MYSQL_USER: admin + MYSQL_PASSWORD: 12345678 + restart: always + kafka-manager-web: + container_name: kafka-manager-web + build: + context: . + dockerfile: Dockerfile + ports: + - "8080:8080" + links: + - mysqldbserver + restart: always \ No newline at end of file diff --git a/docker/kafka-manager/Dockerfile b/docker/kafka-manager/Dockerfile new file mode 100644 index 00000000..87647313 --- /dev/null +++ b/docker/kafka-manager/Dockerfile @@ -0,0 +1,7 @@ +FROM java:8 +MAINTAINER xuzhengxi +ENV LANG=C.UTF-8 LC_ALL=C.UTF-8 +ADD ../../web/target/kafka-manager-web-1.0.0-SNAPSHOT.jar kafka-manager-web.jar +ADD ./application.yml application.yml +ENTRYPOINT ["java","-jar","/kafka-manager-web.jar","--spring.config.location=./application.yml"] +EXPOSE 8080 \ No newline at end of file diff --git a/docker/kafka-manager/application-standalone.yml b/docker/kafka-manager/application-standalone.yml new file mode 100644 index 00000000..581170bc --- /dev/null +++ b/docker/kafka-manager/application-standalone.yml @@ -0,0 +1,32 @@ +server: + port: 8080 + tomcat: + accept-count: 100 + max-connections: 1000 + max-threads: 20 + min-spare-threads: 20 + +spring: + application: + name: kafkamanager + datasource: + kafka-manager: + jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 + username: admin + password: 12345678 + driver-class-name: org.mariadb.jdbc.Driver + main: + allow-bean-definition-overriding: true + + profiles: + active: dev + +logging: + config: classpath:logback-spring.xml + +# kafka监控 +kafka-monitor: + enabled: true + notify-kafka: + cluster-id: 95 + topic-name: kmo_monitor diff --git a/docker/kafka-manager/application.yml b/docker/kafka-manager/application.yml new file mode 100644 index 00000000..581170bc --- /dev/null +++ b/docker/kafka-manager/application.yml @@ -0,0 +1,32 @@ +server: + port: 8080 + tomcat: + accept-count: 100 + max-connections: 1000 + max-threads: 20 + min-spare-threads: 20 + +spring: + application: + name: kafkamanager + datasource: + kafka-manager: + jdbc-url: jdbc:mysql://mysqldbserver:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 + username: admin + password: 12345678 + driver-class-name: org.mariadb.jdbc.Driver + main: + allow-bean-definition-overriding: true + + profiles: + active: dev + +logging: + config: classpath:logback-spring.xml + +# kafka监控 +kafka-monitor: + enabled: true + notify-kafka: + cluster-id: 95 + topic-name: kmo_monitor diff --git a/docker/mysql/Dockerfile b/docker/mysql/Dockerfile new file mode 100644 index 00000000..b2e4afed --- /dev/null +++ b/docker/mysql/Dockerfile @@ -0,0 +1,3 @@ +FROM mysql:5.7 +MAINTAINER xuzhengxi +COPY ./docker/mysql/create_mysql_table.sql /docker-entrypoint-initdb.d/ \ No newline at end of file diff --git a/docker/mysql/create_mysql_table.sql b/docker/mysql/create_mysql_table.sql new file mode 100644 index 00000000..8f0a3861 --- /dev/null +++ b/docker/mysql/create_mysql_table.sql @@ -0,0 +1,241 @@ +CREATE DATABASE IF NOT EXISTS `kafka_manager`; + +USE `kafka_manager`; + +CREATE TABLE `account` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', + `username` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '用户名', + `password` varchar(128) NOT NULL DEFAULT '' COMMENT '密码', + `role` int(16) NOT NULL DEFAULT '0' COMMENT '角色类型, 0:普通用户', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '0标识使用中,-1标识已废弃', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_username` (`username`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='账号表'; +INSERT INTO account(username, password, role) VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2); + + +CREATE TABLE `alarm_rule` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID', + `alarm_name` varchar(128) NOT NULL DEFAULT '' COMMENT '告警名字', + `strategy_expressions` text COMMENT '表达式', + `strategy_filters` text COMMENT '过滤条件', + `strategy_actions` text COMMENT '响应', + `principals` varchar(512) NOT NULL DEFAULT '' COMMENT '负责人', + `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '-1:逻辑删除, 0:关闭, 1:正常', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_alarm_name` (`alarm_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='告警规则表'; + + +CREATE TABLE `broker` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerID', + `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Broker主机名', + `port` int(32) NOT NULL DEFAULT '-1' COMMENT 'Broker端口', + `timestamp` bigint(128) NOT NULL DEFAULT '-1' COMMENT '启动时间', + `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态0有效,-1无效', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_id_broker_id` (`cluster_id`,`broker_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker信息表'; + + +CREATE TABLE `broker_metrics` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerID', + `bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流入', + `bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流出', + `bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒被拒绝字节数', + `messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消息数流入', + `fail_fetch_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消费失败数', + `fail_produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒失败生产数', + `fetch_consumer_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消费请求数', + `produce_request` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒生产数', + `request_handler_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '请求处理器繁忙百分比', + `network_processor_idl_percent` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '网络处理器繁忙百分比', + `request_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT '请求列表大小', + `response_queue_size` bigint(20) NOT NULL DEFAULT '0' COMMENT '响应列表大小', + `log_flush_time` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '刷日志时间', + `total_time_produce_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'produce请求处理总时间-平均值', + `total_time_produce_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'produce请求处理总时间-99分位', + `total_time_fetch_consumer_mean` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'fetch请求总时间-平均值', + `total_time_fetch_consumer_99th` double(53,2) NOT NULL DEFAULT '0.00' COMMENT 'fetch请求总时间-99分位', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`), + KEY `idx_cluster_id_broker_id_gmt_create` (`cluster_id`,`broker_id`,`gmt_create`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='BrokerMetric信息表'; + + +CREATE TABLE `cluster` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '集群ID', + `cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称', + `zookeeper` varchar(512) NOT NULL DEFAULT '' COMMENT 'ZK地址', + `bootstrap_servers` varchar(512) NOT NULL DEFAULT '' COMMENT 'Server地址', + `kafka_version` varchar(32) NOT NULL DEFAULT '' COMMENT 'Kafka版本', + `alarm_flag` int(4) NOT NULL DEFAULT '0' COMMENT '0:不开启告警, 1开启告警', + `security_protocol` varchar(512) NOT NULL DEFAULT '' COMMENT '安全协议', + `sasl_mechanism` varchar(512) NOT NULL DEFAULT '' COMMENT '安全机制', + `sasl_jaas_config` varchar(512) NOT NULL DEFAULT '' COMMENT 'Jaas配置', + `status` int(4) NOT NULL DEFAULT '0' COMMENT '删除标记, 0表示未删除, -1表示删除', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_name` (`cluster_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Cluster表'; + + +CREATE TABLE `cluster_metrics` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id', + `cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '集群ID', + `topic_num` int(11) NOT NULL DEFAULT '0' COMMENT 'Topic数', + `partition_num` int(11) NOT NULL DEFAULT '0' COMMENT '分区数', + `broker_num` int(11) NOT NULL DEFAULT '0' COMMENT 'Broker数', + `bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒流入(B)', + `bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒流出(B)', + `bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒拒绝(B)', + `messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒消息数(条)', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`), + KEY `idx_cluster_id_gmt_create` (`cluster_id`,`gmt_create`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='ClusterMetrics信息'; + + +CREATE TABLE `controller` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `broker_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'BrokerId', + `host` varchar(256) NOT NULL DEFAULT '' COMMENT '主机名', + `timestamp` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Controller变更时间', + `version` int(11) NOT NULL DEFAULT '-1' COMMENT 'Controller格式版本', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_id_broker_id_timestamp` (`cluster_id`,`broker_id`,`timestamp`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Controller历史变更记录表'; + +CREATE TABLE `migration_task` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id', + `cluster_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '集群ID', + `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `reassignment_json` text COMMENT '任务参数', + `real_throttle` bigint(20) NOT NULL DEFAULT '0' COMMENT '实际限流值(B/s)', + `operator` varchar(128) NOT NULL DEFAULT '' COMMENT '操作人', + `description` varchar(256) NOT NULL DEFAULT '' COMMENT '备注说明', + `status` int(11) NOT NULL DEFAULT '0' COMMENT '任务状态', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '任务创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '任务修改时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic迁移信息'; + +CREATE TABLE `operation_history` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `operator` varchar(128) NOT NULL DEFAULT '' COMMENT '操作人', + `operation` varchar(256) NOT NULL DEFAULT '' COMMENT '操作描述', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='操作记录表'; + + +CREATE TABLE `order_partition` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称', + `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人', + `peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流量', + `description` text COMMENT '备注信息', + `order_status` int(16) NOT NULL DEFAULT '0' COMMENT '工单状态', + `approver` varchar(128) NOT NULL DEFAULT '' COMMENT '审批人', + `opinion` varchar(256) NOT NULL DEFAULT '' COMMENT '审批意见', + `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0标识有效,-1无效', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='分区申请工单'; + +CREATE TABLE `order_topic` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称', + `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `retention_time` bigint(20) NOT NULL DEFAULT '-1' COMMENT '保留时间(ms)', + `partition_num` int(16) NOT NULL DEFAULT '-1' COMMENT '分区数', + `replica_num` int(16) NOT NULL DEFAULT '-1' COMMENT '副本数', + `regions` varchar(128) NOT NULL DEFAULT '' COMMENT 'RegionId列表', + `brokers` varchar(128) NOT NULL DEFAULT '' COMMENT 'Broker列表', + `peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流入流量(KB)', + `applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人', + `principals` varchar(256) NOT NULL DEFAULT '' COMMENT '负责人', + `description` text COMMENT '备注信息', + `order_status` int(16) NOT NULL DEFAULT '0' COMMENT '工单状态', + `approver` varchar(128) NOT NULL DEFAULT '' COMMENT '审批人', + `opinion` varchar(256) NOT NULL DEFAULT '' COMMENT '审批意见', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '状态,0标识有效,-1无效', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic申请工单'; + +CREATE TABLE `region` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `region_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Region名称', + `broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表', + `level` int(16) NOT NULL DEFAULT '0' COMMENT 'Region重要等级, 0级普通, 1极重要,2级极重要', + `operator` varchar(45) NOT NULL DEFAULT '' COMMENT '操作人', + `description` text COMMENT '备注说明', + `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0正常,-1废弃,1容量已满', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_id_region_name` (`cluster_id`,`region_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Region信息表'; + +CREATE TABLE `topic` ( + `id` bigint(128) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `topic_name` varchar(192) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'Topic名称', + `applicant` varchar(256) NOT NULL DEFAULT '' COMMENT '申请人', + `principals` varchar(256) NOT NULL DEFAULT '' COMMENT '负责人', + `description` text COMMENT '备注信息', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '0标识使用中,-1标识已废弃', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_cluster_id_topic_name` (`cluster_id`,`topic_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Topic信息表'; + + +CREATE TABLE `topic_favorite` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增Id', + `username` varchar(64) NOT NULL DEFAULT '' COMMENT '用户名', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `status` int(16) NOT NULL DEFAULT '0' COMMENT '删除标记, 0表示未删除, -1表示删除', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_username_cluster_id_topic_name` (`username`,`cluster_id`,`topic_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户收藏的Topic表'; + +CREATE TABLE `topic_metrics` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id', + `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', + `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `messages_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒进入消息条数', + `bytes_in` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流入', + `bytes_out` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒字节流出', + `bytes_rejected` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒拒绝字节数', + `total_produce_requests` double(53,2) NOT NULL DEFAULT '0.00' COMMENT '每秒请求数', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + PRIMARY KEY (`id`), + KEY `idx_cluster_id_topic_name_gmt_create` (`cluster_id`,`topic_name`,`gmt_create`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='TopicMetrics表'; \ No newline at end of file From 4b679be3102864ce29fb6f0ef08212c1d840fdbe Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 30 Jul 2020 19:30:31 +0800 Subject: [PATCH 10/13] fix execute order partition --- .../kafka/manager/service/service/OrderService.java | 2 +- .../kafka/manager/service/service/impl/JmxServiceImpl.java | 3 +++ .../manager/service/service/impl/OrderServiceImpl.java | 6 +++--- .../kafka/manager/web/api/versionone/OrderController.java | 4 ++-- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/OrderService.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/OrderService.java index cbf379e3..3d60bfc0 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/OrderService.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/OrderService.java @@ -45,7 +45,7 @@ public interface OrderService { * @date 19/6/23 * @return Result */ - Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator); + Result modifyOrderPartition(OrderPartitionDO orderPartitionDO, String operator, boolean admin); /** * 查询Topic工单 diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java index 48118c9f..b781cbbf 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java @@ -72,6 +72,9 @@ public class JmxServiceImpl implements JmxService { List attributeValueList = null; try { attributeValueList = connection.getAttributes(new ObjectName(mbean.getObjectName()), properties).asList(); + } catch (InstanceNotFoundException e) { + logger.warn("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e); + continue; } catch (Exception e) { logger.error("getSpecifiedBrokerMetricsFromJmx@JmxServiceImpl, get metrics fail, objectName:{}.", mbean.getObjectName(), e); continue; diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OrderServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OrderServiceImpl.java index d756ebda..f09d9ef0 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OrderServiceImpl.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OrderServiceImpl.java @@ -51,7 +51,7 @@ public class OrderServiceImpl implements OrderService { if (orderPartitionDO != null) { orderPartitionDO.setOrderStatus(OrderStatusEnum.CANCELLED.getCode()); } - return modifyOrderPartition(orderPartitionDO, operator); + return modifyOrderPartition(orderPartitionDO, operator, false); } return new Result(StatusCode.PARAM_ERROR, "order type illegal"); } @@ -74,10 +74,10 @@ public class OrderServiceImpl implements OrderService { } @Override - public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator) { + public Result modifyOrderPartition(OrderPartitionDO newOrderPartitionDO, String operator, boolean admin) { if (newOrderPartitionDO == null) { return new Result(StatusCode.PARAM_ERROR, "param illegal, order not exist"); - } else if (!newOrderPartitionDO.getApplicant().equals(operator)) { + } else if (!admin && !newOrderPartitionDO.getApplicant().equals(operator)) { return new Result(StatusCode.PARAM_ERROR, "without authority to cancel the order"); } OrderPartitionDO oldOrderPartitionDO = orderPartitionDao.getById(newOrderPartitionDO.getId()); diff --git a/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java b/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java index be6e66ea..07bfb503 100644 --- a/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java +++ b/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java @@ -325,9 +325,9 @@ public class OrderController { orderPartitionDO.setApprover(username); orderPartitionDO.setOpinion(reqObj.getApprovalOpinions()); orderPartitionDO.setOrderStatus(reqObj.getOrderStatus()); - result = orderService.modifyOrderPartition(orderPartitionDO, username); + result = orderService.modifyOrderPartition(orderPartitionDO, username, true); if (!StatusCode.SUCCESS.equals(result.getCode())) { - return new Result(StatusCode.OPERATION_ERROR, "create topic success, but update order status failed, err:" + result.getMessage()); + return new Result(StatusCode.OPERATION_ERROR, "expand topic success, but update order status failed, err:" + result.getMessage()); } return new Result(); } From ba6abea6d8f502875ca0698d85d781d533892a50 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 30 Jul 2020 20:56:45 +0800 Subject: [PATCH 11/13] =?UTF-8?q?=E6=89=A9=E5=88=86=E5=8C=BA=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/entity/po/OrderPartitionDO.java | 22 +++++++++++++++++++ .../resources/mapper/OrderPartitionDao.xml | 12 ++++++++++ doc/create_mysql_table.sql | 2 ++ doc/create_postgresql_table.sql | 2 ++ docker/mysql/create_mysql_table.sql | 2 ++ .../web/api/versionone/OrderController.java | 6 ++++- .../web/converters/OrderConverter.java | 9 ++++++-- 7 files changed, 52 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/po/OrderPartitionDO.java b/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/po/OrderPartitionDO.java index 5fab1681..01666b18 100644 --- a/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/po/OrderPartitionDO.java +++ b/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/po/OrderPartitionDO.java @@ -9,6 +9,10 @@ public class OrderPartitionDO extends BaseDO{ private String applicant; + private Integer partitionNum; + + private String brokerList; + private Long peakBytesIn; private String description; @@ -51,6 +55,22 @@ public class OrderPartitionDO extends BaseDO{ this.applicant = applicant; } + public Integer getPartitionNum() { + return partitionNum; + } + + public void setPartitionNum(Integer partitionNum) { + this.partitionNum = partitionNum; + } + + public String getBrokerList() { + return brokerList; + } + + public void setBrokerList(String brokerList) { + this.brokerList = brokerList; + } + public Long getPeakBytesIn() { return peakBytesIn; } @@ -98,6 +118,8 @@ public class OrderPartitionDO extends BaseDO{ ", clusterName='" + clusterName + '\'' + ", topicName='" + topicName + '\'' + ", applicant='" + applicant + '\'' + + ", partitionNum=" + partitionNum + + ", brokerList='" + brokerList + '\'' + ", peakBytesIn=" + peakBytesIn + ", description='" + description + '\'' + ", orderStatus=" + orderStatus + diff --git a/dao/src/main/resources/mapper/OrderPartitionDao.xml b/dao/src/main/resources/mapper/OrderPartitionDao.xml index 3eb11c03..8214865b 100644 --- a/dao/src/main/resources/mapper/OrderPartitionDao.xml +++ b/dao/src/main/resources/mapper/OrderPartitionDao.xml @@ -11,6 +11,8 @@ + + @@ -38,6 +40,16 @@ cluster_name=#{clusterName}, topic_name=#{topicName}, applicant=#{applicant}, + + + partition_num=#{partitionNum}, + + + + + broker_list=#{brokerList}, + + peak_bytes_in=#{peakBytesIn}, description=#{description}, order_status=#{orderStatus}, diff --git a/doc/create_mysql_table.sql b/doc/create_mysql_table.sql index 8f0a3861..9a66ac67 100644 --- a/doc/create_mysql_table.sql +++ b/doc/create_mysql_table.sql @@ -149,6 +149,8 @@ CREATE TABLE `order_partition` ( `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', `cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称', `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表, 逗号分割', + `partition_num` int(11) NOT NULL DEFAULT 0 COMMENT '新增分区数', `applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人', `peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流量', `description` text COMMENT '备注信息', diff --git a/doc/create_postgresql_table.sql b/doc/create_postgresql_table.sql index cdf04388..6d03c005 100644 --- a/doc/create_postgresql_table.sql +++ b/doc/create_postgresql_table.sql @@ -198,6 +198,8 @@ CREATE TABLE order_partition cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称', topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', applicant varchar(128) NOT NULL DEFAULT '', -- '申请人', + partition_num int NOT NULL DEFAULT '0', -- '分区数', + broker_list varchar(128) NOT NULL DEFAULT '', -- 'Broker列表', peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流量', description text, -- '备注信息', order_status int NOT NULL DEFAULT '0', -- '工单状态', diff --git a/docker/mysql/create_mysql_table.sql b/docker/mysql/create_mysql_table.sql index 8f0a3861..9a66ac67 100644 --- a/docker/mysql/create_mysql_table.sql +++ b/docker/mysql/create_mysql_table.sql @@ -149,6 +149,8 @@ CREATE TABLE `order_partition` ( `cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '集群ID', `cluster_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称', `topic_name` varchar(192) NOT NULL DEFAULT '' COMMENT 'Topic名称', + `broker_list` varchar(256) NOT NULL DEFAULT '' COMMENT 'Broker列表, 逗号分割', + `partition_num` int(11) NOT NULL DEFAULT 0 COMMENT '新增分区数', `applicant` varchar(128) NOT NULL DEFAULT '' COMMENT '申请人', `peak_bytes_in` bigint(20) NOT NULL DEFAULT '0' COMMENT '峰值流量', `description` text COMMENT '备注信息', diff --git a/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java b/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java index 07bfb503..1b6723eb 100644 --- a/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java +++ b/web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/OrderController.java @@ -332,7 +332,9 @@ public class OrderController { return new Result(); } - private Result expandTopic(ClusterDO clusterDO, OrderPartitionExecModel reqObj, OrderPartitionDO orderPartitionDO) { + private Result expandTopic(ClusterDO clusterDO, + OrderPartitionExecModel reqObj, + OrderPartitionDO orderPartitionDO) { List brokerIdList = regionService.getFullBrokerId(clusterDO.getId(), reqObj.getRegionIdList(), reqObj.getBrokerIdList()); try { TopicMetadata topicMetadata = new TopicMetadata(); @@ -343,6 +345,8 @@ public class OrderController { if (!AdminTopicStatusEnum.SUCCESS.equals(adminTopicStatusEnum)) { return new Result(StatusCode.OPERATION_ERROR, adminTopicStatusEnum.getMessage()); } + orderPartitionDO.setPartitionNum(reqObj.getPartitionNum()); + orderPartitionDO.setBrokerList(ListUtils.intList2String(brokerIdList)); } catch (Exception e) { logger.error("expandTopic@OrderController, create failed, req:{}.", reqObj); return new Result(StatusCode.OPERATION_ERROR, Constant.KAFKA_MANAGER_INNER_ERROR); diff --git a/web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java b/web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java index 55041cf2..df07cbd2 100644 --- a/web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java +++ b/web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java @@ -86,7 +86,8 @@ public class OrderConverter { public static OrderPartitionVO convert2OrderPartitionVO(OrderPartitionDO orderPartitionDO, TopicMetadata topicMetadata, - Long maxAvgBytes, List regionDOList) { + Long maxAvgBytes, + List regionDOList) { if (orderPartitionDO == null) { return null; } @@ -100,8 +101,12 @@ public class OrderConverter { if (topicMetadata == null) { return orderPartitionVO; } - orderPartitionVO.setPartitionNum(topicMetadata.getPartitionNum()); + + orderPartitionVO.setPartitionNum(null); orderPartitionVO.setBrokerIdList(new ArrayList<>(topicMetadata.getBrokerIdSet())); + if (OrderStatusEnum.PASSED.getCode().equals(orderPartitionDO.getOrderStatus())) { + orderPartitionVO.setPartitionNum(orderPartitionDO.getPartitionNum()); + } if (regionDOList == null || regionDOList.isEmpty()) { orderPartitionVO.setRegionNameList(new ArrayList<>()); From dd2e29dd40af2138f31e6bef759d0cab6cc53317 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 16 Sep 2020 21:04:47 +0800 Subject: [PATCH 12/13] bugfix, fix collect consumer metrics task --- .../entity/dto/consumer/ConsumerDTO.java | 61 +++++-- .../collector/CollectConsumerMetricsTask.java | 42 +++-- .../service/service/ConsumerService.java | 3 +- .../service/impl/ConsumerServiceImpl.java | 172 +++++++++--------- .../service/impl/TopicServiceImpl.java | 6 +- 5 files changed, 150 insertions(+), 134 deletions(-) diff --git a/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java b/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java index 5f686efa..c96c6496 100644 --- a/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java +++ b/common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/consumer/ConsumerDTO.java @@ -1,8 +1,5 @@ package com.xiaojukeji.kafka.manager.common.entity.dto.consumer; -import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState; - -import java.util.List; import java.util.Map; /** @@ -11,20 +8,33 @@ import java.util.Map; * @date 2015/11/12 */ public class ConsumerDTO { - /** - * 消费group名 - */ + private Long clusterId; + + private String topicName; + private String consumerGroup; - /** - * 消费类型,一般为static - */ private String location; - /** - * 订阅的每个topic的partition状态列表 - */ - private Map> topicPartitionMap; + private Map partitionOffsetMap; + + private Map consumerOffsetMap; + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } public String getConsumerGroup() { return consumerGroup; @@ -42,20 +52,31 @@ public class ConsumerDTO { this.location = location; } - public Map> getTopicPartitionMap() { - return topicPartitionMap; + public Map getPartitionOffsetMap() { + return partitionOffsetMap; } - public void setTopicPartitionMap(Map> topicPartitionMap) { - this.topicPartitionMap = topicPartitionMap; + public void setPartitionOffsetMap(Map partitionOffsetMap) { + this.partitionOffsetMap = partitionOffsetMap; + } + + public Map getConsumerOffsetMap() { + return consumerOffsetMap; + } + + public void setConsumerOffsetMap(Map consumerOffsetMap) { + this.consumerOffsetMap = consumerOffsetMap; } @Override public String toString() { - return "Consumer{" + - "consumerGroup='" + consumerGroup + '\'' + + return "ConsumerDTO{" + + "clusterId=" + clusterId + + ", topicName='" + topicName + '\'' + + ", consumerGroup='" + consumerGroup + '\'' + ", location='" + location + '\'' + - ", topicPartitionMap=" + topicPartitionMap + + ", partitionOffsetMap=" + partitionOffsetMap + + ", consumerOffsetMap=" + consumerOffsetMap + '}'; } } diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java index d589b4d0..78770927 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java @@ -2,12 +2,12 @@ package com.xiaojukeji.kafka.manager.service.collector; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetrics; -import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState; import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerDTO; import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO; import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache; import com.xiaojukeji.kafka.manager.service.service.ConsumerService; +import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +34,8 @@ public class CollectConsumerMetricsTask extends BaseCollectTask { if (clusterDO == null) { return; } - Map> topicNamePartitionStateListMap = new HashMap<>(); - List consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, topicNamePartitionStateListMap); + Map allPartitionOffsetMap = new HashMap<>(); + List consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, allPartitionOffsetMap); List consumerMetricsList = convert2ConsumerMetrics(consumerDTOList); KafkaMetricsCache.putConsumerMetricsToCache(clusterId, consumerMetricsList); @@ -47,23 +47,27 @@ public class CollectConsumerMetricsTask extends BaseCollectTask { private List convert2ConsumerMetrics(List consumerDTOList) { List consumerMetricsList = new ArrayList<>(); for (ConsumerDTO consumerDTO : consumerDTOList) { - Map> topicNamePartitionStateListMap = consumerDTO.getTopicPartitionMap(); - for(Map.Entry> entry : topicNamePartitionStateListMap.entrySet()){ - String topicName = entry.getKey(); - List partitionStateList = entry.getValue(); - ConsumerMetrics consumerMetrics = new ConsumerMetrics(); - consumerMetrics.setClusterId(clusterId); - consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup()); - consumerMetrics.setLocation(consumerDTO.getLocation()); - consumerMetrics.setTopicName(topicName); - long sumLag = 0; - for (PartitionState partitionState : partitionStateList) { - Map.Entry offsetEntry = new AbstractMap.SimpleEntry<>(partitionState.getOffset(), partitionState.getConsumeOffset()); - sumLag += (offsetEntry.getKey() - offsetEntry.getValue() > 0 ? offsetEntry.getKey() - offsetEntry.getValue(): 0); - } - consumerMetrics.setSumLag(sumLag); - consumerMetricsList.add(consumerMetrics); + if (consumerDTO.getPartitionOffsetMap() == null || consumerDTO.getConsumerOffsetMap() == null) { + continue; } + + ConsumerMetrics consumerMetrics = new ConsumerMetrics(); + consumerMetrics.setClusterId(consumerDTO.getClusterId()); + consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup()); + consumerMetrics.setLocation(consumerDTO.getLocation()); + consumerMetrics.setTopicName(consumerDTO.getTopicName()); + + long sumLag = 0; + for(Map.Entry entry : consumerDTO.getPartitionOffsetMap().entrySet()){ + Long partitionOffset = entry.getValue(); + Long consumerOffset = consumerDTO.getConsumerOffsetMap().get(entry.getKey()); + if (partitionOffset == null || consumerOffset == null) { + continue; + } + sumLag += Math.max(partitionOffset - consumerOffset, 0); + } + consumerMetrics.setSumLag(sumLag); + consumerMetricsList.add(consumerMetrics); } return consumerMetricsList; } diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java index 94f4e3b3..929aa11a 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java @@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerGroupDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.PartitionOffsetDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO; import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO; +import org.apache.kafka.common.TopicPartition; import java.util.List; import java.util.Map; @@ -57,7 +58,7 @@ public interface ConsumerService { * @return */ List getMonitoredConsumerList(ClusterDO clusterDO, - Map> topicNamePartitionStateListMap); + Map partitionOffsetMap); /** * 重置offset diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java index fb7d635e..1ae6a26f 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java @@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.constant.OffsetStoreLocation; import com.xiaojukeji.kafka.manager.common.constant.StatusCode; -import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO; import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO; @@ -18,7 +17,6 @@ import com.xiaojukeji.kafka.manager.service.cache.ConsumerMetadataCache; import com.xiaojukeji.kafka.manager.service.cache.KafkaClientCache; import com.xiaojukeji.kafka.manager.service.service.ConsumerService; import com.xiaojukeji.kafka.manager.service.service.TopicService; -import com.xiaojukeji.kafka.manager.service.service.ZookeeperService; import com.xiaojukeji.kafka.manager.common.utils.zk.ZkPathUtil; import kafka.admin.AdminClient; import org.apache.commons.lang.StringUtils; @@ -49,9 +47,6 @@ public class ConsumerServiceImpl implements ConsumerService { @Autowired private TopicService topicService; - @Autowired - private ZookeeperService zkService; - private final ExecutorService consumerListThreadPool = Executors.newFixedThreadPool(50, new DefaultThreadFactory("ConsumerPool")); @Override @@ -135,20 +130,20 @@ public class ConsumerServiceImpl implements ConsumerService { @Override public List getMonitoredConsumerList(final ClusterDO clusterDO, - final Map> partitionStateListMap) { + final Map allPartitionOffsetMap) { List consumerGroupDTOList = getConsumerGroupList(clusterDO.getId()); if (consumerGroupDTOList == null || consumerGroupDTOList.isEmpty()) { return new ArrayList<>(); } - FutureTask[] taskList = new FutureTask[consumerGroupDTOList.size()]; + FutureTask>[] taskList = new FutureTask[consumerGroupDTOList.size()]; for (int i = 0; i < consumerGroupDTOList.size(); i++) { final ConsumerGroupDTO consumerGroupDTO = consumerGroupDTOList.get(i); - taskList[i] = new FutureTask<>(new Callable() { + taskList[i] = new FutureTask<>(new Callable>() { @Override - public ConsumerDTO call() throws Exception { + public List call() throws Exception { try { - return getMonitoredConsumer(clusterDO, consumerGroupDTO, partitionStateListMap); + return getMonitoredConsumer(clusterDO, consumerGroupDTO, allPartitionOffsetMap); } catch (Exception e) { logger.error("get monitored consumer error, group:{}", consumerGroupDTO.getConsumerGroup(), e); } @@ -159,31 +154,70 @@ public class ConsumerServiceImpl implements ConsumerService { } List consumerList = new ArrayList<>(); - for (FutureTask task : taskList) { - ConsumerDTO consumer = null; + for (FutureTask> task : taskList) { + List dtoList = null; try { - consumer = task.get(); + dtoList = task.get(); } catch (Exception e) { logger.error("getMonitoredConsumerList@ConsumeServiceImpl, ", e); } - if (consumer == null) { + if (dtoList == null) { continue; } - consumerList.add(consumer); + consumerList.addAll(dtoList); } return consumerList; } - private ConsumerDTO getMonitoredConsumer(ClusterDO cluster, ConsumerGroupDTO consumerGroupDTO, Map> globalTopicNamePartitionStateListMap) { - // 获取当前consumerGroup下的所有的topic的partitionState信息 - Map> topicNamePartitionStateListMap = getConsumerGroupPartitionStateList(cluster, consumerGroupDTO, globalTopicNamePartitionStateListMap); + private List getMonitoredConsumer(ClusterDO clusterDO, + ConsumerGroupDTO consumerGroupDTO, + Map allPartitionOffsetMap) { + List dtoList = new ArrayList<>(); - //将没有对应consumer的partition信息统一放到一个consumer中 - ConsumerDTO consumerDTO = new ConsumerDTO(); - consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup()); - consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().name()); - consumerDTO.setTopicPartitionMap(topicNamePartitionStateListMap); - return consumerDTO; + List topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList( + clusterDO.getId(), + consumerGroupDTO.getOffsetStoreLocation().getLocation(), + consumerGroupDTO.getConsumerGroup() + ); + for (String topicName : topicNameList) { + TopicMetadata metadata = ClusterMetadataManager.getTopicMetaData(clusterDO.getId(), topicName); + if (metadata == null || metadata.getPartitionNum() <= 0) { + continue; + } + if (!allPartitionOffsetMap.containsKey(new TopicPartition(topicName, 0))) { + Map offsetMap = topicService.getTopicPartitionOffset(clusterDO, topicName); + if (offsetMap == null) { + offsetMap = new HashMap<>(); + } + allPartitionOffsetMap.putAll(offsetMap); + } + + Map consumerOffsetMap = null; + if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) { + consumerOffsetMap = getTopicConsumerOffsetInZK(clusterDO, metadata, consumerGroupDTO); + } else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) { + consumerOffsetMap = getTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO); + } + + Map partitionOffsetMap = new HashMap<>(); + for (int partitionId = 0; partitionId < metadata.getPartitionNum(); ++partitionId) { + Long offset = allPartitionOffsetMap.get(new TopicPartition(topicName, partitionId)); + if (offset == null) { + continue; + } + partitionOffsetMap.put(partitionId, offset); + } + + ConsumerDTO consumerDTO = new ConsumerDTO(); + consumerDTO.setClusterId(clusterDO.getId()); + consumerDTO.setTopicName(topicName); + consumerDTO.setConsumerGroup(consumerGroupDTO.getConsumerGroup()); + consumerDTO.setLocation(consumerGroupDTO.getOffsetStoreLocation().getLocation()); + consumerDTO.setPartitionOffsetMap(partitionOffsetMap); + consumerDTO.setConsumerOffsetMap(consumerOffsetMap); + dtoList.add(consumerDTO); + } + return dtoList; } @Override @@ -264,52 +298,15 @@ public class ConsumerServiceImpl implements ConsumerService { kafkaConsumer.commitSync(); } - /** - * 获取属于该集群和consumerGroup下的所有topic的信息 - */ - private Map> getConsumerGroupPartitionStateList(ClusterDO clusterDO, - ConsumerGroupDTO consumerGroupDTO, - Map> globalTopicNamePartitionStateListMap) { - Map> topicNamePartitionStateListMap = new HashMap<>(2); + private Map getTopicConsumerOffsetInZK(ClusterDO clusterDO, + TopicMetadata topicMetadata, + ConsumerGroupDTO consumerGroupDTO) { + Map offsetMap = new HashMap<>(); - List topicNameList = ConsumerMetadataCache.getConsumerGroupConsumedTopicList(clusterDO.getId(),consumerGroupDTO.getOffsetStoreLocation().getLocation(), consumerGroupDTO.getConsumerGroup()); - for (String topicName : topicNameList) { - if (!ClusterMetadataManager.isTopicExist(clusterDO.getId(), topicName)) { - continue; - } - - List partitionStateList = globalTopicNamePartitionStateListMap.get(topicName); - if (partitionStateList == null) { - try { - partitionStateList = zkService.getTopicPartitionState(clusterDO.getId(), topicName); - } catch (Exception e) { - logger.error("get topic partition state failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e); - } - if (partitionStateList == null) { - continue; - } - globalTopicNamePartitionStateListMap.put(topicName, partitionStateList); - } - List consumerGroupPartitionStateList = new ArrayList<>(); - for (PartitionState partitionState: partitionStateList) { - consumerGroupPartitionStateList.add((PartitionState) partitionState.clone()); - } - - if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.ZOOKEEPER)) { - updateTopicConsumerOffsetInZK(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList); - } else if (consumerGroupDTO.getOffsetStoreLocation().equals(OffsetStoreLocation.BROKER)) { - updateTopicConsumerOffsetInBroker(clusterDO, topicName, consumerGroupDTO, consumerGroupPartitionStateList); - } - topicNamePartitionStateListMap.put(topicName, consumerGroupPartitionStateList); - } - return topicNamePartitionStateListMap; - } - - private void updateTopicConsumerOffsetInZK(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List partitionStateList) { - ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(cluster.getId()); - for (PartitionState partitionState : partitionStateList) { + ZkConfigImpl zkConfig = ClusterMetadataManager.getZKConfig(clusterDO.getId()); + for (int partitionId = 0; partitionId < topicMetadata.getPartitionNum(); ++partitionId) { //offset存储于zk中 - String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId()); + String consumerGroupOffsetLocation = ZkPathUtil.getConsumerGroupOffsetTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicMetadata.getTopic(), partitionId); String offset = null; try { Stat stat = zkConfig.getNodeStat(consumerGroupOffsetLocation); @@ -317,39 +314,32 @@ public class ConsumerServiceImpl implements ConsumerService { continue; } offset = zkConfig.get(consumerGroupOffsetLocation); + offsetMap.put(partitionId, Long.valueOf(offset)); } catch (ConfigException e) { e.printStackTrace(); } - - String consumerId = null; - try { - consumerId = zkConfig.get(ZkPathUtil.getConsumerGroupOwnersTopicPartitionNode(consumerGroupDTO.getConsumerGroup(), topicName, partitionState.getPartitionId())); - } catch (ConfigException e) { -// logger.error("get consumerId error in updateTopicConsumerOffsetInZK cluster:{} topic:{} consumerGroup:{}", cluster.getClusterName(), topicName, consumerGroupDTO.getConsumerGroup()); - } - partitionState.setConsumerGroup(consumerGroupDTO.getConsumerGroup()); - updatePartitionStateOffset(partitionState, offset, consumerId); } + return offsetMap; } - private void updateTopicConsumerOffsetInBroker(ClusterDO cluster, String topicName, ConsumerGroupDTO consumerGroupDTO, List partitionStateList) { - Map offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(cluster, consumerGroupDTO.getConsumerGroup(), topicName); + private Map getTopicConsumerOffsetInBroker(ClusterDO clusterDO, + String topicName, + ConsumerGroupDTO consumerGroupDTO) { + Map offsetsFromBroker = getOffsetByGroupAndTopicFromBroker(clusterDO, consumerGroupDTO.getConsumerGroup(), topicName); if (offsetsFromBroker == null || offsetsFromBroker.isEmpty()) { - return; + return new HashMap<>(0); } - for (PartitionState partitionState : partitionStateList) { - int partitionId = partitionState.getPartitionId(); - updatePartitionStateOffset(partitionState, offsetsFromBroker.get(partitionId), null); + Map offsetMap = new HashMap<>(offsetsFromBroker.size()); + for (Map.Entry entry: offsetsFromBroker.entrySet()) { + try { + offsetMap.put(entry.getKey(), Long.valueOf(entry.getValue())); + } catch (Exception e) { + logger.error("get topic consumer offset failed, clusterId:{} topicName:{} consumerGroup:{}." + , clusterDO.getId(), topicName, consumerGroupDTO.getConsumerGroup()); + } } - } - - private void updatePartitionStateOffset(PartitionState partitionState, String offset, String consumerId) { - partitionState.setConsumeOffset(0); - if (!StringUtils.isEmpty(offset)) { - partitionState.setConsumeOffset(Long.parseLong(offset)); - } - partitionState.setConsumerGroup(consumerId); + return offsetMap; } private Map getConsumeIdMap(Long clusterId, String topicName, String consumerGroup) { diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 282e8562..0bbc6f55 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -343,9 +343,9 @@ public class TopicServiceImpl implements TopicService { topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec()); topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec()); } else { - topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true); - topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec()); - topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec()); +// topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true); + topicOverviewDTO.setBytesInPerSec(0.0); + topicOverviewDTO.setProduceRequestPerSec(0.0); } return topicOverviewDTO; } From c23870e020350c9d3266a9002683ab89d96c830a Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 17 Sep 2020 19:04:33 +0800 Subject: [PATCH 13/13] bugfix, fix topic overview page --- .../manager/service/service/impl/TopicServiceImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 0bbc6f55..0a5de14b 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -343,9 +343,9 @@ public class TopicServiceImpl implements TopicService { topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec()); topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getTotalProduceRequestsPerSec()); } else { -// topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true); - topicOverviewDTO.setBytesInPerSec(0.0); - topicOverviewDTO.setProduceRequestPerSec(0.0); + topicMetrics = jmxService.getSpecifiedTopicMetricsFromJmx(clusterId, topicName, TopicMetrics.getFieldNameList(MetricsType.TOPIC_FLOW_DETAIL), true); + topicOverviewDTO.setBytesInPerSec(topicMetrics.getBytesInPerSec()); + topicOverviewDTO.setProduceRequestPerSec(topicMetrics.getBytesOutPerSec()); } return topicOverviewDTO; }