diff --git a/README.md b/README.md index 0f3cd6ea..02177bdc 100644 --- a/README.md +++ b/README.md @@ -45,12 +45,14 @@ - `Maven 3.5.0+`(后端打包依赖) - `node v8.12.0+`(前端打包依赖) - `Java 8+`(运行环境需要) -- `MySQL`(数据存储) +- `MySQL` 或 `PostgreSQL`(数据存储) --- ### 环境初始化 +**MySQL** + 执行[create_mysql_table.sql](doc/create_mysql_table.sql)中的SQL命令,从而创建所需的MySQL库及表,默认创建的库名是`kafka_manager`。 ``` @@ -58,6 +60,24 @@ mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql ``` +**PostgreSQL** + +执行[create_postgresql_table.sql](doc/create_postgresql_table.sql)中的SQL命令,从而创建所需的PostgreSQL表。 + +``` +############# 示例: +psql -h XXX.XXX.XXX.XXX -U XXXX -d kafka_manager -f ./create_postgresql_table.sql +``` + +*PostgreSQL 用户、数据库创建方式* + +```sql +create user admin encrypted password 'admin'; +create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8'; +``` + +***默认配置使用 MySQL 数据库,若要使用 PostgreSQL 数据库,使用 `-Dspring.profiles.active=pg` 指定 `application-pg.yml` 配置文件。*** + --- diff --git a/console/package-lock.json b/console/package-lock.json index 1e6cc369..5b64b1b2 100644 --- a/console/package-lock.json +++ b/console/package-lock.json @@ -751,8 +751,7 @@ "version": "1.0.0", "resolved": "http://registry.npm.taobao.org/assert-plus/download/assert-plus-1.0.0.tgz", "integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=", - "dev": true, - "optional": true + "dev": true }, "assign-symbols": { "version": "1.0.0", @@ -1505,7 +1504,6 @@ "resolved": "http://registry.npm.taobao.org/combined-stream/download/combined-stream-1.0.7.tgz", "integrity": "sha1-LR0kMXr7ir6V1tLAsHtXgTU52Cg=", "dev": true, - "optional": true, "requires": { "delayed-stream": "~1.0.0" } @@ -2222,8 +2220,7 @@ "version": "1.0.0", "resolved": "http://registry.npm.taobao.org/delayed-stream/download/delayed-stream-1.0.0.tgz", "integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=", - "dev": true, - "optional": true + "dev": true }, "depd": { "version": "1.1.2", @@ -2941,8 +2938,7 @@ "version": "1.3.0", "resolved": "http://registry.npm.taobao.org/extsprintf/download/extsprintf-1.3.0.tgz", "integrity": "sha1-lpGEQOMEGnpBT4xS48V06zw+HgU=", - "dev": true, - "optional": true + "dev": true }, "fast-deep-equal": { "version": "2.0.1", @@ -3358,8 +3354,7 @@ "ansi-regex": { "version": "2.1.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "aproba": { "version": "1.2.0", @@ -3380,14 +3375,12 @@ "balanced-match": { "version": "1.0.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "brace-expansion": { "version": "1.1.11", "bundled": true, "dev": true, - "optional": true, "requires": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" @@ -3402,20 +3395,17 @@ "code-point-at": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "concat-map": { "version": "0.0.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "console-control-strings": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "core-util-is": { "version": "1.0.2", @@ -3532,8 +3522,7 @@ "inherits": { "version": "2.0.3", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "ini": { "version": "1.3.5", @@ -3545,7 +3534,6 @@ "version": "1.0.0", "bundled": true, "dev": true, - "optional": true, "requires": { "number-is-nan": "^1.0.0" } @@ -3560,7 +3548,6 @@ "version": "3.0.4", "bundled": true, "dev": true, - "optional": true, "requires": { "brace-expansion": "^1.1.7" } @@ -3568,14 +3555,12 @@ "minimist": { "version": "0.0.8", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "minipass": { "version": "2.3.5", "bundled": true, "dev": true, - "optional": true, "requires": { "safe-buffer": "^5.1.2", "yallist": "^3.0.0" @@ -3594,7 +3579,6 @@ "version": "0.5.1", "bundled": true, "dev": true, - "optional": true, "requires": { "minimist": "0.0.8" } @@ -3675,8 +3659,7 @@ "number-is-nan": { "version": "1.0.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "object-assign": { "version": "4.1.1", @@ -3688,7 +3671,6 @@ "version": "1.4.0", "bundled": true, "dev": true, - "optional": true, "requires": { "wrappy": "1" } @@ -3774,8 +3756,7 @@ "safe-buffer": { "version": "5.1.2", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "safer-buffer": { "version": "2.1.2", @@ -3811,7 +3792,6 @@ "version": "1.0.2", "bundled": true, "dev": true, - "optional": true, "requires": { "code-point-at": "^1.0.0", "is-fullwidth-code-point": "^1.0.0", @@ -3831,7 +3811,6 @@ "version": "3.0.1", "bundled": true, "dev": true, - "optional": true, "requires": { "ansi-regex": "^2.0.0" } @@ -3875,14 +3854,12 @@ "wrappy": { "version": "1.0.2", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "yallist": { "version": "3.0.3", "bundled": true, - "dev": true, - "optional": true + "dev": true } } }, @@ -4864,8 +4841,7 @@ "version": "0.1.1", "resolved": "http://registry.npm.taobao.org/jsbn/download/jsbn-0.1.1.tgz", "integrity": "sha1-peZUwuWi3rXyAdls77yoDA7y9RM=", - "dev": true, - "optional": true + "dev": true }, "json-parse-better-errors": { "version": "1.0.2", @@ -8883,8 +8859,7 @@ "version": "0.14.5", "resolved": "http://registry.npm.taobao.org/tweetnacl/download/tweetnacl-0.14.5.tgz", "integrity": "sha1-WuaBd/GS1EViadEIr6k/+HQ/T2Q=", - "dev": true, - "optional": true + "dev": true }, "type-is": { "version": "1.6.16", diff --git a/dao/pom.xml b/dao/pom.xml index 4b789b3b..0e0cf71f 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -43,5 +43,9 @@ mariadb-java-client 2.5.4 + + org.postgresql + postgresql + \ No newline at end of file diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java index 315762b7..9283d40b 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/AccountDaoImpl.java @@ -18,6 +18,9 @@ public class AccountDaoImpl implements AccountDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @@ -25,7 +28,7 @@ public class AccountDaoImpl implements AccountDao { @Override public int addNewAccount(AccountDO accountDO) { accountDO.setStatus(DBStatusEnum.NORMAL.getStatus()); - return sqlSession.insert("AccountDao.insert", accountDO); + return updateAccount(accountDO); } @Override @@ -35,6 +38,9 @@ public class AccountDaoImpl implements AccountDao { @Override public int updateAccount(AccountDO accountDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("AccountDao.insertOnPG", accountDO); + } return sqlSession.insert("AccountDao.insert", accountDO); } diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java index 397ef54c..16cda81f 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerDaoImpl.java @@ -19,12 +19,18 @@ public class BrokerDaoImpl implements BrokerDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int replace(BrokerDO brokerInfoDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("BrokerDao.replaceOnPG", brokerInfoDO); + } return sqlSession.insert("BrokerDao.replace", brokerInfoDO); } diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/KafkaManagerProperties.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/KafkaManagerProperties.java new file mode 100644 index 00000000..d762c4ce --- /dev/null +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/KafkaManagerProperties.java @@ -0,0 +1,22 @@ +package com.xiaojukeji.kafka.manager.dao.impl; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Component +@ConfigurationProperties("spring.datasource.kafka-manager") +public class KafkaManagerProperties { + private String jdbcUrl; + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public boolean hasPG() { + return jdbcUrl.startsWith("jdbc:postgres"); + } +} diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java index 1b9b17ad..7f62ed5c 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/RegionDaoImpl.java @@ -17,12 +17,18 @@ public class RegionDaoImpl implements RegionDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int insert(RegionDO regionDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("RegionDao.insertOnPG", regionDO); + } return sqlSession.insert("RegionDao.insert", regionDO); } diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java index f5a65ac3..7c5bac5f 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicDaoImpl.java @@ -19,12 +19,18 @@ public class TopicDaoImpl implements TopicDao { @Autowired private SqlSessionTemplate sqlSession; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int replace(TopicDO topicDO) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("TopicDao.replaceOnPG", topicDO); + } return sqlSession.insert("TopicDao.replace", topicDO); } diff --git a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java index 17170028..f8d0c7fc 100644 --- a/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java +++ b/dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicFavoriteDaoImpl.java @@ -25,12 +25,18 @@ public class TopicFavoriteDaoImpl implements TopicFavoriteDao { @Autowired private TransactionTemplate transactionTemplate; + @Autowired + private KafkaManagerProperties kafkaManagerProperties; + public void setSqlSession(SqlSessionTemplate sqlSession) { this.sqlSession = sqlSession; } @Override public int batchAdd(List topicFavoriteDOList) { + if (kafkaManagerProperties.hasPG()) { + return sqlSession.insert("TopicFavoriteDao.batchAddOnPG", topicFavoriteDOList); + } return sqlSession.insert("TopicFavoriteDao.batchAdd", topicFavoriteDOList); } diff --git a/dao/src/main/resources/mapper/AccountDao.xml b/dao/src/main/resources/mapper/AccountDao.xml index 5d5e79a8..feed2727 100644 --- a/dao/src/main/resources/mapper/AccountDao.xml +++ b/dao/src/main/resources/mapper/AccountDao.xml @@ -22,6 +22,18 @@ ]]> + + + + DELETE FROM account WHERE username = #{username} diff --git a/dao/src/main/resources/mapper/BrokerDao.xml b/dao/src/main/resources/mapper/BrokerDao.xml index 4ecacd25..50af2ef1 100644 --- a/dao/src/main/resources/mapper/BrokerDao.xml +++ b/dao/src/main/resources/mapper/BrokerDao.xml @@ -20,6 +20,16 @@ (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status}) + + insert into broker + (cluster_id, broker_id, host, port, timestamp, status) + values (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status}) + on conflict (cluster_id, broker_id) do update set host = excluded.host, + port = excluded.port, + timestamp = excluded.timestamp, + status = excluded.status + + DELETE FROM broker WHERE cluster_id = #{clusterId} AND broker_id = #{brokerId} diff --git a/dao/src/main/resources/mapper/RegionDao.xml b/dao/src/main/resources/mapper/RegionDao.xml index 591decb9..7b1c7daa 100644 --- a/dao/src/main/resources/mapper/RegionDao.xml +++ b/dao/src/main/resources/mapper/RegionDao.xml @@ -21,6 +21,15 @@ (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator}) + + insert into region + (region_name, cluster_id, broker_list, description, operator) + values (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator}) + on conflict (region_name, cluster_id) do update set broker_list = excluded.broker_list, + description = excluded.description, + operator = excluded.operator + + DELETE FROM region WHERE id = #{id} diff --git a/dao/src/main/resources/mapper/TopicDao.xml b/dao/src/main/resources/mapper/TopicDao.xml index 27d99e48..b078d4cf 100644 --- a/dao/src/main/resources/mapper/TopicDao.xml +++ b/dao/src/main/resources/mapper/TopicDao.xml @@ -19,6 +19,15 @@ (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status}) + + insert into topic + (cluster_id, topic_name, principals, description, status) + values (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status}) + on conflict (cluster_id, topic_name) do update set principals = excluded.principals, + description = excluded.description, + status = excluded.status + + DELETE FROM topic WHERE id = #{id} diff --git a/dao/src/main/resources/mapper/TopicFavoriteDao.xml b/dao/src/main/resources/mapper/TopicFavoriteDao.xml index 308a75ea..218a714c 100644 --- a/dao/src/main/resources/mapper/TopicFavoriteDao.xml +++ b/dao/src/main/resources/mapper/TopicFavoriteDao.xml @@ -21,6 +21,15 @@ + + insert into topic_favorite (cluster_id, topic_name, username) + values + + (#{TopicFavoriteDO.clusterId}, #{TopicFavoriteDO.topicName}, #{TopicFavoriteDO.username}) + + on conflict (cluster_id, topic_name, username) do update set gmt_modify = now(); + + DELETE FROM topic_favorite WHERE id=#{id} diff --git a/doc/create_postgresql_table.sql b/doc/create_postgresql_table.sql new file mode 100644 index 00000000..cdf04388 --- /dev/null +++ b/doc/create_postgresql_table.sql @@ -0,0 +1,323 @@ +-- CREATE DATABASE kafka_manager; +-- \c kafka_manager; +SET TIME ZONE 'Asia/Chongqing'; +SET CLIENT_ENCODING TO 'UTF-8'; + +CREATE OR REPLACE FUNCTION on_update_timestamp() RETURNS TRIGGER AS +$$ +BEGIN + new.gmt_modify = current_timestamp; + return new; +END; +$$ LANGUAGE plpgsql; + +-- 账号表 +CREATE TABLE account +( + id bigserial NOT NULL, -- 'ID', + username varchar(64) NOT NULL UNIQUE DEFAULT '', -- '用户名', + password varchar(128) NOT NULL DEFAULT '', -- '密码', + role int NOT NULL DEFAULT 0, -- '角色类型, 0:普通用户', + status int NOT NULL DEFAULT 0, -- '0标识使用中,-1标识已废弃', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT account_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX account_uniq_username ON account (username); +INSERT INTO account(username, password, role) +VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2); +CREATE TRIGGER account_trig_gmt_modify + BEFORE UPDATE + ON account + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- 告警规则表 +CREATE TABLE alarm_rule +( + id bigserial, -- '自增ID', + alarm_name varchar(128) NOT NULL DEFAULT '', -- '告警名字', + strategy_expressions text, -- '表达式', + strategy_filters text, -- '过滤条件', + strategy_actions text, -- '响应', + principals varchar(512) NOT NULL DEFAULT '', -- '负责人', + status int2 NOT NULL DEFAULT 1, -- '-1:逻辑删除, 0:关闭, 1:正常', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT alarm_rule_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX alarm_rule_uniq_alarm_name ON alarm_rule (alarm_name); +CREATE TRIGGER alarm_rule_trig_gmt_modify + BEFORE UPDATE + ON alarm_rule + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Broker信息表 +CREATE TABLE broker +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID', + host varchar(128) NOT NULL DEFAULT '', -- 'Broker主机名', + port int NOT NULL DEFAULT '-1', -- 'Broker端口', + timestamp bigint NOT NULL DEFAULT '-1', -- '启动时间', + status int NOT NULL DEFAULT '0', -- '状态0有效,-1无效', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT broker_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX broker_uniq_cluster_id_broker_id ON broker (cluster_id, broker_id); +CREATE TRIGGER broker_trig_gmt_modify + BEFORE UPDATE + ON broker + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- BrokerMetric信息表 +CREATE TABLE broker_metrics +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID', + bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入', + bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出', + bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒被拒绝字节数', + messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数流入', + fail_fetch_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费失败数', + fail_produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒失败生产数', + fetch_consumer_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费请求数', + produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒生产数', + request_handler_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '请求处理器繁忙百分比', + network_processor_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '网络处理器繁忙百分比', + request_queue_size bigint NOT NULL DEFAULT '0', -- '请求列表大小', + response_queue_size bigint NOT NULL DEFAULT '0', -- '响应列表大小', + log_flush_time decimal(53, 2) NOT NULL DEFAULT '0.00', -- '刷日志时间', + total_time_produce_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-平均值', + total_time_produce_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-99分位', + total_time_fetch_consumer_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-平均值', + total_time_fetch_consumer_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-99分位', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT broker_metrics_pk PRIMARY KEY (id) +); +CREATE INDEX broker_metrics_idx_cluster_id_broker_id_gmt_create ON broker_metrics (cluster_id, broker_id, gmt_create); + +-- Cluster表 +CREATE TABLE cluster +( + id bigserial, -- '集群ID', + cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称', + zookeeper varchar(512) NOT NULL DEFAULT '', -- 'ZK地址', + bootstrap_servers varchar(512) NOT NULL DEFAULT '', -- 'Server地址', + kafka_version varchar(32) NOT NULL DEFAULT '', -- 'Kafka版本', + alarm_flag int2 NOT NULL DEFAULT '0', -- '0:不开启告警, 1开启告警', + security_protocol varchar(512) NOT NULL DEFAULT '', -- '安全协议', + sasl_mechanism varchar(512) NOT NULL DEFAULT '', -- '安全机制', + sasl_jaas_config varchar(512) NOT NULL DEFAULT '', -- 'Jaas配置', + status int2 NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT cluster_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX cluster_uniq_cluster_name ON cluster (cluster_name); +CREATE TRIGGER cluster_trig_gmt_modify + BEFORE UPDATE + ON cluster + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- ClusterMetrics信息 +CREATE TABLE cluster_metrics +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID', + topic_num int NOT NULL DEFAULT '0', -- 'Topic数', + partition_num int NOT NULL DEFAULT '0', -- '分区数', + broker_num int NOT NULL DEFAULT '0', -- 'Broker数', + bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流入(B)', + bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流出(B)', + bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝(B)', + messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数(条)', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT cluster_metrics_pk PRIMARY KEY (id) +); +CREATE INDEX cluster_metrics_idx_cluster_id_gmt_create ON cluster_metrics (cluster_id, gmt_create); + +-- Controller历史变更记录表 +CREATE TABLE controller +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + broker_id int NOT NULL DEFAULT '-1', -- 'BrokerId', + host varchar(256) NOT NULL DEFAULT '', -- '主机名', + timestamp bigint NOT NULL DEFAULT '-1', -- 'Controller变更时间', + version int NOT NULL DEFAULT '-1', -- 'Controller格式版本', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT controller_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX controller_uniq_cluster_id_broker_id_timestamp ON controller (cluster_id, broker_id, timestamp); + +-- Topic迁移信息 +CREATE TABLE migration_task +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + reassignment_json text, -- '任务参数', + real_throttle bigint NOT NULL DEFAULT '0', -- '实际限流值(B/s)', + operator varchar(128) NOT NULL DEFAULT '', -- '操作人', + description varchar(256) NOT NULL DEFAULT '', -- '备注说明', + status int NOT NULL DEFAULT '0', -- '任务状态', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务修改时间', + CONSTRAINT migration_task_pk PRIMARY KEY (id) +); +CREATE TRIGGER migration_task_trig_gmt_modify + BEFORE UPDATE + ON migration_task + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +CREATE TABLE operation_history +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + operator varchar(128) NOT NULL DEFAULT '', -- '操作人', + operation varchar(256) NOT NULL DEFAULT '', -- '操作描述', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + PRIMARY KEY (id) +); +--='操作记录表'; + +-- 分区申请工单 +CREATE TABLE order_partition +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + applicant varchar(128) NOT NULL DEFAULT '', -- '申请人', + peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流量', + description text, -- '备注信息', + order_status int NOT NULL DEFAULT '0', -- '工单状态', + approver varchar(128) NOT NULL DEFAULT '', -- '审批人', + opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见', + status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT order_partition_pk PRIMARY KEY (id) +); +CREATE TRIGGER order_partition_trig_gmt_modify + BEFORE UPDATE + ON order_partition + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Topic申请工单 +CREATE TABLE order_topic +( + id bigserial, -- 'ID', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + retention_time bigint NOT NULL DEFAULT '-1', -- '保留时间(ms)', + partition_num int NOT NULL DEFAULT '-1', -- '分区数', + replica_num int NOT NULL DEFAULT '-1', -- '副本数', + regions varchar(128) NOT NULL DEFAULT '', -- 'RegionId列表', + brokers varchar(128) NOT NULL DEFAULT '', -- 'Broker列表', + peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流入流量(KB)', + applicant varchar(128) NOT NULL DEFAULT '', -- '申请人', + principals varchar(256) NOT NULL DEFAULT '', -- '负责人', + description text, -- '备注信息', + order_status int NOT NULL DEFAULT '0', -- '工单状态', + approver varchar(128) NOT NULL DEFAULT '', -- '审批人', + opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见', + status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT order_topic_pk PRIMARY KEY (id) +); +CREATE TRIGGER order_topic_trig_gmt_modify + BEFORE UPDATE + ON order_topic + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Region信息表 +CREATE TABLE region +( + id bigserial, -- 'id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + region_name varchar(128) NOT NULL DEFAULT '', -- 'Region名称', + broker_list varchar(256) NOT NULL DEFAULT '', -- 'Broker列表', + level int NOT NULL DEFAULT '0', -- 'Region重要等级, 0级普通, 1极重要,2级极重要', + operator varchar(45) NOT NULL DEFAULT '', -- '操作人', + description text, -- '备注说明', + status int NOT NULL DEFAULT '0', -- '状态,0正常,-1废弃,1容量已满', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT region_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX region_uniq_cluster_id_region_name ON region (cluster_id, region_name); +CREATE TRIGGER region_trig_gmt_modify + BEFORE UPDATE + ON region + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- Topic信息表 +CREATE TABLE topic +( + id bigserial, -- 'ID', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + applicant varchar(256) NOT NULL DEFAULT '', -- '申请人', + principals varchar(256) NOT NULL DEFAULT '', -- '负责人', + description text, -- '备注信息', + status int NOT NULL DEFAULT '0', -- '0标识使用中,-1标识已废弃', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT topic_pk PRIMARY KEY (id) +); --=''; +CREATE UNIQUE INDEX topic_uniq_cluster_id_topic_name ON topic (cluster_id, topic_name); +CREATE TRIGGER topic_trig_gmt_modify + BEFORE UPDATE + ON topic + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- 用户收藏的Topic表 +CREATE TABLE topic_favorite +( + id bigserial, -- '自增Id', + username varchar(64) NOT NULL DEFAULT '', -- '用户名', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + status int NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间', + CONSTRAINT topic_favorite_pk PRIMARY KEY (id) +); +CREATE UNIQUE INDEX topic_favorite_uniq_username_cluster_id_topic_name ON topic_favorite (username, cluster_id, topic_name); +CREATE TRIGGER topic_favorite_trig_gmt_modify + BEFORE UPDATE + ON topic_favorite + FOR EACH ROW +EXECUTE PROCEDURE on_update_timestamp(); + +-- TopicMetrics表 +CREATE TABLE topic_metrics +( + id bigserial, -- '自增id', + cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID', + topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称', + messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒进入消息条数', + bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入', + bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出', + bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝字节数', + total_produce_requests decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒请求数', + gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间', + CONSTRAINT topic_metrics_pk PRIMARY KEY (id) +); +CREATE INDEX topic_metrics_idx_cluster_id_topic_name_gmt_create ON topic_metrics (cluster_id, topic_name, gmt_create); diff --git a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java index f941dc63..3d245e61 100644 --- a/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java +++ b/service/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminTopicServiceImpl.java @@ -76,6 +76,7 @@ public class AdminTopicServiceImpl implements AdminTopicService { OperationHistoryDO operationHistoryDO = OperationHistoryDO.newInstance(topicDO.getClusterId(), topicDO.getTopicName(), operator, OperationEnum.CREATE_TOPIC.message); operationHistoryDao.insert(operationHistoryDO); topicDao.replace(topicDO); + } catch (Exception e) { return AdminTopicStatusEnum.REPLACE_DB_FAILED; } @@ -188,4 +189,5 @@ public class AdminTopicServiceImpl implements AdminTopicService { } return AdminTopicStatusEnum.SUCCESS; } + } diff --git a/web/src/main/resources/application-pg.yml b/web/src/main/resources/application-pg.yml new file mode 100644 index 00000000..174d2002 --- /dev/null +++ b/web/src/main/resources/application-pg.yml @@ -0,0 +1,33 @@ +server: + port: 8080 + tomcat: + accept-count: 100 + max-connections: 1000 + max-threads: 20 + min-spare-threads: 20 + +spring: + application: + name: kafkamanager + datasource: + kafka-manager: + driver-class-name: org.postgresql.Driver + jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true + username: admin + password: admin + type: com.zaxxer.hikari.HikariDataSource + hikari: + connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';" + connection-test-query: "select 1;" + main: + allow-bean-definition-overriding: true + +logging: + config: classpath:logback-spring.xml + +# kafka监控 +kafka-monitor: + enabled: true + notify-kafka: + cluster-id: 95 + topic-name: kmo_monitor diff --git a/web/src/main/resources/application.yml b/web/src/main/resources/application.yml index b0363881..ea9dcc7c 100644 --- a/web/src/main/resources/application.yml +++ b/web/src/main/resources/application.yml @@ -11,16 +11,13 @@ spring: name: kafkamanager datasource: kafka-manager: + driver-class-name: org.mariadb.jdbc.Driver jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8 username: admin password: admin - driver-class-name: org.mariadb.jdbc.Driver main: allow-bean-definition-overriding: true - profiles: - active: dev - logging: config: classpath:logback-spring.xml