diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 9ca3226e..6f868666 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -14,9 +14,10 @@ XXXX
Please follow this checklist to help us integrate your contribution quickly and easily:
-* [ ] Make sure there is a GitHub issue for the change (usually created before you start working on it). Trivial changes such as typo fixes do not need a GitHub issue. Your pull request should address only this issue, without other changes: one PR resolves one issue.
-* [ ] Format the pull request title like [ISSUE #123] support Confluent Schema Registry. Every commit in the pull request should have a meaningful subject line and body.
-* [ ] Write a pull request description detailed enough to understand what the pull request does, how, and why.
-* [ ] Write the unit tests needed to verify your logic change. If you submit a new feature or a significant change, remember to add an integration-test in the test module.
-* [ ] Make sure the code compiles and the integration tests pass.
+* [ ] One PR (short for Pull Request) resolves exactly one issue; a single PR must not resolve several issues;
+* [ ] Make sure the PR has a corresponding Issue (usually created before you start working on it); trivial changes such as typo fixes do not need an Issue;
+* [ ] Format the title and content of the PR and of its Commit-Log, e.g. #861. Note: the Commit-Log has to be written when committing the code with Git; it cannot be edited on GitHub afterwards;
+* [ ] Write a PR description detailed enough to understand what the PR does, how, and why;
+* [ ] Write the unit tests needed to verify your logic change; if you submit a new feature or a significant change, remember to add an integration-test in the test module;
+* [ ] Make sure the code compiles and the integration tests pass;
diff --git a/.gitignore b/.gitignore
index 045ec395..cfef6e76 100644
--- a/.gitignore
+++ b/.gitignore
@@ -109,4 +109,8 @@ out/*
dist/
dist/*
km-rest/src/main/resources/templates/
-*dependency-reduced-pom*
\ No newline at end of file
+*dependency-reduced-pom*
+# ignore flattened pom files
+*/.flattened-pom.xml
+.flattened-pom.xml
+*/*/.flattened-pom.xml
\ No newline at end of file
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
index a70c8889..5d8023ba 100644
--- a/CODE_OF_CONDUCT.md
+++ b/CODE_OF_CONDUCT.md
@@ -4,7 +4,7 @@
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
-contributors and maintainers pledge to making participation in our project and
+contributors and maintainers pledge to making participation in our project, and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, gender identity and expression, level of experience,
education, socio-economic status, nationality, personal appearance, race,
@@ -56,7 +56,7 @@ further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
-reported by contacting the project team at shirenchuang@didiglobal.com . All
+reported by contacting the project team at https://knowstreaming.com/support-center . All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
diff --git a/README.md b/README.md
index 9cc19762..8f526268 100644
--- a/README.md
+++ b/README.md
@@ -143,7 +143,7 @@ PS: When asking a question, please describe the problem fully in one message and include your environment information
**`2. WeChat group`**
-To join the WeChat group: add the WeChat account `mike_zhangliang` or `PenceXie` with the note "KnowStreaming".
+To join the WeChat group: add the WeChat account `mike_zhangliang`, `PenceXie`, or `szzdzhp001` with the note "KnowStreaming".
Before joining, please take a moment to star the project; a small star motivates the KnowStreaming authors to keep building the community.
diff --git a/Releases_Notes.md b/Releases_Notes.md
index ad89a3e9..a606ef72 100644
--- a/Releases_Notes.md
+++ b/Releases_Notes.md
@@ -1,4 +1,113 @@
+## v3.3.0
+
+**Bug Fixes**
+- Fix the Connect JMX-Port configuration not taking effect;
+- Fix the OverView page staying in the loading state when no Connector exists;
+- Fix Group partition information being displayed incompletely when paginated;
+- Fix wrong parameter passing when collecting replica metrics;
+- Fix the NullPointerException thrown by the user list after user information is modified;
+- Fix partition selection not taking effect when viewing messages on the Topic detail page;
+- Fix ZK client configuration not taking effect after being set;
+- Fix the missing passed-health-check-count metric in the connect module;
+- Fix a wrong mapping in the metric-fetching methods of the connect module;
+- Fix wrong retrieval of max-dimension metrics in the connect module;
+- Fix wrong information shown by the TopN metrics on the Topic metrics dashboard;
+- Fix the wrong display of Broker Similar Config;
+- Fix the NullPointerException caused by a wrong data type when parsing ZK four-letter command results;
+- Fix wrong version gating of the cleanup-policy options when creating a Topic;
+- Fix the Controller-Host information not being displayed for newly connected clusters;
+- Fix Connector and MM2 list search not taking effect;
+- Fix the abnormal Leader display on the Zookeeper page;
+- Fix the front-end build failure;
+
+
+**Product Improvements**
+- Add default metrics to the ZK Overview page;
+- Unify the ES index template initialization scripts into init_es_template.sh, add the missing connect index template initialization, and remove the redundant replica and zookeeper index template initialization;
+- On the metrics dashboard, show (instead of hide) metric cards without data after metric filtering, with a no-data fallback;
+- Remove the code that reads and writes replica metrics from ES;
+- Improve the Topic health-check logs to make error causes explicit;
+- Skip the ZK display in health-check details when there is no ZK module;
+- Make the local cache size configurable;
+- Add the task group information to the responses of the Task module;
+- Add Ldap configuration instructions to the FAQ;
+- Add FAQ instructions for connecting Kerberos-authenticated Kafka clusters;
+- Add a time-dimension index to the ks_km_kafka_change_record table to improve query performance;
+- Improve the ZK health-check logs to ease troubleshooting;
+
+**New Features**
+- Add Topic replication based on DiDi Kafka (this capability requires DiDi Kafka);
+- Add Topic-replication metrics to the Topic metrics dashboard;
+- Add unit tests based on TestContainers;
+
+
+**Kafka MM2 Beta (newly released in v3.3.0)**
+- Create, read, update, and delete MM2 tasks;
+- Metrics dashboard for MM2 tasks;
+- Health status of MM2 tasks;
+
+---
+
+
+## v3.2.0
+
+**Bug Fixes**
+- Fix the deadlock when writing health-check results to the DB;
+- Fix the wrong logger in the KafkaJMXClient class;
+- Back end: fix the Topic cleanup policy being multi-selectable on version 0.10.1.0, when only one of the two options should be allowed;
+- Fix the error reported when connecting a cluster without filling in the cluster configuration;
+- Upgrade spring-context to 5.3.19 to fix a security vulnerability;
+- Fix wrong version information of multi-version-compatible configs when modifying Broker & Topic configurations;
+- Change the Topic list to show health status instead of the health score;
+- Fix the wrong storage name of the Broker LogSize metric, which made it unqueryable;
+- Fix missing Group metrics in Prometheus;
+- Fix the wrong cluster count caused by missing health-status metrics;
+- Fix the exception thrown when background tasks record operation logs without operator information;
+- Fix the wrong DSL in Replica metric queries;
+- Disable errorLogger to fix duplicated error-log output;
+- Fix the failure to update user information in system management;
+- Fix migration tasks staying in the running state after the original AR information is lost;
+- Fix failures when querying real-time data for the cluster Topic list;
+- Fix the blank page of the cluster Topic list;
+- Fix the array-index-out-of-bounds error caused by abnormal AR data during replica changes;
+
+
+**Product Improvements**
+- Run health checks concurrently, with multiple threads per resource dimension;
+- Unify the log output format and improve some of the logs;
+- Improve the misleading WARN log produced while parsing ZK four-letter command results;
+- Improve the search copy for the directory tree in Zookeeper details;
+- Improve thread-pool names so third-party systems can analyze related problems more easily;
+- Remove the concurrent access control on ESClient, reducing the number of ESClients created and improving utilization;
+- Improve the copy of the Topic Messages drawer;
+- Improve the error log when the ZK health check fails;
+- Increase the timeout for fetching Offset information, reducing request timeouts under high concurrency;
+- Improve the update strategy for Topic & Partition metadata, reducing DB connection usage;
+- Fix issues reported by Sonar code scanning;
+- Improve the collection of partition Offset metrics;
+- Improve the logic of the front-end chart components;
+- Improve the product theme colors;
+- Add a hover tooltip to the refresh button of the Consumer list;
+- Improve the test dialog shown when configuring the Topic message size;
+- Improve the TopN query flow on the Overview page;
+
+
+**New Features**
+- Add a troubleshooting document for pages without data;
+- Add ES index deletion;
+- Support deploying the API service and the Job service separately;
+
+
+**Kafka Connect Beta (newly released in v3.2.0)**
+- Management of Connect clusters;
+- Create, read, update, and delete Connectors;
+- Metrics dashboard for Connect clusters & Connectors;
+
+
+---
+
+
## v3.1.0
**Bug Fixes**
diff --git a/bin/init_es_template.sh b/bin/init_es_template.sh
index 86fcfb66..e570d285 100644
--- a/bin/init_es_template.sh
+++ b/bin/init_es_template.sh
@@ -13,7 +13,7 @@ curl -s --connect-timeout 10 -o /dev/null -X POST -H 'cache-control: no-cache' -
],
"settings" : {
"index" : {
- "number_of_shards" : "10"
+ "number_of_shards" : "2"
}
},
"mappings" : {
@@ -115,7 +115,7 @@ curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: appl
],
"settings" : {
"index" : {
- "number_of_shards" : "10"
+ "number_of_shards" : "2"
}
},
"mappings" : {
@@ -302,7 +302,7 @@ curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: appl
],
"settings" : {
"index" : {
- "number_of_shards" : "10"
+ "number_of_shards" : "6"
}
},
"mappings" : {
@@ -377,7 +377,7 @@ curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: appl
],
"settings" : {
"index" : {
- "number_of_shards" : "10"
+ "number_of_shards" : "6"
}
},
"mappings" : {
@@ -436,72 +436,6 @@ curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: appl
"aliases" : { }
}'
-curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: application/json' http://${esaddr}:${port}/_template/ks_kafka_replication_metric -d '{
- "order" : 10,
- "index_patterns" : [
- "ks_kafka_replication_metric*"
- ],
- "settings" : {
- "index" : {
- "number_of_shards" : "10"
- }
- },
- "mappings" : {
- "properties" : {
- "brokerId" : {
- "type" : "long"
- },
- "partitionId" : {
- "type" : "long"
- },
- "routingValue" : {
- "type" : "text",
- "fields" : {
- "keyword" : {
- "ignore_above" : 256,
- "type" : "keyword"
- }
- }
- },
- "clusterPhyId" : {
- "type" : "long"
- },
- "topic" : {
- "type" : "keyword"
- },
- "metrics" : {
- "properties" : {
- "LogStartOffset" : {
- "type" : "float"
- },
- "Messages" : {
- "type" : "float"
- },
- "LogEndOffset" : {
- "type" : "float"
- }
- }
- },
- "key" : {
- "type" : "text",
- "fields" : {
- "keyword" : {
- "ignore_above" : 256,
- "type" : "keyword"
- }
- }
- },
- "timestamp" : {
- "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
- "index" : true,
- "type" : "date",
- "doc_values" : true
- }
- }
- },
- "aliases" : { }
- }'
-
curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: application/json' http://${esaddr}:${port}/_template/ks_kafka_topic_metric -d '{
"order" : 10,
"index_patterns" : [
@@ -509,7 +443,7 @@ curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: appl
],
"settings" : {
"index" : {
- "number_of_shards" : "10"
+ "number_of_shards" : "6"
}
},
"mappings" : {
@@ -626,7 +560,7 @@ curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: appl
],
"settings" : {
"index" : {
- "number_of_shards" : "10"
+ "number_of_shards" : "2"
}
},
"mappings" : {
@@ -704,6 +638,388 @@ curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: appl
"aliases" : { }
}'
+curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: application/json' http://${esaddr}:${port}/_template/ks_kafka_connect_cluster_metric -d '{
+ "order" : 10,
+ "index_patterns" : [
+ "ks_kafka_connect_cluster_metric*"
+ ],
+ "settings" : {
+ "index" : {
+ "number_of_shards" : "2"
+ }
+ },
+ "mappings" : {
+ "properties" : {
+ "connectClusterId" : {
+ "type" : "long"
+ },
+ "routingValue" : {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "ignore_above" : 256,
+ "type" : "keyword"
+ }
+ }
+ },
+ "clusterPhyId" : {
+ "type" : "long"
+ },
+ "metrics" : {
+ "properties" : {
+ "ConnectorCount" : {
+ "type" : "float"
+ },
+ "TaskCount" : {
+ "type" : "float"
+ },
+ "ConnectorStartupAttemptsTotal" : {
+ "type" : "float"
+ },
+ "ConnectorStartupFailurePercentage" : {
+ "type" : "float"
+ },
+ "ConnectorStartupFailureTotal" : {
+ "type" : "float"
+ },
+ "ConnectorStartupSuccessPercentage" : {
+ "type" : "float"
+ },
+ "ConnectorStartupSuccessTotal" : {
+ "type" : "float"
+ },
+ "TaskStartupAttemptsTotal" : {
+ "type" : "float"
+ },
+ "TaskStartupFailurePercentage" : {
+ "type" : "float"
+ },
+ "TaskStartupFailureTotal" : {
+ "type" : "float"
+ },
+ "TaskStartupSuccessPercentage" : {
+ "type" : "float"
+ },
+ "TaskStartupSuccessTotal" : {
+ "type" : "float"
+ }
+ }
+ },
+ "key" : {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "ignore_above" : 256,
+ "type" : "keyword"
+ }
+ }
+ },
+ "timestamp" : {
+ "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
+ "index" : true,
+ "type" : "date",
+ "doc_values" : true
+ }
+ }
+ },
+ "aliases" : { }
+ }'
+
+curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: application/json' http://${esaddr}:${port}/_template/ks_kafka_connect_connector_metric -d '{
+ "order" : 10,
+ "index_patterns" : [
+ "ks_kafka_connect_connector_metric*"
+ ],
+ "settings" : {
+ "index" : {
+ "number_of_shards" : "2"
+ }
+ },
+ "mappings" : {
+ "properties" : {
+ "connectClusterId" : {
+ "type" : "long"
+ },
+ "routingValue" : {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "ignore_above" : 256,
+ "type" : "keyword"
+ }
+ }
+ },
+ "connectorName" : {
+ "type" : "keyword"
+ },
+ "connectorNameAndClusterId" : {
+ "type" : "keyword"
+ },
+ "clusterPhyId" : {
+ "type" : "long"
+ },
+ "metrics" : {
+ "properties" : {
+ "HealthState" : {
+ "type" : "float"
+ },
+ "ConnectorTotalTaskCount" : {
+ "type" : "float"
+ },
+ "HealthCheckPassed" : {
+ "type" : "float"
+ },
+ "HealthCheckTotal" : {
+ "type" : "float"
+ },
+ "ConnectorRunningTaskCount" : {
+ "type" : "float"
+ },
+ "ConnectorPausedTaskCount" : {
+ "type" : "float"
+ },
+ "ConnectorFailedTaskCount" : {
+ "type" : "float"
+ },
+ "ConnectorUnassignedTaskCount" : {
+ "type" : "float"
+ },
+ "BatchSizeAvg" : {
+ "type" : "float"
+ },
+ "BatchSizeMax" : {
+ "type" : "float"
+ },
+ "OffsetCommitAvgTimeMs" : {
+ "type" : "float"
+ },
+ "OffsetCommitMaxTimeMs" : {
+ "type" : "float"
+ },
+ "OffsetCommitFailurePercentage" : {
+ "type" : "float"
+ },
+ "OffsetCommitSuccessPercentage" : {
+ "type" : "float"
+ },
+ "PollBatchAvgTimeMs" : {
+ "type" : "float"
+ },
+ "PollBatchMaxTimeMs" : {
+ "type" : "float"
+ },
+ "SourceRecordActiveCount" : {
+ "type" : "float"
+ },
+ "SourceRecordActiveCountAvg" : {
+ "type" : "float"
+ },
+ "SourceRecordActiveCountMax" : {
+ "type" : "float"
+ },
+ "SourceRecordPollRate" : {
+ "type" : "float"
+ },
+ "SourceRecordPollTotal" : {
+ "type" : "float"
+ },
+ "SourceRecordWriteRate" : {
+ "type" : "float"
+ },
+ "SourceRecordWriteTotal" : {
+ "type" : "float"
+ },
+ "OffsetCommitCompletionRate" : {
+ "type" : "float"
+ },
+ "OffsetCommitCompletionTotal" : {
+ "type" : "float"
+ },
+ "OffsetCommitSkipRate" : {
+ "type" : "float"
+ },
+ "OffsetCommitSkipTotal" : {
+ "type" : "float"
+ },
+ "PartitionCount" : {
+ "type" : "float"
+ },
+ "PutBatchAvgTimeMs" : {
+ "type" : "float"
+ },
+ "PutBatchMaxTimeMs" : {
+ "type" : "float"
+ },
+ "SinkRecordActiveCount" : {
+ "type" : "float"
+ },
+ "SinkRecordActiveCountAvg" : {
+ "type" : "float"
+ },
+ "SinkRecordActiveCountMax" : {
+ "type" : "float"
+ },
+ "SinkRecordLagMax" : {
+ "type" : "float"
+ },
+ "SinkRecordReadRate" : {
+ "type" : "float"
+ },
+ "SinkRecordReadTotal" : {
+ "type" : "float"
+ },
+ "SinkRecordSendRate" : {
+ "type" : "float"
+ },
+ "SinkRecordSendTotal" : {
+ "type" : "float"
+ },
+ "DeadletterqueueProduceFailures" : {
+ "type" : "float"
+ },
+ "DeadletterqueueProduceRequests" : {
+ "type" : "float"
+ },
+ "LastErrorTimestamp" : {
+ "type" : "float"
+ },
+ "TotalErrorsLogged" : {
+ "type" : "float"
+ },
+ "TotalRecordErrors" : {
+ "type" : "float"
+ },
+ "TotalRecordFailures" : {
+ "type" : "float"
+ },
+ "TotalRecordsSkipped" : {
+ "type" : "float"
+ },
+ "TotalRetries" : {
+ "type" : "float"
+ }
+ }
+ },
+ "key" : {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "ignore_above" : 256,
+ "type" : "keyword"
+ }
+ }
+ },
+ "timestamp" : {
+ "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
+ "index" : true,
+ "type" : "date",
+ "doc_values" : true
+ }
+ }
+ },
+ "aliases" : { }
+ }'
+
+curl -s -o /dev/null -X POST -H 'cache-control: no-cache' -H 'content-type: application/json' http://${esaddr}:${port}/_template/ks_kafka_connect_mirror_maker_metric -d '{
+ "order" : 10,
+ "index_patterns" : [
+ "ks_kafka_connect_mirror_maker_metric*"
+ ],
+ "settings" : {
+ "index" : {
+ "number_of_shards" : "2"
+ }
+ },
+ "mappings" : {
+ "properties" : {
+ "connectClusterId" : {
+ "type" : "long"
+ },
+ "routingValue" : {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "ignore_above" : 256,
+ "type" : "keyword"
+ }
+ }
+ },
+ "connectorName" : {
+ "type" : "keyword"
+ },
+ "connectorNameAndClusterId" : {
+ "type" : "keyword"
+ },
+ "clusterPhyId" : {
+ "type" : "long"
+ },
+ "metrics" : {
+ "properties" : {
+ "HealthState" : {
+ "type" : "float"
+ },
+ "HealthCheckTotal" : {
+ "type" : "float"
+ },
+ "ByteCount" : {
+ "type" : "float"
+ },
+ "ByteRate" : {
+ "type" : "float"
+ },
+ "RecordAgeMs" : {
+ "type" : "float"
+ },
+ "RecordAgeMsAvg" : {
+ "type" : "float"
+ },
+ "RecordAgeMsMax" : {
+ "type" : "float"
+ },
+ "RecordAgeMsMin" : {
+ "type" : "float"
+ },
+ "RecordCount" : {
+ "type" : "float"
+ },
+ "RecordRate" : {
+ "type" : "float"
+ },
+ "ReplicationLatencyMs" : {
+ "type" : "float"
+ },
+ "ReplicationLatencyMsAvg" : {
+ "type" : "float"
+ },
+ "ReplicationLatencyMsMax" : {
+ "type" : "float"
+ },
+ "ReplicationLatencyMsMin" : {
+ "type" : "float"
+ }
+ }
+ },
+ "key" : {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "ignore_above" : 256,
+ "type" : "keyword"
+ }
+ }
+ },
+ "timestamp" : {
+ "format" : "yyyy-MM-dd HH:mm:ss Z||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSS Z||yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss,SSS||yyyy/MM/dd HH:mm:ss||yyyy-MM-dd HH:mm:ss,SSS Z||yyyy/MM/dd HH:mm:ss,SSS Z||epoch_millis",
+ "index" : true,
+ "type" : "date",
+ "doc_values" : true
+ }
+ }
+ },
+ "aliases" : { }
+ }'
+
+
for i in {0..6};
do
logdate=_$(date -d "${i} day ago" +%Y-%m-%d)
@@ -711,8 +1027,10 @@ do
curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_cluster_metric${logdate} && \
curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_group_metric${logdate} && \
curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_partition_metric${logdate} && \
- curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_replication_metric${logdate} && \
curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_zookeeper_metric${logdate} && \
+ curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_connect_cluster_metric${logdate} && \
+ curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_connect_connector_metric${logdate} && \
+ curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_connect_mirror_maker_metric${logdate} && \
curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_topic_metric${logdate} || \
exit 2
done
diff --git a/docs/contribute_guide/assets/分支管理.drawio b/docs/contribute_guide/assets/分支管理.drawio
new file mode 100644
index 00000000..0e7e3d37
--- /dev/null
+++ b/docs/contribute_guide/assets/分支管理.drawio
@@ -0,0 +1,111 @@
+<!-- 分支管理.drawio: drawio XML content omitted -->
diff --git a/docs/contribute_guide/assets/分支管理.png b/docs/contribute_guide/assets/分支管理.png
new file mode 100644
index 00000000..867fecd4
Binary files /dev/null and b/docs/contribute_guide/assets/分支管理.png differ
diff --git a/docs/contribute_guide/assets/环境初始化.jpg b/docs/contribute_guide/assets/环境初始化.jpg
new file mode 100644
index 00000000..31ff5f28
Binary files /dev/null and b/docs/contribute_guide/assets/环境初始化.jpg differ
diff --git a/docs/contribute_guide/assets/申请合并.jpg b/docs/contribute_guide/assets/申请合并.jpg
new file mode 100644
index 00000000..d02a7f50
Binary files /dev/null and b/docs/contribute_guide/assets/申请合并.jpg differ
diff --git a/docs/contribute_guide/assets/问题认领.jpg b/docs/contribute_guide/assets/问题认领.jpg
new file mode 100644
index 00000000..62da4728
Binary files /dev/null and b/docs/contribute_guide/assets/问题认领.jpg differ
diff --git a/docs/contributer_guide/代码规范.md b/docs/contribute_guide/代码规范.md
similarity index 100%
rename from docs/contributer_guide/代码规范.md
rename to docs/contribute_guide/代码规范.md
diff --git a/docs/contribute_guide/贡献名单.md b/docs/contribute_guide/贡献名单.md
new file mode 100644
index 00000000..41481787
--- /dev/null
+++ b/docs/contribute_guide/贡献名单.md
@@ -0,0 +1,100 @@
+# Contributor List
+
+- [Contributor List](#contributor-list)
+  - [1. Contributor roles](#1-contributor-roles)
+    - [1.1 Maintainer](#11-maintainer)
+    - [1.2 Committer](#12-committer)
+    - [1.3 Contributor](#13-contributor)
+  - [2. Contributor list](#2-contributor-list)
+
+
+## 1. Contributor roles
+
+KnowStreaming developers fall into three roles: Maintainer, Committer, and Contributor. The criteria for each role are defined below.
+
+### 1.1 Maintainer
+
+A Maintainer is an individual who has made significant contributions to the evolution and development of the KnowStreaming project, meeting the following criteria:
+
+- Has designed and developed several key modules or sub-projects and is a core developer of the project;
+- Shows sustained investment and passion, and actively maintains the community, the website, issues, PRs, and other project matters;
+- Has clear influence in the community and can represent KnowStreaming at important community meetings and events;
+- Has the awareness and ability to mentor Committers and Contributors;
+
+### 1.2 Committer
+
+A Committer is an individual with write access to the KnowStreaming repository, meeting the following criteria:
+
+- Contributes issues and PRs continuously over a long period;
+- Participates in maintaining the issue list and in discussions of important features;
+- Participates in code review;
+
+### 1.3 Contributor
+
+A Contributor is an individual who has contributed to the KnowStreaming project. The criterion is:
+
+- Has submitted a PR that was merged;
+
+---
+
+## 2. Contributor list
+
+Open-source contributor list (updated from time to time)
+
+If you are on the list but have not received a contributor gift, contact: szzdzhp001
+
+| Name | Github | Role | Company |
+| ------------------- | ---------------------------------------------------------- | ----------- | -------- |
+| 张亮 | [@zhangliangboy](https://github.com/zhangliangboy) | Maintainer | 滴滴出行 |
+| 谢鹏 | [@PenceXie](https://github.com/PenceXie) | Maintainer | 滴滴出行 |
+| 赵情融 | [@zqrferrari](https://github.com/zqrferrari) | Maintainer | 滴滴出行 |
+| 石臻臻 | [@shirenchuang](https://github.com/shirenchuang) | Maintainer | 滴滴出行 |
+| 曾巧 | [@ZQKC](https://github.com/ZQKC) | Maintainer | 滴滴出行 |
+| 孙超 | [@lucasun](https://github.com/lucasun) | Maintainer | 滴滴出行 |
+| 洪华驰 | [@brodiehong](https://github.com/brodiehong) | Maintainer | 滴滴出行 |
+| 许喆 | [@potaaaaaato](https://github.com/potaaaaaato) | Committer | 滴滴出行 |
+| 郭宇航 | [@GraceWalk](https://github.com/GraceWalk) | Committer | 滴滴出行 |
+| 李伟 | [@velee](https://github.com/velee) | Committer | 滴滴出行 |
+| 张占昌 | [@zzccctv](https://github.com/zzccctv) | Committer | 滴滴出行 |
+| 王东方 | [@wangdongfang-aden](https://github.com/wangdongfang-aden) | Committer | 滴滴出行 |
+| 王耀波 | [@WYAOBO](https://github.com/WYAOBO) | Committer | 滴滴出行 |
+| 赵寅锐 | [@ZHAOYINRUI](https://github.com/ZHAOYINRUI) | Maintainer | 字节跳动 |
+| haoqi123 | [@haoqi123](https://github.com/haoqi123) | Contributor | 前程无忧 |
+| chaixiaoxue | [@chaixiaoxue](https://github.com/chaixiaoxue) | Contributor | SYNNEX |
+| 陆晗 | [@luhea](https://github.com/luhea) | Contributor | 竞技世界 |
+| Mengqi777 | [@Mengqi777](https://github.com/Mengqi777) | Contributor | 腾讯 |
+| ruanliang-hualun | [@ruanliang-hualun](https://github.com/ruanliang-hualun) | Contributor | 网易 |
+| 17hao | [@17hao](https://github.com/17hao) | Contributor | |
+| Huyueeer | [@Huyueeer](https://github.com/Huyueeer) | Contributor | INVENTEC |
+| lomodays207 | [@lomodays207](https://github.com/lomodays207) | Contributor | 建信金科 |
+| Super .Wein(星痕) | [@superspeedone](https://github.com/superspeedone) | Contributor | 韵达 |
+| Hongten | [@Hongten](https://github.com/Hongten) | Contributor | Shopee |
| 徐正熙 | [@hyper-xx](https://github.com/hyper-xx) | Contributor | 滴滴出行 |
+| RichardZhengkay | [@RichardZhengkay](https://github.com/RichardZhengkay) | Contributor | 趣街 |
+| 罐子里的茶 | [@gzldc](https://github.com/gzldc) | Contributor | 道富 |
+| 陈忠玉 | [@paula](https://github.com/chenzhongyu11) | Contributor | 平安产险 |
| 杨光 | [@yangvipguang](https://github.com/yangvipguang) | Contributor | |
| 王亚聪 | [@wangyacongi](https://github.com/wangyacongi) | Contributor | |
+| Yang Jing | [@yangbajing](https://github.com/yangbajing) | Contributor | |
+| 刘新元 Liu XinYuan | [@Liu-XinYuan](https://github.com/Liu-XinYuan) | Contributor | |
| Joker | [@JokerQueue](https://github.com/JokerQueue) | Contributor | 丰巢 |
+| Eason Lau | [@Liubey](https://github.com/Liubey) | Contributor | |
+| hailanxin | [@hailanxin](https://github.com/hailanxin) | Contributor | |
+| Qi Zhang | [@zzzhangqi](https://github.com/zzzhangqi) | Contributor | 好雨科技 |
+| fengxsong | [@fengxsong](https://github.com/fengxsong) | Contributor | |
+| 谢晓东 | [@Strangevy](https://github.com/Strangevy) | Contributor | 花生日记 |
+| ZhaoXinlong | [@ZhaoXinlong](https://github.com/ZhaoXinlong) | Contributor | |
+| xuehaipeng | [@xuehaipeng](https://github.com/xuehaipeng) | Contributor | |
+| 孔令续 | [@mrazkong](https://github.com/mrazkong) | Contributor | |
+| pierre xiong | [@pierre94](https://github.com/pierre94) | Contributor | |
+| PengShuaixin | [@PengShuaixin](https://github.com/PengShuaixin) | Contributor | |
+| 梁壮 | [@lz](https://github.com/silent-night-no-trace) | Contributor | |
+| 张晓寅 | [@ahu0605](https://github.com/ahu0605) | Contributor | 电信数智 |
+| 黄海婷 | [@Huanghaiting](https://github.com/Huanghaiting) | Contributor | 云徙科技 |
+| 任祥德 | [@RenChauncy](https://github.com/RenChauncy) | Contributor | 探马企服 |
+| 胡圣林 | [@slhu997](https://github.com/slhu997) | Contributor | |
+| 史泽颖 | [@shizeying](https://github.com/shizeying) | Contributor | |
+| 王玉博 | [@Wyb7290](https://github.com/Wyb7290) | Committer | |
| 伍璇 | [@Luckywustone](https://github.com/Luckywustone) | Contributor | |
| 邓苑 | [@CatherineDY](https://github.com/CatherineDY) | Contributor | |
| 封琼凤 | [@fengqiongfeng](https://github.com/fengqiongfeng) | Committer | |
diff --git a/docs/contribute_guide/贡献指南.md b/docs/contribute_guide/贡献指南.md
new file mode 100644
index 00000000..37cf89bc
--- /dev/null
+++ b/docs/contribute_guide/贡献指南.md
@@ -0,0 +1,167 @@
+# Contribution Guide
+
+- [Contribution Guide](#contribution-guide)
+  - [1. Code of Conduct](#1-code-of-conduct)
+  - [2. Repository Conventions](#2-repository-conventions)
+    - [2.1 Issue conventions](#21-issue-conventions)
+    - [2.2 Commit-Log conventions](#22-commit-log-conventions)
+    - [2.3 Pull-Request conventions](#23-pull-request-conventions)
+  - [3. Walkthrough](#3-walkthrough)
+    - [3.1 Initialize the environment](#31-initialize-the-environment)
+    - [3.2 Claim an issue](#32-claim-an-issue)
+    - [3.3 Work on the issue & commit the fix](#33-work-on-the-issue--commit-the-fix)
+    - [3.4 Request a merge](#34-request-a-merge)
+  - [4. FAQ](#4-faq)
+    - [4.1 How do I squash multiple Commit-Logs into one?](#41-how-do-i-squash-multiple-commit-logs-into-one)
+
+
+---
+
+
+Welcome 👏🏻 👏🏻 👏🏻 to `KnowStreaming`. This document is a guide on how to contribute to `KnowStreaming`. If you find anything incorrect or missing, please leave your comments and suggestions.
+
+
+---
+
+
+## 1. Code of Conduct
+
+Please be sure to read and follow our [Code of Conduct](https://github.com/didi/KnowStreaming/blob/master/CODE_OF_CONDUCT.md).
+
+
+## 2. Repository Conventions
+
+### 2.1 Issue conventions
+
+Create an issue as instructed at [Create Issue](https://github.com/didi/KnowStreaming/issues/new/choose).
+
+Key points:
+- Provide the environment in which the problem occurred, including the operating system and the KS version;
+- Provide steps to reproduce the problem;
+
+
+### 2.2 Commit-Log conventions
+
+A `Commit-Log` consists of three parts: `Header`, `Body`, and `Footer`. The `Header` is required and has a fixed format; the `Body` is used when the change warrants a detailed explanation.
+
+
+**1. `Header` format**
+
+The `Header` format is `[Type]Message(#IssueID)`, made up of three parts, `Type`, `Message`, and `IssueID`:
+
+- `Type`: the type of the commit, e.g. Bugfix, Feature, Optimize;
+- `Message`: the commit message, e.g. "fix the xx problem";
+- `IssueID`: the number of the Issue associated with the commit;
+
+
+A real example: [`[Bugfix]修复新接入的集群,Controller-Host不显示的问题(#927)`](https://github.com/didi/KnowStreaming/pull/933/commits)
+
+
+
+**2. `Body` format**
+
+Usually not needed. If the commit solves a complex problem or touches a lot of code, use the `Body` to explain the problem being solved, the approach taken, and similar information.
+
+---
+
+**3. A full example**
+
+```
+[Optimize]Improve the initialization of the MySQL & ES test containers(#906)
+
+Main changes:
+1. The knowstreaming/knowstreaming-manager container;
+2. The knowstreaming/knowstreaming-mysql container now uses the mysql:5.7 container;
+3. After the mysql:5.7 container is initialized, add a step that initializes the MySQL tables and data;
+
+Affected changes:
+1. Move the MySQL initialization scripts from km-dist/init/sql to km-persistence/src/main/resource/sql, so the required initialization SQL is loaded during project tests;
+2. Delete the unused km-dist/init/template directory;
+3. Because of the adjustments to the km-dist/init/sql and km-dist/init/template directories, also adjust the file list in ReleaseKnowStreaming.xml;
+```
+
+
+**TODO: anyone interested is welcome to introduce a Git hook later for better Commit-Log management.**
+
+
+### 2.3 Pull-Request conventions
+
+See the [PULL-REQUEST template](../../.github/PULL_REQUEST_TEMPLATE.md) for details.
+
+Key points:
+
+- Every PR must be associated with a valid ISSUE; otherwise the PR will be rejected;
+- One branch changes one thing, and one PR changes one thing;
+
+---
+
+
+## 3. Walkthrough
+
+This section describes the operations and commands involved in contributing code to `KnowStreaming`.
+
+Terminology:
+- Main repository: https://github.com/didi/KnowStreaming is the main repository;
+- Forked repository: the KnowStreaming repository forked into your own account;
+
+
+### 3.1 Initialize the environment
+
+1. Fork the `KnowStreaming` main repository into your own account using the `Fork` button at the top right of https://github.com/didi/KnowStreaming;
+2. Clone the forked repository locally: `git clone git@github.com:xxxxxxx/KnowStreaming.git`; this remote is usually named `origin`;
+3. Add the main repository locally: `git remote add upstream https://github.com/didi/KnowStreaming`; `upstream` is the local short name of the main repository; any name works, as long as you use it consistently;
+4. Fetch the main repository: `git fetch upstream`;
+5. Fetch the forked repository: `git fetch origin`;
+6. Check out the main repository's `master` branch locally under the name `github_master`: `git checkout -b github_master upstream/master`;
+
+Finally, here is roughly what things look like after initialization:
+
+
+
+At this point the environment is ready. From now on, the `github_master` branch tracks the main repository's `master` branch: `git pull` fetches its latest code, and `git checkout -b xxx` creates whatever branch we need. To confirm the setup, see the sketch below.
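+
+A quick check (a sketch; the exact output depends on your account and fork):
+
+```
+git remote -v     # expect origin (your fork) and upstream (didi/KnowStreaming)
+git branch -vv    # expect github_master tracking upstream/master
+```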
+
+
+
+### 3.2 Claim an issue
+
+Simply comment on the issue saying that you want to work on it, as shown below:
+
+
+
+
+### 3.3 Work on the issue & commit the fix
+
+This section describes the branch management used while working on an issue and committing the fix, as shown below:
+
+
+
+1. Switch to the main branch: `git checkout github_master`;
+2. Pull the latest code on the main branch: `git pull`;
+3. Create a new branch off the main branch: `git checkout -b fix_928`;
+4. Commit the code following the commit conventions, e.g. `git commit -m "[Optimize]Improve the xxx problem(#928)"`;
+5. Push to your own remote repository: `git push --set-upstream origin fix_928`;
+6. Open a `Pull Request` on the `GitHub` page so a maintainer can merge it into the main repository; see the next section for details;
+
+
+### 3.4 Request a merge
+
+After the code is pushed to the forked repository on `GitHub`, you can create a `Pull Request` on the `GitHub` website to request merging the code into the main repository, as shown below:
+
+
+
+
+
+[An example of a created Pull Request](https://github.com/didi/KnowStreaming/pull/945)
+
+
+
+---
+
+
+## 4. FAQ
+
+### 4.1 How do I squash multiple Commit-Logs into one?
+
+Use the `git rebase -i` command; a minimal sketch follows.
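+
+A sketch for squashing the last three commits on a working branch (the branch name and commit count are illustrative):
+
+```
+git rebase -i HEAD~3                          # lists the last 3 commits in an editor
+# keep "pick" on the first line, change the others to "squash" (or "s"),
+# save, exit, then edit the combined commit message
+git push --force-with-lease origin fix_928    # rewritten history requires a force push
+```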
+
+
diff --git a/docs/contributer_guide/开发者名单.md b/docs/contributer_guide/开发者名单.md
deleted file mode 100644
index 3f2c708d..00000000
--- a/docs/contributer_guide/开发者名单.md
+++ /dev/null
@@ -1,6 +0,0 @@
-
-Open-source contributor certificate recipients (updated regularly)
-
-
-For the contributor list, see: [Contributor List](https://doc.knowstreaming.com/product/10-contribution#106-贡献者名单)
-
diff --git a/docs/contributer_guide/贡献流程.md b/docs/contributer_guide/贡献流程.md
deleted file mode 100644
index 42679379..00000000
--- a/docs/contributer_guide/贡献流程.md
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-Please see: [Contribution process](https://doc.knowstreaming.com/product/10-contribution#102-贡献流程)
\ No newline at end of file
diff --git a/docs/dev_guide/无数据排查文档.md b/docs/dev_guide/无数据排查文档.md
new file mode 100644
index 00000000..fd7886bc
--- /dev/null
+++ b/docs/dev_guide/无数据排查文档.md
@@ -0,0 +1,285 @@
+## 1. Wrong cluster connection settings
+
+### 1.1 Symptom
+
+As shown below: if the cluster is not empty, the most likely cause is a misconfigured address.
+
+
+
+
+
+### 1.2 Solution
+
+When connecting the cluster, resolve the reported error accordingly. For example:
+
+
+
+### 1.3 Normal state
+
+When the cluster is connected correctly, the page fills in automatically and no error is reported.
+
+
+
+## 2. JMX connection failure (requires version 3.0.1 or later)
+
+### 2.1 Symptom
+
+If a red exclamation mark appears in the JMX Port column of the Broker list, that Broker's JMX connection is abnormal.
+
+
+
+
+
+#### 2.1.1 Cause 1: JMX not enabled
+
+##### 2.1.1.1 Symptom
+
+If the JMX Port value in the Broker list is -1, JMX is not enabled on that Broker.
+
+
+
+##### 2.1.1.2 Solution
+
+Enable JMX as follows:
+
+1. Modify the `kafka-server-start.sh` file in Kafka's bin directory:
+
+```
+# Add the JMX port configuration below this block
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
+    export JMX_PORT=9999 # add this line; the value does not have to be 9999
+fi
+```
+
+
+
+2. Modify the `kafka-run-class.sh` file in Kafka's bin directory:
+
+```
+# JMX settings
+if [ -z "$KAFKA_JMX_OPTS" ]; then
+  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false
+    -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${current_machine_ip}"
+fi
+
+# JMX port to use
+if [ $JMX_PORT ]; then
+  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
+fi
+```
+
+
+
+3. Restart the Kafka Broker.
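+
+After the restart, a quick reachability check for the JMX port (a sketch; the host and port are examples, and it assumes `nc` is available):
+
+```
+nc -vz 192.168.0.1 9999   # exits 0 if the TCP port accepts connections
+```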
+
+
+
+#### 2.1.2 Cause 2: JMX misconfigured
+
+##### 2.1.2.1 Symptom
+
+Error logs:
+
+```
+# Error 1: the reported host is the real IP; in that case the JMX configuration itself is most likely wrong.
+2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap     : JMX connect exception, host:192.168.0.1 port:9999. java.rmi.ConnectException: Connection refused to host: 192.168.0.1; nested exception is:
+
+# Error 2: the reported host is 127.0.0.1; in that case the machine's hostname configuration is probably wrong.
+2021-01-27 10:06:20.730 ERROR 50901 --- [ics-Thread-1-62] c.x.k.m.c.utils.jmx.JmxConnectorWrap     : JMX connect exception, host:127.0.0.1 port:9999. java.rmi.ConnectException: Connection refused to host: 127.0.0.1;; nested exception is:
+```
+
+
+
+##### 2.1.2.2 Solution
+
+Enable and configure JMX as follows:
+
+1. Modify the `kafka-server-start.sh` file in Kafka's bin directory:
+
+```
+# Add the JMX port configuration below this block
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
+    export JMX_PORT=9999 # add this line; the value does not have to be 9999
+fi
+```
+
+
+
+2. Modify the `kafka-run-class.sh` file in Kafka's bin directory:
+
+```
+# JMX settings
+if [ -z "$KAFKA_JMX_OPTS" ]; then
+  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false
+    -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${current_machine_ip}"
+fi
+
+# JMX port to use
+if [ $JMX_PORT ]; then
+  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
+fi
+```
+
+
+
+3. Restart the Kafka Broker.
+
+
+
+#### 2.1.3 Cause 3: JMX with SSL enabled
+
+##### 2.1.3.1 Solution
+
+Set `"openSSL": true` in the `jmx_properties` of the cluster; see the `jmx_properties` format in section 2.1.4.2 below.
+
+
+#### 2.1.4 Cause 4: connected to the wrong IP
+
+##### 2.1.4.1 Symptom
+
+The Broker is configured with both an internal and an external network, and JMX may listen on the internal or the external IP, so `KnowStreaming` must connect to the IP on the matching network.
+
+For example: given the Broker's ZK record below, we expect `KnowStreaming` to connect to the address marked `INTERNAL` in `endpoints`, but it connects to the `EXTERNAL` address instead.
+
+```json
+{
+ "listener_security_protocol_map": {
+ "EXTERNAL": "SASL_PLAINTEXT",
+ "INTERNAL": "SASL_PLAINTEXT"
+ },
+ "endpoints": [
+ "EXTERNAL://192.168.0.1:7092",
+ "INTERNAL://192.168.0.2:7093"
+ ],
+ "jmx_port": 8099,
+ "host": "192.168.0.1",
+ "timestamp": "1627289710439",
+ "port": -1,
+ "version": 4
+}
+```
+
+##### 2.1.4.2 Solution
+
+You can manually add a `useWhichEndpoint` field to the `jmx_properties` column of the `ks_km_physical_cluster` table, to make `KnowStreaming` connect to the desired JMX IP and port.
+
+`jmx_properties` format:
+
+```json
+{
+    "maxConn": 100,                 // maximum number of JMX connections from KM to a single Broker
+    "username": "xxxxx",            // username, optional
+    "password": "xxxx",             // password, optional
+    "openSSL": true,                // whether to enable SSL: true enables it, false disables it
+    "useWhichEndpoint": "EXTERNAL"  // name of the listener network to connect to; EXTERNAL means the EXTERNAL address in endpoints
+}
+```
+
+
+
+SQL example:
+
+```sql
+UPDATE ks_km_physical_cluster SET jmx_properties='{ "maxConn": 10, "username": "xxxxx", "password": "xxxx", "openSSL": false , "useWhichEndpoint": "xxx"}' where id={xxx};
+```
+
+### 2.2 Normal state
+
+After the change, if every entry in the JMX PORT column is green, JMX is working.
+
+
+
+
+
+## 3. Elasticsearch problems
+
+Note: on macOS, curl commands may fail with a zsh error. The following workaround helps.
+
+```
+1. Open the .zshrc file: vim ~/.zshrc
+2. Add to .zshrc: setopt no_nomatch
+3. Reload the configuration: source ~/.zshrc
+```
+
+### 3.1 Cause 1: missing indices
+
+#### 3.1.1 Symptom
+
+Error message:
+
+```
+com.didiglobal.logi.elasticsearch.client.model.exception.ESIndexNotFoundException: method [GET], host[http://127.0.0.1:9200], URI [/ks_kafka_broker_metric_2022-10-21,ks_kafka_broker_metric_2022-10-22/_search], status line [HTTP/1.1 404 Not Found]
+```
+
+Run curl http://{ES IP address}:{ES port}/_cat/indices/ks_kafka* to list the KS indices, and find that none exist.
+
+#### 3.1.2 Solution
+
+Run the [ES index and template initialization](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) script to create the indices and templates.
+
+
+
+### 3.2 Cause 2: wrong index templates
+
+#### 3.2.1 Symptom
+
+The multi-cluster list shows data, but the charts on the cluster detail page show none. Query the KS index template list and find that the templates do not exist.
+
+```
+curl {ES IP address}:{ES port}/_cat/templates/ks_kafka*?v&h=name
+```
+
+The normal KS templates are shown in the figure below.
+
+
+
+
+
+#### 3.2.2 Solution
+
+Delete the KS index templates and indices:
+
+```
+curl -XDELETE {ES IP address}:{ES port}/ks_kafka*
+curl -XDELETE {ES IP address}:{ES port}/_template/ks_kafka*
+```
+
+Run the [ES index and template initialization](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) script to recreate the indices and templates.
+
+
+### 3.3 Cause 3: cluster shard limit reached
+
+#### 3.3.1 Symptom
+
+Error message:
+
+```
+com.didiglobal.logi.elasticsearch.client.model.exception.ESIndexNotFoundException: method [GET], host[http://127.0.0.1:9200], URI [/ks_kafka_broker_metric_2022-10-21,ks_kafka_broker_metric_2022-10-22/_search], status line [HTTP/1.1 404 Not Found]
+```
+
+Creating an index manually also fails:
+
+```
+# command to create the ks_kafka_cluster_metric_test index
+curl -s -XPUT http://{ES IP address}:{ES port}/ks_kafka_cluster_metric_test
+```
+
+#### 3.3.2 Solution
+
+By default ES allows at most 1000 shards per node (cluster.max_shards_per_node); once the limit is reached, index creation fails.
+
++ Raise the shard limit by running:
+
+```
+curl -XPUT -H"content-type:application/json" http://{ES IP address}:{ES port}/_cluster/settings -d '
+{
+    "persistent": {
+        "cluster": {
+            "max_shards_per_node": {shard limit, default 1000}
+        }
+    }
+}'
+```
+
+Run the [ES index and template initialization](https://github.com/didi/KnowStreaming/blob/master/bin/init_es_template.sh) script to recreate any missing indices.
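+
+To see how close the cluster is to the shard limit, a quick check (a sketch; substitute your ES address):
+
+```
+# total number of shards currently allocated in the cluster
+curl -s http://{ES IP address}:{ES port}/_cluster/stats?filter_path=indices.shards.total
+```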
diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md
index 3cd580b8..061c080d 100644
--- a/docs/install_guide/版本升级手册.md
+++ b/docs/install_guide/版本升级手册.md
@@ -4,11 +4,180 @@
- If you want to upgrade to a specific version, you need to apply, in order, every change from your current version up to the version you want to use; only then will the system work properly.
- If an intermediate version has no upgrade notes, you can upgrade to it from the previous version by simply replacing the installation package.
-### 6.2.0 Upgrade to the `master` version
+### Upgrade to the `master` version
-None yet
-### 6.2.1 Upgrade to the `v3.1.0` version
+### Upgrade to the `3.3.0` version
+
+**SQL changes**
+```sql
+ALTER TABLE `logi_security_user`
+ CHANGE COLUMN `phone` `phone` VARCHAR(20) NOT NULL DEFAULT '' COMMENT 'mobile' ;
+
+ALTER TABLE ks_kc_connector ADD `heartbeat_connector_name` varchar(512) DEFAULT '' COMMENT '心跳检测connector名称';
+ALTER TABLE ks_kc_connector ADD `checkpoint_connector_name` varchar(512) DEFAULT '' COMMENT '进度确认connector名称';
+
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_MIRROR_MAKER_TOTAL_RECORD_ERRORS', '{\"value\" : 1}', 'MirrorMaker消息处理错误的次数', 'admin');
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_MIRROR_MAKER_REPLICATION_LATENCY_MS_MAX', '{\"value\" : 6000}', 'MirrorMaker消息复制最大延迟时间', 'admin');
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_MIRROR_MAKER_UNASSIGNED_TASK_COUNT', '{\"value\" : 20}', 'MirrorMaker未被分配的任务数量', 'admin');
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_MIRROR_MAKER_FAILED_TASK_COUNT', '{\"value\" : 10}', 'MirrorMaker失败状态的任务数量', 'admin');
+
+
+-- Multi-cluster management permissions, added 2023-01-05
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2012', 'Topic-新增Topic复制', '1593', '1', '2', 'Topic-新增Topic复制', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2014', 'Topic-详情-取消Topic复制', '1593', '1', '2', 'Topic-详情-取消Topic复制', '0', 'know-streaming');
+
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2012', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2014', '0', 'know-streaming');
+
+
+-- Multi-cluster management permissions, added 2023-01-18
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2016', 'MM2-新增', '1593', '1', '2', 'MM2-新增', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2018', 'MM2-编辑', '1593', '1', '2', 'MM2-编辑', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2020', 'MM2-删除', '1593', '1', '2', 'MM2-删除', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2022', 'MM2-重启', '1593', '1', '2', 'MM2-重启', '0', 'know-streaming');
+INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2024', 'MM2-暂停&恢复', '1593', '1', '2', 'MM2-暂停&恢复', '0', 'know-streaming');
+
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2016', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2018', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2020', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2022', '0', 'know-streaming');
+INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2024', '0', 'know-streaming');
+
+
+DROP TABLE IF EXISTS `ks_ha_active_standby_relation`;
+CREATE TABLE `ks_ha_active_standby_relation` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `active_cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '主集群ID',
+ `standby_cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT '备集群ID',
+ `res_name` varchar(192) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '资源名称',
+ `res_type` int(11) NOT NULL DEFAULT '-1' COMMENT '资源类型,0:集群,1:镜像Topic,2:主备Topic',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uniq_cluster_res` (`res_type`,`active_cluster_phy_id`,`standby_cluster_phy_id`,`res_name`),
+ UNIQUE KEY `uniq_res_type_standby_cluster_res_name` (`res_type`,`standby_cluster_phy_id`,`res_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='HA主备关系表';
+
+
+-- Drop the idx_cluster_phy_id index and add the idx_cluster_update_time index
+ALTER TABLE `ks_km_kafka_change_record` DROP INDEX `idx_cluster_phy_id` ,
+ADD INDEX `idx_cluster_update_time` (`cluster_phy_id` ASC, `update_time` ASC);
+```
+
+### Upgrade to the `3.2.0` version
+
+**Configuration changes**
+
+```yaml
+# Add the following configuration
+
+spring:
+  logi-job: # database configuration of the logi-job module that know-streaming depends on; by default keep it identical to the know-streaming database configuration
+    enable: true # true enables job tasks, false disables them. KS can be deployed as two services, one serving front-end requests and one running job tasks, controlled by this field
+
+# Thread pool sizes
+thread-pool:
+  es:
+    search: # ES query thread pool
+      thread-num: 20 # thread pool size
+      queue-size: 10000 # queue size
+
+# Client pool sizes
+client-pool:
+  kafka-admin:
+    client-cnt: 1 # number of KafkaAdminClients created per Kafka cluster
+
+# ES client configuration
+es:
+  index:
+    expire: 15 # index retention in days; 15 means indices older than 15 days are expired and deleted by KS
+```
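+
+With the `spring.logi-job.enable` switch, one possible way to split the deployment (a sketch; the jar name is illustrative) is:
+
+```
+java -jar ks-km.jar --spring.logi-job.enable=false   # API node: serves front-end requests only
+java -jar ks-km.jar --spring.logi-job.enable=true    # Job node: runs the background tasks
+```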
+
+**SQL changes**
+```sql
+DROP TABLE IF EXISTS `ks_kc_connect_cluster`;
+CREATE TABLE `ks_kc_connect_cluster` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Connect集群ID',
+ `kafka_cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Kafka集群ID',
+ `name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群名称',
+ `group_name` varchar(128) NOT NULL DEFAULT '' COMMENT '集群Group名称',
+ `cluster_url` varchar(1024) NOT NULL DEFAULT '' COMMENT '集群地址',
+ `member_leader_url` varchar(1024) NOT NULL DEFAULT '' COMMENT 'URL地址',
+ `version` varchar(64) NOT NULL DEFAULT '' COMMENT 'connect版本',
+ `jmx_properties` text COMMENT 'JMX配置',
+ `state` tinyint(4) NOT NULL DEFAULT '1' COMMENT '集群使用的消费组状态,也表示集群状态:-1 Unknown,0 ReBalance,1 Active,2 Dead,3 Empty',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '接入时间',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uniq_id_group_name` (`id`,`group_name`),
+ UNIQUE KEY `uniq_name_kafka_cluster` (`name`,`kafka_cluster_phy_id`),
+ KEY `idx_kafka_cluster_phy_id` (`kafka_cluster_phy_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Connect集群信息表';
+
+
+DROP TABLE IF EXISTS `ks_kc_connector`;
+CREATE TABLE `ks_kc_connector` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `kafka_cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Kafka集群ID',
+ `connect_cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Connect集群ID',
+ `connector_name` varchar(512) NOT NULL DEFAULT '' COMMENT 'Connector名称',
+ `connector_class_name` varchar(512) NOT NULL DEFAULT '' COMMENT 'Connector类',
+ `connector_type` varchar(32) NOT NULL DEFAULT '' COMMENT 'Connector类型',
+ `state` varchar(45) NOT NULL DEFAULT '' COMMENT '状态',
+ `topics` text COMMENT '访问过的Topics',
+ `task_count` int(11) NOT NULL DEFAULT '0' COMMENT '任务数',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uniq_connect_cluster_id_connector_name` (`connect_cluster_id`,`connector_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Connector信息表';
+
+
+DROP TABLE IF EXISTS `ks_kc_worker`;
+CREATE TABLE `ks_kc_worker` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `kafka_cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Kafka集群ID',
+ `connect_cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Connect集群ID',
+ `member_id` varchar(512) NOT NULL DEFAULT '' COMMENT '成员ID',
+ `host` varchar(128) NOT NULL DEFAULT '' COMMENT '主机名',
+ `jmx_port` int(16) NOT NULL DEFAULT '-1' COMMENT 'Jmx端口',
+ `url` varchar(1024) NOT NULL DEFAULT '' COMMENT 'URL信息',
+ `leader_url` varchar(1024) NOT NULL DEFAULT '' COMMENT 'leaderURL信息',
+ `leader` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1是leader,0不是leader',
+ `worker_id` varchar(128) NOT NULL COMMENT 'worker地址',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uniq_cluster_id_member_id` (`connect_cluster_id`,`member_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='worker信息表';
+
+
+DROP TABLE IF EXISTS `ks_kc_worker_connector`;
+CREATE TABLE `ks_kc_worker_connector` (
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `kafka_cluster_phy_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Kafka集群ID',
+ `connect_cluster_id` bigint(20) NOT NULL DEFAULT '-1' COMMENT 'Connect集群ID',
+ `connector_name` varchar(512) NOT NULL DEFAULT '' COMMENT 'Connector名称',
+ `worker_member_id` varchar(256) NOT NULL DEFAULT '',
+ `task_id` int(16) NOT NULL DEFAULT '-1' COMMENT 'Task的ID',
+ `state` varchar(128) DEFAULT NULL COMMENT '任务状态',
+ `worker_id` varchar(128) DEFAULT NULL COMMENT 'worker信息',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uniq_relation` (`connect_cluster_id`,`connector_name`,`task_id`,`worker_member_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Worker和Connector关系表';
+
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_CONNECTOR_FAILED_TASK_COUNT', '{\"value\" : 1}', 'connector失败状态的任务数量', 'admin');
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_CONNECTOR_UNASSIGNED_TASK_COUNT', '{\"value\" : 1}', 'connector未被分配的任务数量', 'admin');
+INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_CONNECT_CLUSTER_TASK_STARTUP_FAILURE_PERCENTAGE', '{\"value\" : 0.05}', 'Connect集群任务启动失败概率', 'admin');
+```
+
+---
+
+### Upgrade to the `v3.1.0` version
```sql
INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value_name`, `value`, `description`, `operator`) VALUES ('-1', 'HEALTH', 'HC_ZK_BRAIN_SPLIT', '{ \"value\": 1} ', 'ZK 脑裂', 'admin');
@@ -20,7 +189,7 @@ INSERT INTO `ks_km_platform_cluster_config` (`cluster_id`, `value_group`, `value
```
-### 6.2.2 Upgrade to the `v3.0.1` version
+### Upgrade to the `v3.0.1` version
**ES index templates**
```bash
@@ -155,7 +324,7 @@ CREATE TABLE `ks_km_group` (
```
-### 6.2.3 Upgrade to the `v3.0.0` version
+### Upgrade to the `v3.0.0` version
**SQL changes**
@@ -167,7 +336,7 @@ ADD COLUMN `zk_properties` TEXT NULL COMMENT 'ZK配置' AFTER `jmx_properties`;
---
-### 6.2.4 Upgrade to the `v3.0.0-beta.2` version
+### Upgrade to the `v3.0.0-beta.2` version
**Configuration changes**
@@ -238,7 +407,7 @@ ALTER TABLE `logi_security_oplog`
---
-### 6.2.5 Upgrade to the `v3.0.0-beta.1` version
+### Upgrade to the `v3.0.0-beta.1` version
**SQL changes**
@@ -257,7 +426,7 @@ ALTER COLUMN `operation_methods` set default '';
---
-### 6.2.6 Upgrade from a `2.x` version to the `v3.0.0-beta.0` version
+### Upgrade from a `2.x` version to the `v3.0.0-beta.0` version
**Upgrade steps:**
diff --git a/docs/user_guide/faq.md b/docs/user_guide/faq.md
index a91cdf79..1656ec37 100644
--- a/docs/user_guide/faq.md
+++ b/docs/user_guide/faq.md
@@ -182,3 +182,47 @@ Node version: v12.22.12
+ Cause: the database encoding differs from the script we provide, so the data in the database became garbled, which led to the permission-recognition failure.
+ Solution: clear the database data, change the database character set to utf8, and then re-run the [dml-logi.sql](https://github.com/didi/KnowStreaming/blob/master/km-dist/init/sql/dml-logi.sql) script to import the data.
+
+
+## 8.13 Connecting to a Kerberos-enabled Kafka cluster
+
+1. Install the Kerberos client on the machine where KnowStreaming is deployed;
+2. Replace the /etc/krb5.conf configuration file;
+3. Copy the keytab for Kafka to a directory on that machine;
+4. When connecting the cluster, fill in the authentication configuration below, adjusting it to your actual setup;
+```json
+{
+ "security.protocol": "SASL_PLAINTEXT",
+ "sasl.mechanism": "GSSAPI",
+ "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"/etc/keytab/kafka.keytab\" storeKey=true useTicketCache=false principal=\"kafka/kafka@TEST.COM\";",
+ "sasl.kerberos.service.name": "kafka"
+}
+```
+
+
+## 8.14 Ldap integration configuration
+
+```yaml
+# Add the following configuration to application.yml, adjusting the values to your actual setup
+account:
+  ldap:
+    url: ldap://127.0.0.1:8080/
+    basedn: DC=senz,DC=local
+    factory: com.sun.jndi.ldap.LdapCtxFactory
+    filter: sAMAccountName
+    security:
+      authentication: simple
+      principal: CN=search,DC=senz,DC=local
+      credentials: xxxxxxx
+    auth-user-registration: false # whether to register the user into MySQL, default false
+    auth-user-registration-role: 1677 # 1677 is the id of the super-admin role; to grant an ordinary role by default, create one in KS
+
+# Modify the following configuration in application.yml
+spring:
+  logi-security:
+    login-extend-bean-name: ksLdapLoginService # use the ldap login service
+```
+
+## 8.15 Notes on using Testcontainers in tests
+1. A Docker runtime is required: [Testcontainers supported environments](https://www.testcontainers.org/supported_docker_environment/)
+2. If Docker is not available locally, you can use a [remote Docker daemon](https://docs.docker.com/config/daemon/remote-access/); see the [Testcontainers configuration notes](https://www.testcontainers.org/features/configuration/#customizing-docker-host-detection)
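+
+A quick way to confirm the environment Testcontainers needs (a sketch; the remote host is an example):
+
+```
+docker info                                        # verifies the local Docker daemon is reachable
+DOCKER_HOST=tcp://192.168.0.10:2375 docker info    # verifies a remote daemon, when using remote access
+```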
\ No newline at end of file
diff --git a/km-biz/pom.xml b/km-biz/pom.xml
index 54399210..b8c3457b 100644
--- a/km-biz/pom.xml
+++ b/km-biz/pom.xml
@@ -62,10 +62,6 @@
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterConnectorsManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterConnectorsManager.java
new file mode 100644
index 00000000..c20c5c77
--- /dev/null
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/ClusterConnectorsManager.java
@@ -0,0 +1,15 @@
+package com.xiaojukeji.know.streaming.km.biz.cluster;
+
+import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO;
+
+/**
+ * Overview of the Connectors of a Kafka cluster
+ */
+public interface ClusterConnectorsManager {
+    PaginationResult<ClusterConnectorOverviewVO> getClusterConnectorsOverview(Long clusterPhyId, ClusterConnectorsOverviewDTO dto);
+
+ ConnectStateVO getClusterConnectorsState(Long clusterPhyId);
+}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java
index 0bd2f6e4..2d57d719 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/MultiClusterPhyManager.java
@@ -4,8 +4,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysHe
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysState;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO;
+import java.util.List;
+
/**
 * Overall state of multiple clusters
*/
@@ -24,4 +28,6 @@ public interface MultiClusterPhyManager {
* @return
*/
    PaginationResult<ClusterPhyDashboardVO> getClusterPhysDashboard(MultiClusterDashboardDTO dto);
+
+    Result<List<ClusterPhyBaseVO>> getClusterPhysBasic();
}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java
index 6b180126..ab5d6a6d 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterBrokersManagerImpl.java
@@ -6,6 +6,8 @@ import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterBrokersManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterBrokersOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
@@ -16,6 +18,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBroker
import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
+import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum;
+import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
@@ -24,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
+import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -83,9 +88,13 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
        Map<Integer, Boolean> jmxConnectedMap = new HashMap<>();
brokerList.forEach(elem -> jmxConnectedMap.put(elem.getBrokerId(), kafkaJMXClient.getClientWithCheck(clusterPhyId, elem.getBrokerId()) != null));
+
+ ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
+
        // format conversion
return PaginationResult.buildSuc(
this.convert2ClusterBrokersOverviewVOList(
+ clusterPhy,
paginationResult.getData().getBizData(),
brokerList,
metricsResult.getData(),
@@ -131,7 +140,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
clusterBrokersStateVO.setKafkaControllerAlive(true);
}
- clusterBrokersStateVO.setConfigSimilar(brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, Arrays.asList("broker.id", "listeners", "name", "value")) <= 0);
+        clusterBrokersStateVO.setConfigSimilar(brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, KafkaConstant.CONFIG_SIMILAR_IGNORED_CONFIG_KEY_LIST) <= 0);
return clusterBrokersStateVO;
}
@@ -169,7 +179,8 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
);
}
-    private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(List<Integer> pagedBrokerIdList,
+    private List<ClusterBrokersOverviewVO> convert2ClusterBrokersOverviewVOList(ClusterPhy clusterPhy,
+                                                                                List<Integer> pagedBrokerIdList,
                                                                                 List<Broker> brokerList,
                                                                                 List<BrokerMetrics> metricsList,
                                                                                 Topic groupTopic,
@@ -185,9 +196,15 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
Broker broker = brokerMap.get(brokerId);
BrokerMetrics brokerMetrics = metricsMap.get(brokerId);
Boolean jmxConnected = jmxConnectedMap.get(brokerId);
-
voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController, jmxConnected));
}
+
+        // supplement the JMX port information for clusters not running in ZK mode
+ if (!clusterPhy.getRunState().equals(ClusterRunStateEnum.RUN_ZK.getRunState())) {
+ JmxConfig jmxConfig = ConvertUtil.str2ObjByJson(clusterPhy.getJmxProperties(), JmxConfig.class);
+ voList.forEach(elem -> elem.setJmxPort(jmxConfig.getJmxPort() == null ? -1 : jmxConfig.getJmxPort()));
+ }
+
return voList;
}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterConnectorsManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterConnectorsManagerImpl.java
new file mode 100644
index 00000000..e982c588
--- /dev/null
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterConnectorsManagerImpl.java
@@ -0,0 +1,152 @@
+package com.xiaojukeji.know.streaming.km.biz.cluster.impl;
+
+import com.didiglobal.logi.log.ILog;
+import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterConnectorsManager;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterConnectorsOverviewDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.connect.MetricsConnectorsDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectWorker;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connect.ConnectStateVO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ClusterConnectorOverviewVO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
+import com.xiaojukeji.know.streaming.km.common.converter.ConnectConverter;
+import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
+import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
+import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
+import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
+import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorMetricService;
+import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
+import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
+import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+@Service
+public class ClusterConnectorsManagerImpl implements ClusterConnectorsManager {
+ private static final ILog LOGGER = LogFactory.getLog(ClusterConnectorsManagerImpl.class);
+
+ @Autowired
+ private ConnectorService connectorService;
+
+ @Autowired
+ private ConnectClusterService connectClusterService;
+
+ @Autowired
+ private ConnectorMetricService connectorMetricService;
+
+ @Autowired
+ private WorkerService workerService;
+
+ @Autowired
+ private WorkerConnectorService workerConnectorService;
+
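+ /**
+ * Overview of the connectors under a Kafka cluster: fetch the latest metrics,
+ * page the result locally, then supply historical metric lines for the current page.
+ */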
+ @Override
+ public PaginationResult<ClusterConnectorOverviewVO> getClusterConnectorsOverview(Long clusterPhyId, ClusterConnectorsOverviewDTO dto) {
+ List<ConnectCluster> clusterList = connectClusterService.listByKafkaCluster(clusterPhyId);
+
+ List<ConnectorPO> poList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId);
+
+ // Query the latest metrics
+ Result<List<ConnectorMetrics>> latestMetricsResult = connectorMetricService.getLatestMetricsFromES(
+ clusterPhyId,
+ poList.stream().map(elem -> new ClusterConnectorDTO(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()),
+ dto.getLatestMetricNames()
+ );
+
+ if (latestMetricsResult.failed()) {
+ LOGGER.error("method=getClusterConnectorsOverview||clusterPhyId={}||result={}||errMsg=get latest metric failed", clusterPhyId, latestMetricsResult);
+ return PaginationResult.buildFailure(latestMetricsResult, dto);
+ }
+
+ // Convert to VO
+ List<ClusterConnectorOverviewVO> voList = ConnectConverter.convert2ClusterConnectorOverviewVOList(clusterList, poList, latestMetricsResult.getData());
+
+ // Apply pagination
+ PaginationResult<ClusterConnectorOverviewVO> voPaginationResult = this.pagingConnectorInLocal(voList, dto);
+ if (voPaginationResult.failed()) {
+ LOGGER.error("method=getClusterConnectorsOverview||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult);
+
+ return PaginationResult.buildFailure(voPaginationResult, dto);
+ }
+
+ // Query historical metric lines
+ Result<List<MetricMultiLinesVO>> lineMetricsResult = connectorMetricService.listConnectClusterMetricsFromES(
+ clusterPhyId,
+ this.buildMetricsConnectorsDTO(
+ voPaginationResult.getData().getBizData().stream().map(elem -> new ClusterConnectorDTO(elem.getConnectClusterId(), elem.getConnectorName())).collect(Collectors.toList()),
+ dto.getMetricLines()
+ )
+ );
+
+ return PaginationResult.buildSuc(
+ ConnectConverter.supplyData2ClusterConnectorOverviewVOList(
+ voPaginationResult.getData().getBizData(),
+ lineMetricsResult.getData()
+ ),
+ voPaginationResult
+ );
+ }
+
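+ /**
+ * Aggregate Connect state for a Kafka cluster: cluster/connector/worker/task
+ * counts, with alive counts derived from the RUNNING state.
+ */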
+ @Override
+ public ConnectStateVO getClusterConnectorsState(Long clusterPhyId) {
+ // Get the Connect clusters of this Kafka cluster, plus their connectors and workers
+ List<ConnectCluster> connectClusterList = connectClusterService.listByKafkaCluster(clusterPhyId);
+ List<ConnectorPO> connectorPOList = connectorService.listByKafkaClusterIdFromDB(clusterPhyId);
+ List<WorkerConnector> workerConnectorList = workerConnectorService.listByKafkaClusterIdFromDB(clusterPhyId);
+ List<ConnectWorker> connectWorkerList = workerService.listByKafkaClusterIdFromDB(clusterPhyId);
+
+ return convert2ConnectStateVO(connectClusterList, connectorPOList, workerConnectorList, connectWorkerList);
+ }
+
+ /**************************************************** private method ****************************************************/
+
+ private MetricsConnectorsDTO buildMetricsConnectorsDTO(List<ClusterConnectorDTO> connectorDTOList, MetricDTO metricDTO) {
+ MetricsConnectorsDTO dto = ConvertUtil.obj2Obj(metricDTO, MetricsConnectorsDTO.class);
+ dto.setConnectorNameList(connectorDTOList == null ? new ArrayList<>() : connectorDTOList);
+
+ return dto;
+ }
+
+ private ConnectStateVO convert2ConnectStateVO(List<ConnectCluster> connectClusterList, List<ConnectorPO> connectorPOList, List<WorkerConnector> workerConnectorList, List<ConnectWorker> connectWorkerList) {
+ ConnectStateVO connectStateVO = new ConnectStateVO();
+ connectStateVO.setConnectClusterCount(connectClusterList.size());
+ connectStateVO.setTotalConnectorCount(connectorPOList.size());
+ connectStateVO.setAliveConnectorCount((int) connectorPOList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).count());
+ connectStateVO.setWorkerCount(connectWorkerList.size());
+ connectStateVO.setTotalTaskCount(workerConnectorList.size());
+ connectStateVO.setAliveTaskCount((int) workerConnectorList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).count());
+ return connectStateVO;
+ }
+
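+ /**
+ * In-memory pagination: fuzzy-filter by connector name, sort by metric or field, then page.
+ */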
+ private PaginationResult<ClusterConnectorOverviewVO> pagingConnectorInLocal(List<ClusterConnectorOverviewVO> connectorVOList, ClusterConnectorsOverviewDTO dto) {
+ // Fuzzy match on connector name
+ connectorVOList = PaginationUtil.pageByFuzzyFilter(connectorVOList, dto.getSearchKeywords(), Arrays.asList("connectorName"));
+
+ // Sort
+ if (!dto.getLatestMetricNames().isEmpty()) {
+ PaginationMetricsUtil.sortMetrics(connectorVOList, "latestMetrics", dto.getSortMetricNameList(), "connectorName", dto.getSortType());
+ } else {
+ PaginationUtil.pageBySort(connectorVOList, dto.getSortField(), dto.getSortType(), "connectorName", dto.getSortType());
+ }
+
+ // Paginate
+ return PaginationUtil.pageBySubData(connectorVOList, dto);
+ }
+
+}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java
index c68f9b9a..3a2b11ef 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java
@@ -14,10 +14,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterPhyTop
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
+import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
+import com.xiaojukeji.know.streaming.km.core.service.ha.HaActiveStandbyRelationService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,16 +40,22 @@ public class ClusterTopicsManagerImpl implements ClusterTopicsManager {
@Autowired
private TopicMetricService topicMetricService;
+ @Autowired
+ private HaActiveStandbyRelationService haActiveStandbyRelationService;
+
@Override
public PaginationResult<ClusterPhyTopicsOverviewVO> getClusterPhyTopicsOverview(Long clusterPhyId, ClusterTopicsOverviewDTO dto) {
// Get all Topics of the cluster
List<Topic> topicList = topicService.listTopicsFromDB(clusterPhyId);
// Get the metrics of all Topics in the cluster
- Map<String, TopicMetrics> metricsMap = topicMetricService.getLatestMetricsFromCacheFirst(clusterPhyId);
+ Map<String, TopicMetrics> metricsMap = topicMetricService.getLatestMetricsFromCache(clusterPhyId);
+
+ // Get HA (mirror Topic) info
+ Set<String> haTopicNameSet = haActiveStandbyRelationService.listByClusterAndType(clusterPhyId, HaResTypeEnum.MIRROR_TOPIC).stream().map(elem -> elem.getResName()).collect(Collectors.toSet());
// Convert to VO
- List<ClusterPhyTopicsOverviewVO> voList = TopicVOConverter.convert2ClusterPhyTopicsOverviewVOList(topicList, metricsMap);
+ List<ClusterPhyTopicsOverviewVO> voList = TopicVOConverter.convert2ClusterPhyTopicsOverviewVOList(topicList, metricsMap, haTopicNameSet);
// Apply pagination
PaginationResult<ClusterPhyTopicsOverviewVO> voPaginationResult = this.pagingTopicInLocal(voList, dto);
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java
index 7783b40b..aca30269 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterZookeepersManagerImpl.java
@@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.zookeeper.ZKRoleEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
-import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ZookeeperMetricVersionItems;
+import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ZookeeperMetricVersionItems;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZnodeService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperMetricService;
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
@@ -94,7 +94,7 @@ public class ClusterZookeepersManagerImpl implements ClusterZookeepersManager {
);
if (metricsResult.failed()) {
LOGGER.error(
- "class=ClusterZookeepersManagerImpl||method=getClusterPhyZookeepersState||clusterPhyId={}||errMsg={}",
+ "method=getClusterPhyZookeepersState||clusterPhyId={}||errMsg={}",
clusterPhyId, metricsResult.getMessage()
);
return Result.buildSuc(vo);
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java
index 68dd4ac7..7e379c99 100644
--- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/MultiClusterPhyManagerImpl.java
@@ -9,13 +9,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysHe
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysState;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO;
-import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
-import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.converter.ClusterVOConverter;
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
@@ -24,15 +23,11 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
-import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
-import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ClusterMetricVersionItems;
+import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
@Service
@@ -45,38 +40,26 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
@Autowired
private ClusterMetricService clusterMetricService;
- @Autowired
- private KafkaControllerService kafkaControllerService;
-
@Override
public ClusterPhysState getClusterPhysState() {
List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
+ ClusterPhysState physState = new ClusterPhysState(0, 0, 0, clusterPhyList.size());
- Map<Long, KafkaController> controllerMap = kafkaControllerService.getKafkaControllersFromDB(
- clusterPhyList.stream().map(elem -> elem.getId()).collect(Collectors.toList()),
- false
- );
-
- // TODO: product-side, consider adding an "unknown" state; otherwise newly connected clusters may be misreported, since their data lags
- ClusterPhysState physState = new ClusterPhysState(0, 0, clusterPhyList.size());
- for (ClusterPhy clusterPhy: clusterPhyList) {
- KafkaController kafkaController = controllerMap.get(clusterPhy.getId());
-
- if (kafkaController != null && !kafkaController.alive()) {
- // Explicit info indicates the controller is down
- physState.setDownCount(physState.getDownCount() + 1);
- } else if ((System.currentTimeMillis() - clusterPhy.getCreateTime().getTime() >= 5 * 60 * 1000) && kafkaController == null) {
- // The cluster was connected more than 5 minutes ago and no kafkaController info exists, so mark it as down
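+ // Derive each cluster's liveness from the cached health-state metric (replaces the controller-based check)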
+ for (ClusterPhy clusterPhy : clusterPhyList) {
+ ClusterMetrics metrics = clusterMetricService.getLatestMetricsFromCache(clusterPhy.getId());
+ Float state = metrics.getMetric(ClusterMetricVersionItems.CLUSTER_METRIC_HEALTH_STATE);
+ if (state == null) {
+ physState.setUnknownCount(physState.getUnknownCount() + 1);
+ } else if (state.intValue() == HealthStateEnum.DEAD.getDimension()) {
physState.setDownCount(physState.getDownCount() + 1);
} else {
- // Treat all other cases as alive
physState.setLiveCount(physState.getLiveCount() + 1);
}
}
-
return physState;
}
+
@Override
public ClusterPhysHealthState getClusterPhysHealthState() {
List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
@@ -111,24 +94,6 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
// Convert to VO format for subsequent pagination and filtering
List<ClusterPhyDashboardVO> voList = ConvertUtil.list2List(clusterPhyList, ClusterPhyDashboardVO.class);
- // TODO: product-side, consider adding an "unknown" state; otherwise newly connected clusters may be misreported, since their data lags
- // Get the cluster controller info and fill it into the VOs
- Map<Long, KafkaController> controllerMap = kafkaControllerService.getKafkaControllersFromDB(clusterPhyList.stream().map(elem -> elem.getId()).collect(Collectors.toList()), false);
- for (ClusterPhyDashboardVO vo: voList) {
- KafkaController kafkaController = controllerMap.get(vo.getId());
-
- if (kafkaController != null && !kafkaController.alive()) {
- // Explicit info indicates the controller is down
- vo.setAlive(Constant.DOWN);
- } else if ((System.currentTimeMillis() - vo.getCreateTime().getTime() >= 5 * 60L * 1000L) && kafkaController == null) {
- // The cluster was connected more than 5 minutes ago and no kafkaController info exists, so mark it as down
- vo.setAlive(Constant.DOWN);
- } else {
- // Treat all other cases as alive
- vo.setAlive(Constant.ALIVE);
- }
- }
-
// Local pagination and filtering
voList = this.getAndPagingDataInLocal(voList, dto);
@@ -153,6 +118,15 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
);
}
+ @Override
+ public Result<List<ClusterPhyBaseVO>> getClusterPhysBasic() {
+ // Get all clusters
+ List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
+
+ // Convert to VO format
+ return Result.buildSuc(ConvertUtil.list2List(clusterPhyList, ClusterPhyBaseVO.class));
+ }
+
/**************************************************** private method ****************************************************/
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java
new file mode 100644
index 00000000..3752504d
--- /dev/null
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/ConnectorManager.java
@@ -0,0 +1,16 @@
+package com.xiaojukeji.know.streaming.km.biz.connect.connector;
+
+import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
+
+import java.util.Properties;
+
+public interface ConnectorManager {
+ Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator);
+
+ Result<Void> createConnector(ConnectorCreateDTO dto, String operator);
+ Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator);
+
+ Result<ConnectorStateVO> getConnectorStateVO(Long connectClusterId, String connectorName);
+}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/WorkerConnectorManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/WorkerConnectorManager.java
new file mode 100644
index 00000000..eaf82423
--- /dev/null
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/WorkerConnectorManager.java
@@ -0,0 +1,16 @@
+package com.xiaojukeji.know.streaming.km.biz.connect.connector;
+
+
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO;
+
+import java.util.List;
+
+/**
+ * @author wyb
+ * @date 2022/11/14
+ */
+public interface WorkerConnectorManager {
+ Result<List<KCTaskOverviewVO>> getTaskOverview(Long connectClusterId, String connectorName);
+
+}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java
new file mode 100644
index 00000000..5800b26f
--- /dev/null
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java
@@ -0,0 +1,115 @@
+package com.xiaojukeji.know.streaming.km.biz.connect.connector.impl;
+
+import com.xiaojukeji.know.streaming.km.biz.connect.connector.ConnectorManager;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.config.ConnectConfigInfos;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
+import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
+import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
+import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
+import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
+import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+@Service
+public class ConnectorManagerImpl implements ConnectorManager {
+ @Autowired
+ private PluginService pluginService;
+
+ @Autowired
+ private ConnectorService connectorService;
+
+ @Autowired
+ private WorkerConnectorService workerConnectorService;
+
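+ /**
+ * Validate the new config via the plugin validator before updating the connector.
+ */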
+ @Override
+ public Result<Void> updateConnectorConfig(Long connectClusterId, String connectorName, Properties configs, String operator) {
+ Result<ConnectConfigInfos> infosResult = pluginService.validateConfig(connectClusterId, configs);
+ if (infosResult.failed()) {
+ return Result.buildFromIgnoreData(infosResult);
+ }
+
+ if (infosResult.getData().getErrorCount() > 0) {
+ return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误");
+ }
+
+ return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
+ }
+
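+ /**
+ * Create a connector; on success, try to fetch its metadata immediately and persist it to the DB.
+ */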
+ @Override
+ public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
+ dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
+
+ Result<Void> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
+ if (createResult.failed()) {
+ return Result.buildFromIgnoreData(createResult);
+ }
+
+ Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
+ if (ksConnectorResult.failed()) {
+ return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
+ }
+
+ connectorService.addNewToDB(ksConnectorResult.getData());
+ return Result.buildSuc();
+ }
+
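+ /**
+ * MM2 variant: additionally records the names of the companion heartbeat and checkpoint connectors.
+ */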
+ @Override
+ public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
+ dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
+
+ Result<Void> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
+ if (createResult.failed()) {
+ return Result.buildFromIgnoreData(createResult);
+ }
+
+ Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
+ if (ksConnectorResult.failed()) {
+ return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
+ }
+
+ KSConnector connector = ksConnectorResult.getData();
+ connector.setCheckpointConnectorName(checkpointName);
+ connector.setHeartbeatConnectorName(heartbeatName);
+
+ connectorService.addNewToDB(connector);
+ return Result.buildSuc();
+ }
+
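+ /**
+ * Build the connector state view from DB data: connector state plus task/worker counts.
+ */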
+ @Override
+ public Result<ConnectorStateVO> getConnectorStateVO(Long connectClusterId, String connectorName) {
+ ConnectorPO connectorPO = connectorService.getConnectorFromDB(connectClusterId, connectorName);
+
+ if (connectorPO == null) {
+ return Result.buildFailure(ResultStatus.NOT_EXIST);
+ }
+
+ List<WorkerConnector> workerConnectorList = workerConnectorService.listFromDB(connectClusterId).stream().filter(elem -> elem.getConnectorName().equals(connectorName)).collect(Collectors.toList());
+
+ return Result.buildSuc(convert2ConnectorOverviewVO(connectorPO, workerConnectorList));
+ }
+
+ private ConnectorStateVO convert2ConnectorOverviewVO(ConnectorPO connectorPO, List<WorkerConnector> workerConnectorList) {
+ ConnectorStateVO connectorStateVO = new ConnectorStateVO();
+ connectorStateVO.setConnectClusterId(connectorPO.getConnectClusterId());
+ connectorStateVO.setName(connectorPO.getConnectorName());
+ connectorStateVO.setType(connectorPO.getConnectorType());
+ connectorStateVO.setState(connectorPO.getState());
+ connectorStateVO.setTotalTaskCount(workerConnectorList.size());
+ connectorStateVO.setAliveTaskCount((int) workerConnectorList.stream().filter(elem -> elem.getState().equals(AbstractStatus.State.RUNNING.name())).count());
+ connectorStateVO.setTotalWorkerCount((int) workerConnectorList.stream().map(elem -> elem.getWorkerId()).distinct().count());
+ return connectorStateVO;
+ }
+}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/WorkerConnectorManageImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/WorkerConnectorManageImpl.java
new file mode 100644
index 00000000..4d0cd317
--- /dev/null
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/WorkerConnectorManageImpl.java
@@ -0,0 +1,37 @@
+package com.xiaojukeji.know.streaming.km.biz.connect.connector.impl;
+
+import com.didiglobal.logi.log.ILog;
+import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.biz.connect.connector.WorkerConnectorManager;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.WorkerConnector;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO;
+import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
+import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
+import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * @author wyb
+ * @date 2022/11/14
+ */
+@Service
+public class WorkerConnectorManageImpl implements WorkerConnectorManager {
+
+ private static final ILog LOGGER = LogFactory.getLog(WorkerConnectorManageImpl.class);
+
+ @Autowired
+ private WorkerConnectorService workerConnectorService;
+
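+ /**
+ * Fetch the task overview live from the Connect cluster (via the loaded-cluster cache).
+ */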
+ @Override
+ public Result<List<KCTaskOverviewVO>> getTaskOverview(Long connectClusterId, String connectorName) {
+ ConnectCluster connectCluster = LoadedConnectClusterCache.getByPhyId(connectClusterId);
+ List<WorkerConnector> workerConnectorList = workerConnectorService.getWorkerConnectorListFromCluster(connectCluster, connectorName);
+
+ return Result.buildSuc(ConvertUtil.list2List(workerConnectorList, KCTaskOverviewVO.class));
+ }
+}
diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/MirrorMakerManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/MirrorMakerManager.java
new file mode 100644
index 00000000..6851ca5e
--- /dev/null
+++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/MirrorMakerManager.java
@@ -0,0 +1,43 @@
+package com.xiaojukeji.know.streaming.km.biz.connect.mm2;
+
+import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterMirrorMakersOverviewDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2.MirrorMakerCreateDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.ClusterMirrorMakerOverviewVO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerBaseStateVO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2.MirrorMakerStateVO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.plugin.ConnectConfigInfosVO;
+import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.task.KCTaskOverviewVO;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * @author wyb
+ * @date 2022/12/26
+ */
+public interface MirrorMakerManager {
+ Result<Void> createMirrorMaker(MirrorMakerCreateDTO dto, String operator);
+
+ Result<Void> deleteMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator);
+
+ Result<Void> modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String operator);
+
+ Result<Void> restartMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator);
+ Result<Void> stopMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator);
+ Result<Void> resumeMirrorMaker(Long connectClusterId, String sourceConnectorName, String operator);
+
+ Result<MirrorMakerStateVO> getMirrorMakerStateVO(Long clusterPhyId);
+
+ PaginationResult<ClusterMirrorMakerOverviewVO> getClusterMirrorMakersOverview(Long clusterPhyId, ClusterMirrorMakersOverviewDTO dto);
+
+ Result<MirrorMakerBaseStateVO> getMirrorMakerState(Long connectId, String connectName);
+
+ Result