Mirror of https://github.com/didi/KnowStreaming.git (synced 2025-12-25 04:32:12 +08:00)
Compare commits
72 Commits
Commit SHA1s (the author, date, and message columns were empty in the mirrored view):

89405fe003, b9ea3865a5, b5bd643814, 5bc6eb6774, 3ba81e9aaa, 329a9b59c1, 22c26e24b1, 396045177c, e311d3767c, 24d7b80244, 61f99e4d2e, d5348bcf49, 5d31d66365, 29778a0154, 165c0a5866, 588323961e, fd1c0b71c5, 54fbdcadf9, 69a30d0cf0, b8f9b44f38, cbf17d4eb5, 327e025262, 6b1e944bba, 668ed4d61b, 312c0584ed, 110d3acb58, ddbc60283b, 471bcecfd6, 0245791b13, 4794396ce8, c7088779d6, 672905da12, 47172b13be, 3668a10af6, a4e294c03f, 3fd6f4003f, 3eaf5cd530, c344fd8ca4, 09639ca294, a81b6dca83, b74aefb08f, 757f90aa7a, 7e1b3c552b, 69736a63b6, fb4a9f9056, 387d89d3af, 65d9ca9d39, 8c842af4ba, 4faf9262c9, be7724c67d, 48d26347f7, bdb01ec8b5, 9047815799, 05bd94a2cc, c9f7da84d0, bcc124e86a, 48d2733403, 31fc6e4e56, fcdeef0146, 1cd524c0cc, 0f746917a7, a2228d0169, e8a679d34b, 1912a42091, ca81f96635, eb3b8c4b31, 6740d6d60b, c46c35b248, 0b2dcec4bc, 7256db8c4e, 25c3aeaa5f, 736d5a00b7
**README.md** (20 lines changed)

```diff
@@ -1,13 +1,13 @@
 
 ---
 
-
+
 
 **一站式`Apache Kafka`集群指标监控与运维管控平台**
 
 
 
-阅读本README文档,您可以了解到滴滴Logi-KafkaManager的用户群体、产品定位等信息,并通过体验地址,快速体验Kafka集群指标监控与运维管控的全流程。<br>若滴滴Logi-KafkaManager已在贵司的生产环境进行使用,并想要获得官方更好地支持和指导,可以通过[`OCE认证`](http://obsuite.didiyun.com/open/openAuth),加入官方交流平台。
+阅读本README文档,您可以了解到滴滴Logi-KafkaManager的用户群体、产品定位等信息,并通过体验地址,快速体验Kafka集群指标监控与运维管控的全流程。
 
 
 ## 1 产品简介
@@ -73,15 +73,17 @@
 
 
 
-微信加群:关注公众号 Obsuite(官方公众号) 回复 "Logi加群"
+微信加群:添加mike_zhangliang的微信号备注Logi加群或关注公众号 云原生可观测性 回复 "Logi加群"
 
-
-钉钉群ID:32821440
+## 4 知识星球
 
-
-## 4 OCE认证
-OCE是一个认证机制和交流平台,为滴滴Logi-KafkaManager生产用户量身打造,我们会为OCE企业提供更好的技术支持,比如专属的技术沙龙、企业一对一的交流机会、专属的答疑群等,如果贵司Logi-KafkaManager上了生产,[快来加入吧](http://obsuite.didiyun.com/open/openAuth)
+
+
+✅知识星球首个【Kafka中文社区】,内测期免费加入~https://z.didi.cn/5gSF9
+有问必答~!
+互动有礼~!
+1600+群友一起共建国内最专业的【Kafka中文社区】
+PS:提问请尽量把问题一次性描述清楚,并告知环境信息情况哦~!如使用版本、操作步骤、报错/警告信息等,方便嘉宾们快速解答~
 
 ## 5 项目成员
 
@@ -97,4 +99,4 @@ OCE是一个认证机制和交流平台,为滴滴Logi-KafkaManager生产用户
 ## 6 协议
 
-`kafka-manager`基于`Apache-2.0`协议进行分发和使用,更多信息参见[协议文件](./LICENSE)
+`LogiKM`基于`Apache-2.0`协议进行分发和使用,更多信息参见[协议文件](./LICENSE)
 
```
**Dockerfile**

```diff
@@ -1,43 +1,28 @@
 FROM openjdk:16-jdk-alpine3.13
 
-LABEL author="yangvipguang"
+LABEL author="fengxsong"
 
-ENV VERSION 2.3.1
-
-RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
-RUN apk add --no-cache --virtual .build-deps \
-    font-adobe-100dpi \
-    ttf-dejavu \
-    fontconfig \
-    curl \
-    apr \
-    apr-util \
-    apr-dev \
-    tomcat-native \
-    && apk del .build-deps
-
-RUN apk add --no-cache tini
+RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && apk add --no-cache tini
 
+ENV VERSION 2.4.2
 
+WORKDIR /opt/
 
 ENV AGENT_HOME /opt/agent/
 
-WORKDIR /tmp
-
-COPY $JAR_PATH/kafka-manager.jar app.jar
-# COPY application.yml application.yml ##默认使用helm 挂载,防止敏感配置泄露
-
 COPY docker-depends/config.yaml $AGENT_HOME
 COPY docker-depends/jmx_prometheus_javaagent-0.15.0.jar $AGENT_HOME
 
 ENV JAVA_AGENT="-javaagent:$AGENT_HOME/jmx_prometheus_javaagent-0.15.0.jar=9999:$AGENT_HOME/config.yaml"
 ENV JAVA_HEAP_OPTS="-Xms1024M -Xmx1024M -Xmn100M "
 ENV JAVA_OPTS="-verbose:gc \
     -XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
     -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"
 
+RUN wget https://github.com/didi/Logi-KafkaManager/releases/download/v${VERSION}/kafka-manager-${VERSION}.tar.gz && \
+    tar xvf kafka-manager-${VERSION}.tar.gz && \
+    mv kafka-manager-${VERSION}/kafka-manager.jar /opt/app.jar && \
+    rm -rf kafka-manager-${VERSION}*
+
 EXPOSE 8080 9999
 
 ENTRYPOINT ["tini", "--"]
 
-CMD ["sh","-c","java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
+CMD [ "sh", "-c", "java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
```
**container/helm/Chart.lock** (new file, 6 lines)

```diff
@@ -0,0 +1,6 @@
+dependencies:
+- name: mysql
+  repository: https://charts.bitnami.com/bitnami
+  version: 8.6.3
+digest: sha256:d250c463c1d78ba30a24a338a06a551503c7a736621d974fe4999d2db7f6143e
+generated: "2021-06-24T11:34:54.625217+08:00"
```
**container/helm/Chart.yaml**

```diff
@@ -1,6 +1,6 @@
 apiVersion: v2
 name: didi-km
-description: A Helm chart for Kubernetes
+description: Logi-KafkaManager
 
 # A chart can be either an 'application' or a 'library' chart.
 #
@@ -21,4 +21,9 @@ version: 0.1.0
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
 # It is recommended to use it with quotes.
-appVersion: "1.16.0"
+appVersion: "2.4.2"
+dependencies:
+- condition: mysql.enabled
+  name: mysql
+  repository: https://charts.bitnami.com/bitnami
+  version: 8.x.x
```
**container/helm/charts/mysql-8.6.3.tgz** (new binary file, not shown)
**configmap.yaml** (Helm template)

```diff
@@ -1,7 +1,17 @@
+{{- define "datasource.mysql" -}}
+{{- if .Values.mysql.enabled }}
+{{- printf "%s-mysql" (include "didi-km.fullname" .) -}}
+{{- else -}}
+{{- printf "%s" .Values.externalDatabase.host -}}
+{{- end -}}
+{{- end -}}
+
 apiVersion: v1
 kind: ConfigMap
 metadata:
-  name: km-cm
+  name: {{ include "didi-km.fullname" . }}-configs
+  labels:
+    {{- include "didi-km.labels" . | nindent 4 }}
 data:
   application.yml: |
     server:
@@ -17,9 +27,9 @@ data:
         name: kafkamanager
       datasource:
         kafka-manager:
-          jdbc-url: jdbc:mysql://xxxxx:3306/kafka-manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
-          username: admin
-          password: admin
+          jdbc-url: jdbc:mysql://{{ include "datasource.mysql" . }}:3306/{{ .Values.mysql.auth.database }}?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
+          username: {{ .Values.mysql.auth.username }}
+          password: {{ .Values.mysql.auth.password }}
           driver-class-name: com.mysql.jdbc.Driver
       main:
         allow-bean-definition-overriding: true
@@ -54,7 +64,10 @@ data:
       sync-topic-enabled: false # 未落盘的Topic定期同步到DB中
 
     account:
+      # ldap settings
       ldap:
+        enabled: false
+        authUserRegistration: false
 
     kcm:
       enabled: false
```
**deployment.yaml** (Helm template)

```diff
@@ -42,6 +42,10 @@ spec:
               protocol: TCP
           resources:
             {{- toYaml .Values.resources | nindent 12 }}
+          volumeMounts:
+            - name: configs
+              mountPath: /tmp/application.yml
+              subPath: application.yml
       {{- with .Values.nodeSelector }}
       nodeSelector:
         {{- toYaml . | nindent 8 }}
@@ -54,3 +58,7 @@ spec:
       tolerations:
         {{- toYaml . | nindent 8 }}
       {{- end }}
+      volumes:
+        - name: configs
+          configMap:
+            name: {{ include "didi-km.fullname" . }}-configs
```
**values.yaml** (Helm chart)

```diff
@@ -5,13 +5,14 @@
 replicaCount: 1
 
 image:
-  repository: docker.io/yangvipguang/km
+  repository: docker.io/fengxsong/logi-kafka-manager
   pullPolicy: IfNotPresent
   # Overrides the image tag whose default is the chart appVersion.
-  tag: "v18"
+  tag: "v2.4.2"
 
 imagePullSecrets: []
 nameOverride: ""
+# fullnameOverride must set same as release name
 fullnameOverride: "km"
 
 serviceAccount:
@@ -59,10 +60,10 @@ resources:
   # resources, such as Minikube. If you do want to specify resources, uncomment the following
   # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
   limits:
-    cpu: 50m
+    cpu: 500m
     memory: 2048Mi
   requests:
-    cpu: 10m
+    cpu: 100m
     memory: 200Mi
 
 autoscaling:
@@ -77,3 +78,16 @@ nodeSelector: {}
 tolerations: []
 
 affinity: {}
+
+# more configurations are set with configmap in file template/configmap.yaml
+externalDatabase:
+  host: ""
+mysql:
+  # if enabled is set to false, then you should manually specified externalDatabase.host
+  enabled: true
+  architecture: standalone
+  auth:
+    rootPassword: "s3cretR00t"
+    database: "logi_kafka_manager"
+    username: "logi_kafka_manager"
+    password: "n0tp@55w0rd"
```
**distribution/conf/application.yml** (new file, 28 lines)

```diff
@@ -0,0 +1,28 @@
+
+## kafka-manager的配置文件,该文件中的配置会覆盖默认配置
+## 下面的配置信息基本就是jar中的 application.yml默认配置了;
+## 可以只修改自己变更的配置,其他的删除就行了; 比如只配置一下mysql
+
+
+server:
+  port: 8080
+  tomcat:
+    accept-count: 1000
+    max-connections: 10000
+    max-threads: 800
+    min-spare-threads: 100
+
+spring:
+  application:
+    name: kafkamanager
+  profiles:
+    active: dev
+  datasource:
+    kafka-manager:
+      jdbc-url: jdbc:mysql://localhost:3306/logi_kafka_manager?characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
+      username: root
+      password: 123456
+      driver-class-name: com.mysql.cj.jdbc.Driver
+  main:
+    allow-bean-definition-overriding: true
+
```
**application.yml** (defaults packaged in the jar)

```diff
@@ -15,6 +15,8 @@ server:
 spring:
   application:
     name: kafkamanager
+  profiles:
+    active: dev
   datasource:
     kafka-manager:
       jdbc-url: jdbc:mysql://localhost:3306/logi_kafka_manager?characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
```
**SQL upgrade guide** (docs)

```diff
@@ -4,7 +4,6 @@
 > 当您从一个很低的版本升级时候,应该依次执行中间有过变更的sql脚本
 
 
-
 
 **一站式`Apache Kafka`集群指标监控与运维管控平台**
 
```
Binary image removed (not shown; was 20 KiB).
Binary image added (not shown; 785 KiB).
Binary image added (not shown; 2.5 MiB).
**docs/dev_guide/如何增加上报监控系统指标.md** (new file, 53 lines)

```diff
@@ -0,0 +1,53 @@
+
+---
+
+
+
+**一站式`Apache Kafka`集群指标监控与运维管控平台**
+
+---
+
+# 如何增加上报监控系统指标?
+
+## 0、前言
+
+LogiKM是 **一站式`Apache Kafka`集群指标监控与运维管控平台** ,当前会将消费Lag,Topic流量等指标上报到监控系统中,从而方便用户在监控系统中对这些指标配置监控告警规则,进而达到监控自身客户端是否正常的目的。
+
+那么,如果我们想增加一个新的监控指标,应该如何做呢,比如我们想监控Broker的流量,监控Broker的存活信息,监控集群Controller个数等等。
+
+在具体介绍之前,我们大家都知道,Kafka监控相关的信息,基本都存储于Broker、Jmx以及ZK中。当前LogiKM也已经具备从这三个地方获取数据的基本能力,因此基于LogiKM我们再获取其他指标,总体上还是非常方便的。
+
+这里我们就以已经获取到的Topic流量信息为例,看LogiKM如何实现Topic指标的获取并上报的。
+
+---
+
+## 1、确定指标位置
+
+基于对Kafka的了解,我们知道Topic流量信息这个指标是存储于Jmx中的,因此我们需要从Jmx中获取。大家如果对于自己所需要获取的指标存储在何处不太清楚的,可以加入我们维护的Kafka中文社区(README中有二维码)中进行沟通交流。
+
+---
+
+## 2、指标获取
+
+Topic流量指标的获取详细见图中说明。
+
+
+
+---
+
+## 3、指标上报
+
+上一步我们已经采集到Topic流量指标了,下一步就是将该指标上报到监控系统,这块只需要按照监控系统要求的格式,将数据上报即可。
+
+LogiKM中有一个monitor模块,具体的如下图所示:
+
+
+
+
+## 4、补充说明
+
+监控系统对接的相关内容见:
+
+[监控系统集成](./monitor_system_integrate_with_self.md)
+
+[监控系统集成例子——集成夜莺](./monitor_system_integrate_with_n9e.md)
```
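As a rough companion to steps 1 and 2 of that guide, here is a minimal, self-contained Java sketch of reading a topic's `BytesInPerSec` rate straight from a broker's JMX endpoint. The MBean name follows Kafka's standard `kafka.server:type=BrokerTopicMetrics` naming; the broker host, port, and topic are hypothetical placeholders, and inside LogiKM the same API is reached through its `JmxConnectorWrap` helper rather than a raw connector.

```java
import java.util.Collections;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class TopicBytesInReader {
    public static void main(String[] args) throws Exception {
        // Hypothetical broker JMX endpoint; Kafka exposes one when JMX_PORT is set.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-1.example.com:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url, Collections.emptyMap())) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Standard Kafka MBean for a topic's inbound byte rate.
            ObjectName name = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my-topic");
            Object oneMinuteRate = conn.getAttribute(name, "OneMinuteRate");
            System.out.println("BytesInPerSec(1min) = " + oneMinuteRate);
        }
    }
}
```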
**Open-source vs. commercial comparison** (docs)

```diff
@@ -19,9 +19,9 @@
 | 模块 |对比指标 |底层依赖 |开源版 |商业版 |备注 |
 | --- | --- | --- | --- | --- | --- |
-| 服务发现 | bootstrap地址变更对客户端无影响 | | | 是| |
-| 安全管控 | 身份鉴权(appID+password) | | | 是 | |
-| | 权限鉴权(Topic+appID) | | | 是 | |
+| 服务发现 | bootstrap地址变更对客户端无影响 | Gateway | | 是| |
+| 安全管控 | 身份鉴权(appID+password) | Gateway | | 是 | |
+| | 权限鉴权(Topic+appID) | Gateway | | 是 | |
 | 指标监控 | Topic实时流量、历史流量 | | 是 | 是 | |
 | | Broker实时耗时、历史耗时 | 引擎 | | 是 | |
 | | 分区落盘 | 引擎 | | 是 | |
@@ -49,7 +49,7 @@
 **总结**
 
-Logi-KafkaManager的商业特性体现在在滴滴Kafka Gateway、滴滴Kafka引擎、内部沉淀出的资源治理专家经验、可定制化的健康分算法。
+滴滴LogiKM的商业特性体现在滴滴Kafka Gateway、滴滴Kafka引擎、内部沉淀出的资源治理专家经验、可定制化的健康分算法。
 从场景来看,滴滴Logi-KafkaManager的开源版本在kafka集群运维、的Topic管理、监控告警、资源治理等kafka核心场景都充分开源用户的使用需求并且有着出色的表现。而商业版相较于开源版在安全管控、流量管控、更丰富的指标监控、资源治理专家经验的具有明显提升,更加符合企业业务需求。
 除此之外,商业版还可根据企业实际需求对平台源码进行定制化改造,并提供运维保障,稳定性保障,运营保障等服务。
 
```
**TopicCreationConstant.java**

```diff
@@ -25,6 +25,8 @@ public class TopicCreationConstant {
 
     public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms";
 
+    public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes";
+
     public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L;
 
     public static Properties createNewProperties(Long retentionTime) {
```
**MineTopicSummary.java**

```diff
@@ -25,6 +25,8 @@ public class MineTopicSummary {
 
     private Integer access;
 
+    private String description;
+
     public Long getLogicalClusterId() {
         return logicalClusterId;
     }
@@ -105,6 +107,14 @@ public class MineTopicSummary {
         this.access = access;
     }
 
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
     @Override
     public String toString() {
         return "MineTopicSummary{" +
```
**TopicBasicDTO.java**

```diff
@@ -37,6 +37,8 @@ public class TopicBasicDTO {
 
     private Long retentionTime;
 
+    private Long retentionBytes;
+
     public Long getClusterId() {
         return clusterId;
     }
@@ -157,6 +159,14 @@ public class TopicBasicDTO {
         this.retentionTime = retentionTime;
     }
 
+    public Long getRetentionBytes() {
+        return retentionBytes;
+    }
+
+    public void setRetentionBytes(Long retentionBytes) {
+        this.retentionBytes = retentionBytes;
+    }
+
     @Override
     public String toString() {
         return "TopicBasicDTO{" +
@@ -166,7 +176,7 @@ public class TopicBasicDTO {
                 ", principals='" + principals + '\'' +
                 ", topicName='" + topicName + '\'' +
                 ", description='" + description + '\'' +
-                ", regionNameList='" + regionNameList + '\'' +
+                ", regionNameList=" + regionNameList +
                 ", score=" + score +
                 ", topicCodeC='" + topicCodeC + '\'' +
                 ", partitionNum=" + partitionNum +
@@ -175,6 +185,7 @@ public class TopicBasicDTO {
                 ", modifyTime=" + modifyTime +
                 ", createTime=" + createTime +
                 ", retentionTime=" + retentionTime +
+                ", retentionBytes=" + retentionBytes +
                 '}';
     }
 }
```
**OrderVO.java**

```diff
@@ -27,8 +27,11 @@ public class OrderVO {
     @ApiModelProperty(value = "工单状态, 0:待审批, 1:通过, 2:拒绝, 3:取消")
     private Integer status;
 
-    @ApiModelProperty(value = "申请/审核时间")
-    private Date gmtTime;
+    @ApiModelProperty(value = "申请时间")
+    private Date gmtCreate;
+
+    @ApiModelProperty(value = "审核时间")
+    private Date gmtHandle;
 
     public Long getId() {
         return id;
@@ -70,12 +73,20 @@ public class OrderVO {
         this.status = status;
     }
 
-    public Date getGmtTime() {
-        return gmtTime;
+    public Date getGmtCreate() {
+        return gmtCreate;
     }
 
-    public void setGmtTime(Date gmtTime) {
-        this.gmtTime = gmtTime;
+    public void setGmtCreate(Date gmtCreate) {
+        this.gmtCreate = gmtCreate;
+    }
+
+    public Date getGmtHandle() {
+        return gmtHandle;
+    }
+
+    public void setGmtHandle(Date gmtHandle) {
+        this.gmtHandle = gmtHandle;
     }
 
     public String getApplicant() {
@@ -95,7 +106,7 @@ public class OrderVO {
                 ", applicant='" + applicant + '\'' +
                 ", description='" + description + '\'' +
                 ", status=" + status +
-                ", gmtTime=" + gmtTime +
+                ", gmtTime=" + gmtCreate +
                 '}';
     }
 }
```
**TopicBasicVO.java**

```diff
@@ -33,6 +33,9 @@ public class TopicBasicVO {
     @ApiModelProperty(value = "存储时间(ms)")
     private Long retentionTime;
 
+    @ApiModelProperty(value = "单分区数据保存大小(Byte)")
+    private Long retentionBytes;
+
     @ApiModelProperty(value = "创建时间")
     private Long createTime;
 
@@ -62,12 +65,20 @@ public class TopicBasicVO {
         this.clusterId = clusterId;
     }
 
-    public String getTopicCodeC() {
-        return topicCodeC;
+    public String getAppId() {
+        return appId;
     }
 
-    public void setTopicCodeC(String topicCodeC) {
-        this.topicCodeC = topicCodeC;
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    public String getAppName() {
+        return appName;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
     }
 
     public Integer getPartitionNum() {
@@ -86,22 +97,6 @@ public class TopicBasicVO {
         this.replicaNum = replicaNum;
     }
 
-    public Long getModifyTime() {
-        return modifyTime;
-    }
-
-    public void setModifyTime(Long modifyTime) {
-        this.modifyTime = modifyTime;
-    }
-
-    public Long getCreateTime() {
-        return createTime;
-    }
-
-    public void setCreateTime(Long createTime) {
-        this.createTime = createTime;
-    }
-
     public String getPrincipals() {
         return principals;
     }
@@ -110,30 +105,6 @@ public class TopicBasicVO {
         this.principals = principals;
     }
 
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public void setAppId(String appId) {
-        this.appId = appId;
-    }
-
-    public void setBootstrapServers(String bootstrapServers) {
-        this.bootstrapServers = bootstrapServers;
-    }
-
-    public String getAppId() {
-        return appId;
-    }
-
-    public String getBootstrapServers() {
-        return bootstrapServers;
-    }
-
     public Long getRetentionTime() {
         return retentionTime;
     }
@@ -142,12 +113,28 @@ public class TopicBasicVO {
         this.retentionTime = retentionTime;
     }
 
-    public String getAppName() {
-        return appName;
+    public Long getRetentionBytes() {
+        return retentionBytes;
     }
 
-    public void setAppName(String appName) {
-        this.appName = appName;
+    public void setRetentionBytes(Long retentionBytes) {
+        this.retentionBytes = retentionBytes;
+    }
+
+    public Long getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Long createTime) {
+        this.createTime = createTime;
+    }
+
+    public Long getModifyTime() {
+        return modifyTime;
+    }
+
+    public void setModifyTime(Long modifyTime) {
+        this.modifyTime = modifyTime;
     }
 
     public Integer getScore() {
@@ -158,6 +145,30 @@ public class TopicBasicVO {
         this.score = score;
     }
 
+    public String getTopicCodeC() {
+        return topicCodeC;
+    }
+
+    public void setTopicCodeC(String topicCodeC) {
+        this.topicCodeC = topicCodeC;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
     public List<String> getRegionNameList() {
         return regionNameList;
     }
@@ -176,6 +187,7 @@ public class TopicBasicVO {
                 ", replicaNum=" + replicaNum +
                 ", principals='" + principals + '\'' +
                 ", retentionTime=" + retentionTime +
+                ", retentionBytes=" + retentionBytes +
                 ", createTime=" + createTime +
                 ", modifyTime=" + modifyTime +
                 ", score=" + score +
```
**TopicMineVO.java**

```diff
@@ -36,6 +36,9 @@ public class TopicMineVO {
     @ApiModelProperty(value = "状态, 0:无权限, 1:可消费 2:可发送 3:可消费发送 4:可管理")
     private Integer access;
 
+    @ApiModelProperty(value = "备注")
+    private String description;
+
     public Long getClusterId() {
         return clusterId;
     }
@@ -108,6 +111,14 @@ public class TopicMineVO {
         this.access = access;
     }
 
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
     @Override
     public String toString() {
         return "TopicMineVO{" +
```
**BackoffUtils.java** (new file, 20 lines)

```diff
@@ -0,0 +1,20 @@
+package com.xiaojukeji.kafka.manager.common.utils;
+
+public class BackoffUtils {
+    private BackoffUtils() {
+    }
+
+    public static void backoff(long timeUnitMs) {
+        if (timeUnitMs <= 0) {
+            return;
+        }
+
+        try {
+            Thread.sleep(timeUnitMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            // ignore
+        }
+    }
+}
```
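This utility replaces bare `Thread.sleep` calls (see the `JmxConnectorWrap` change further down); its one behavioral subtlety is that it re-sets the thread's interrupt flag instead of swallowing `InterruptedException`. A minimal sketch of a polling loop that relies on that (the readiness flag is a hypothetical stand-in for any slow external resource):

```java
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;

public class PollUntilReady {
    // Hypothetical readiness check; stands in for any slow external resource.
    private static volatile boolean ready = false;

    public static void waitForReady() {
        // Because backoff() restores the interrupt flag rather than swallowing
        // InterruptedException, the loop condition can still observe interruption.
        while (!ready && !Thread.currentThread().isInterrupted()) {
            BackoffUtils.backoff(2); // sleep 2 ms between checks
        }
    }
}
```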
**KafkaConsumerFactory.java**

```diff
@@ -1,7 +1,7 @@
 package com.xiaojukeji.kafka.manager.common.utils.factory;
 
-import com.alibaba.fastjson.JSONObject;
 import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
@@ -16,7 +16,7 @@ import java.util.Properties;
  * @author zengqiao
  * @date 20/8/24
  */
-public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer> {
+public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer<String, String>> {
     private ClusterDO clusterDO;
 
     public KafkaConsumerFactory(ClusterDO clusterDO) {
@@ -25,17 +25,17 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
 
     @Override
     public KafkaConsumer create() {
-        return new KafkaConsumer(createKafkaConsumerProperties(clusterDO));
+        return new KafkaConsumer<String, String>(createKafkaConsumerProperties(clusterDO));
     }
 
     @Override
-    public PooledObject<KafkaConsumer> wrap(KafkaConsumer obj) {
-        return new DefaultPooledObject<KafkaConsumer>(obj);
+    public PooledObject<KafkaConsumer<String, String>> wrap(KafkaConsumer<String, String> obj) {
+        return new DefaultPooledObject<>(obj);
     }
 
     @Override
-    public void destroyObject(final PooledObject<KafkaConsumer> p) throws Exception {
-        KafkaConsumer kafkaConsumer = p.getObject();
+    public void destroyObject(final PooledObject<KafkaConsumer<String, String>> p) throws Exception {
+        KafkaConsumer<String, String> kafkaConsumer = p.getObject();
         if (ValidateUtils.isNull(kafkaConsumer)) {
             return;
         }
@@ -57,7 +57,7 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
         if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
             return properties;
         }
-        properties.putAll(JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class));
+        properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
         return properties;
     }
 }
```
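Parameterizing the factory removes the raw-type warnings and lets callers take a correctly typed consumer straight out of the pool. A minimal borrow/return sketch against commons-pool2, assuming the `ClusterDO` comes from LogiKM's metadata cache:

```java
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;

public class PooledConsumerExample {
    public static void useConsumer(ClusterDO clusterDO) throws Exception {
        // The generic factory yields a pool of typed consumers.
        GenericObjectPool<KafkaConsumer<String, String>> pool =
                new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO));
        KafkaConsumer<String, String> consumer = pool.borrowObject();
        try {
            // ... poll records or fetch offsets with a typed consumer ...
        } finally {
            pool.returnObject(consumer); // always hand the client back to the pool
        }
    }
}
```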
**JmxConnectorWrap.java**

```diff
@@ -1,5 +1,6 @@
 package com.xiaojukeji.kafka.manager.common.utils.jmx;
 
+import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +80,8 @@ public class JmxConnectorWrap {
         try {
             Map<String, Object> environment = new HashMap<String, Object>();
             if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getPassword())) {
-                environment.put(JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword()));
+                // fixed by riyuetianmu
+                environment.put(JMXConnector.CREDENTIALS, new String[]{this.jmxConfig.getUsername(), this.jmxConfig.getPassword()});
             }
             if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) {
                 environment.put(Context.SECURITY_PROTOCOL, "ssl");
@@ -145,18 +147,16 @@ public class JmxConnectorWrap {
         long now = System.currentTimeMillis();
         while (true) {
             try {
-                if (System.currentTimeMillis() - now > 60000) {
-                    break;
-                }
                 int num = atomicInteger.get();
                 if (num <= 0) {
-                    Thread.sleep(2);
-                    continue;
+                    BackoffUtils.backoff(2);
                 }
-                if (atomicInteger.compareAndSet(num, num - 1)) {
+                if (atomicInteger.compareAndSet(num, num - 1) || System.currentTimeMillis() - now > 6000) {
                     break;
                 }
             } catch (Exception e) {
+                // ignore
             }
         }
     }
```
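The credentials fix matters because the JMX Remote API defines `jmx.remote.credentials` for the default RMI connector as a `String[2]` of username and password; passing a `List` makes authentication fail against standard JMX agents. A minimal sketch of the corrected environment map (host, port, and credentials are placeholders):

```java
import java.util.HashMap;
import java.util.Map;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxAuthExample {
    public static JMXConnector connect(String host, int port, String user, String pass) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        Map<String, Object> env = new HashMap<>();
        // The RMI connector expects exactly a String[]{username, password} here.
        env.put(JMXConnector.CREDENTIALS, new String[]{user, pass});
        return JMXConnectorFactory.connect(url, env);
    }
}
```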
**package.json** (kafka-manager-console)

```diff
@@ -1,6 +1,6 @@
 {
   "name": "logi-kafka",
-  "version": "2.4.3",
+  "version": "2.5.0",
   "description": "",
   "scripts": {
     "start": "webpack-dev-server",
@@ -57,4 +57,4 @@
   "dependencies": {
     "format-to-json": "^1.0.4"
   }
 }
```
**kafka-manager-console/src/assets/image/weChat.png** (new binary file, 36 KiB, not shown)
Binary image replaced (not shown; 125 KiB before and after)
**ChartWithDatePicker** (console component)

```diff
@@ -60,6 +60,22 @@ export class ChartWithDatePicker extends React.Component<IChartProps> {
   public changeChartOptions(options: any) {
     const noData = options.series.length ? false : true;
     this.setState({ noData });
+    options.tooltip.formatter = (params: any) => {
+      var res =
+        "<div style='margin-bottom:5px;padding:0 12px;width:100%;height:24px;line-height:24px;border-radius:3px;'><p>" +
+        params[0].data.time +
+        " </p></div>";
+      for (var i = 0; i < params.length; i++) {
+        res += `<div key=${params[i].seriesName} style="color: #fff;padding:0 12px;line-height: 24px">
+          <span style="display:inline-block;margin-right:5px;border-radius:50%;width:10px;height:10px;background-color:${[
+            params[i].color,
+          ]};"></span>
+          ${params[i].seriesName}
+          ${params[i].data[params[i].seriesName]}
+        </div>`;
+      }
+      return res;
+    }
     this.chart.setOption(options, true);
   }
 
@@ -79,7 +95,7 @@ export class ChartWithDatePicker extends React.Component<IChartProps> {
   public render() {
     const { customerNode } = this.props;
     return (
-      <div className="status-box" style={{minWidth: '930px'}}>
+      <div className="status-box" style={{ minWidth: '930px' }}>
         <div className="status-graph">
           <div className="k-toolbar">
             {customerNode}
```
**Header** (console component)

```diff
@@ -7,7 +7,7 @@ import { urlPrefix } from 'constants/left-menu';
 import { region, IRegionIdcs } from 'store/region';
 import logoUrl from '../../assets/image/kafka-logo.png';
 import userIcon from '../../assets/image/normal.png';
-import weChat from '../../assets/image/wechat.jpeg';
+import weChat from '../../assets/image/weChat.png';
 import { users } from 'store/users';
 import { observer } from 'mobx-react';
 import { Link } from 'react-router-dom';
@@ -60,8 +60,8 @@ export const Header = observer((props: IHeader) => {
     });
   };
   const content = (
-    <div style={{ height: '250px', padding: '5px' }} className="kafka-avatar-img">
-      <img style={{ width: '190px', height: '246px' }} src={weChat} alt="" />
+    <div style={{ height: '200px', padding: '5px' }} className="kafka-avatar-img">
+      <img style={{ width: '190px', height: '190px' }} src={weChat} alt="" />
     </div>
   );
   const helpCenter = (
@@ -144,8 +144,8 @@ export const Header = observer((props: IHeader) => {
     <div className="kafka-header-container">
       <div className="left-content">
         <img className="kafka-header-icon" src={logoUrl} alt="" />
-        <span className="kafka-header-text">Kafka Manager</span>
-        <a className='kafka-header-version' href="https://github.com/didi/Logi-KafkaManager/releases" target='_blank'>v2.4.2</a>
+        <span className="kafka-header-text">LogiKM</span>
+        <a className='kafka-header-version' href="https://github.com/didi/Logi-KafkaManager/releases" target='_blank'>v2.5.0</a>
         {/* 添加版本超链接 */}
       </div>
       <div className="mid-content">
```
**OrderList** (console component)

```diff
@@ -115,11 +115,19 @@ export class OrderList extends SearchAndFilterContainer {
       status,
       {
         title: '申请时间',
-        dataIndex: 'gmtTime',
-        key: 'gmtTime',
-        sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtTime - a.gmtTime,
-        render: (t: number) => moment(t).format(timeFormat),
-      }, {
+        dataIndex: 'gmtCreate',
+        key: 'gmtCreate',
+        sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtCreate - a.gmtCreate,
+        render: (t: number) => t ? moment(t).format(timeFormat) : '-',
+      },
+      {
+        title: '审批时间',
+        dataIndex: 'gmtHandle',
+        key: 'gmtHandle',
+        sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtHandle - a.gmtHandle,
+        render: (t: number) => t ? moment(t).format(timeFormat) : '-',
+      },
+      {
         title: '操作',
         key: 'operation',
         dataIndex: 'operation',
```
**index.html** (console)

```diff
@@ -1,12 +1,15 @@
 <!DOCTYPE html>
 <html lang="en">
+
 <head>
   <meta charset="UTF-8">
   <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=2">
-  <title>KafkaManager</title>
+  <title>LogiKM</title>
 </head>
+
 <body>
   <div id="root"></div>
   <div id="modal"></div>
 </body>
+
 </html>
```
**ConsumerMetadataCache.java**

```diff
@@ -17,6 +17,9 @@ public class ConsumerMetadataCache {
 
     private static final Map<Long, ConsumerMetadata> CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>();
 
+    private ConsumerMetadataCache() {
+    }
+
     public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) {
         if (clusterId == null || consumerMetadata == null) {
             return;
```
**KafkaClientPool.java**

```diff
@@ -1,7 +1,7 @@
 package com.xiaojukeji.kafka.manager.service.cache;
 
-import com.alibaba.fastjson.JSONObject;
 import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
 import kafka.admin.AdminClient;
@@ -26,19 +26,22 @@ import java.util.concurrent.locks.ReentrantLock;
  * @date 19/12/24
  */
 public class KafkaClientPool {
-    private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
 
     /**
     * AdminClient
     */
-    private static Map<Long, AdminClient> AdminClientMap = new ConcurrentHashMap<>();
+    private static final Map<Long, AdminClient> ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();
 
-    private static Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
 
-    private static Map<Long, GenericObjectPool<KafkaConsumer>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
+    private static final Map<Long, GenericObjectPool<KafkaConsumer<String, String>>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
 
     private static ReentrantLock lock = new ReentrantLock();
 
+    private KafkaClientPool() {
+    }
+
     private static void initKafkaProducerMap(Long clusterId) {
         ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId);
         if (clusterDO == null) {
@@ -55,7 +58,7 @@ public class KafkaClientPool {
             properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
             properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
             properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
-            KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<String, String>(properties));
+            KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties));
         } catch (Exception e) {
             LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e);
         } finally {
@@ -77,25 +80,22 @@ public class KafkaClientPool {
         if (ValidateUtils.isNull(kafkaProducer)) {
             return false;
         }
-        kafkaProducer.send(new ProducerRecord<String, String>(topicName, data));
+        kafkaProducer.send(new ProducerRecord<>(topicName, data));
         return true;
     }
 
     private static void initKafkaConsumerPool(ClusterDO clusterDO) {
         lock.lock();
         try {
-            GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+            GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
             if (objectPool != null) {
                 return;
             }
-            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
+            GenericObjectPoolConfig<KafkaConsumer<String, String>> config = new GenericObjectPoolConfig<>();
             config.setMaxIdle(24);
             config.setMinIdle(24);
             config.setMaxTotal(24);
-            KAFKA_CONSUMER_POOL.put(
-                    clusterDO.getId(),
-                    new GenericObjectPool<KafkaConsumer>(new KafkaConsumerFactory(clusterDO), config)
-            );
+            KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config));
         } catch (Exception e) {
             LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e);
         } finally {
@@ -106,7 +106,7 @@ public class KafkaClientPool {
     public static void closeKafkaConsumerPool(Long clusterId) {
         lock.lock();
         try {
-            GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
+            GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
             if (objectPool == null) {
                 return;
             }
@@ -118,11 +118,11 @@ public class KafkaClientPool {
         }
     }
 
-    public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
+    public static KafkaConsumer<String, String> borrowKafkaConsumerClient(ClusterDO clusterDO) {
         if (ValidateUtils.isNull(clusterDO)) {
             return null;
         }
-        GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+        GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
         if (ValidateUtils.isNull(objectPool)) {
             initKafkaConsumerPool(clusterDO);
             objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
@@ -139,11 +139,11 @@ public class KafkaClientPool {
         return null;
     }
 
-    public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
+    public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer<String, String> kafkaConsumer) {
         if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) {
             return;
         }
-        GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
+        GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
         if (ValidateUtils.isNull(objectPool)) {
             return;
         }
@@ -155,7 +155,7 @@ public class KafkaClientPool {
     }
 
     public static AdminClient getAdminClient(Long clusterId) {
-        AdminClient adminClient = AdminClientMap.get(clusterId);
+        AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId);
         if (adminClient != null) {
             return adminClient;
         }
@@ -166,26 +166,26 @@ public class KafkaClientPool {
         Properties properties = createProperties(clusterDO, false);
         lock.lock();
         try {
-            adminClient = AdminClientMap.get(clusterId);
+            adminClient = ADMIN_CLIENT_MAP.get(clusterId);
             if (adminClient != null) {
                 return adminClient;
             }
-            AdminClientMap.put(clusterId, AdminClient.create(properties));
+            ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties));
         } catch (Exception e) {
             LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e);
         } finally {
             lock.unlock();
         }
-        return AdminClientMap.get(clusterId);
+        return ADMIN_CLIENT_MAP.get(clusterId);
     }
 
     public static void closeAdminClient(ClusterDO cluster) {
-        if (AdminClientMap.containsKey(cluster.getId())) {
-            AdminClientMap.get(cluster.getId()).close();
+        if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) {
+            ADMIN_CLIENT_MAP.get(cluster.getId()).close();
         }
     }
 
-    public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) {
+    public static Properties createProperties(ClusterDO clusterDO, boolean serialize) {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers());
         if (serialize) {
@@ -198,8 +198,7 @@ public class KafkaClientPool {
         if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
             return properties;
         }
-        Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class);
-        properties.putAll(securityProperties);
+        properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
         return properties;
     }
 }
```
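`getAdminClient` re-checks the map after acquiring the lock, the classic check-lock-check pattern for lazily creating one shared client per cluster. A minimal standalone sketch of the same idea (the `Supplier` factory is a hypothetical stand-in for `AdminClient.create`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class LazyClientRegistry<C> {
    private final Map<Long, C> clients = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    public C getOrCreate(Long clusterId, Supplier<C> createClient) {
        C client = clients.get(clusterId);     // fast path, no locking
        if (client != null) {
            return client;
        }
        lock.lock();
        try {
            client = clients.get(clusterId);   // re-check under the lock
            if (client == null) {
                client = createClient.get();   // create at most once per cluster
                clients.put(clusterId, client);
            }
            return client;
        } finally {
            lock.unlock();
        }
    }
}
```

With a `ConcurrentHashMap`, `computeIfAbsent` would collapse this into one call; the explicit lock keeps creation and failure logging in one place, which is the trade-off the pool class makes.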
**KafkaMetricsCache.java**

```diff
@@ -14,7 +14,10 @@ public class KafkaMetricsCache {
     /**
      * <clusterId, Metrics List>
      */
-    private static Map<Long, Map<String, TopicMetrics>> TopicMetricsMap = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<String, TopicMetrics>> TOPIC_METRICS_MAP = new ConcurrentHashMap<>();
+
+    private KafkaMetricsCache() {
+    }
 
     public static void putTopicMetricsToCache(Long clusterId, List<TopicMetrics> dataList) {
         if (clusterId == null || dataList == null) {
@@ -24,22 +27,22 @@ public class KafkaMetricsCache {
         for (TopicMetrics topicMetrics : dataList) {
             subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics);
         }
-        TopicMetricsMap.put(clusterId, subMetricsMap);
+        TOPIC_METRICS_MAP.put(clusterId, subMetricsMap);
     }
 
     public static Map<String, TopicMetrics> getTopicMetricsFromCache(Long clusterId) {
-        return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+        return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
     }
 
     public static Map<Long, Map<String, TopicMetrics>> getAllTopicMetricsFromCache() {
-        return TopicMetricsMap;
+        return TOPIC_METRICS_MAP;
     }
 
     public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) {
         if (clusterId == null || topicName == null) {
             return null;
         }
-        Map<String, TopicMetrics> subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+        Map<String, TopicMetrics> subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
         return subMap.get(topicName);
     }
 }
```
**LogicalClusterMetadataManager.java**

```diff
@@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager {
     public void flush() {
         List<LogicalClusterDO> logicalClusterDOList = logicalClusterService.listAll();
         if (ValidateUtils.isNull(logicalClusterDOList)) {
-            logicalClusterDOList = Collections.EMPTY_LIST;
+            logicalClusterDOList = Collections.emptyList();
        }
         Set<Long> inDbLogicalClusterIds = logicalClusterDOList.stream()
                 .map(LogicalClusterDO::getId)
```
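`Collections.EMPTY_LIST` is a raw `List`, so assigning it to a `List<LogicalClusterDO>` compiles only with an unchecked warning; the generic `emptyList()` factory infers the element type and stays type-safe. A small illustration (the `Widget` type is hypothetical):

```java
import java.util.Collections;
import java.util.List;

public class EmptyListDemo {
    static class Widget { }

    public static void main(String[] args) {
        // Raw constant: unchecked assignment, element type information is lost.
        @SuppressWarnings("unchecked")
        List<Widget> raw = Collections.EMPTY_LIST;

        // Generic factory: the compiler infers List<Widget>, no warning.
        List<Widget> typed = Collections.emptyList();

        System.out.println(raw.size() + " " + typed.size()); // both print 0
    }
}
```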
**PhysicalClusterMetadataManager.java** (the mirrored page is cut off inside this file's last hunk)

```diff
@@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache;
 import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
 import com.xiaojukeji.kafka.manager.common.constant.Constant;
 import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
+import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
 import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
 import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
 import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
+import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
 import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
@@ -37,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @Service
 public class PhysicalClusterMetadataManager {
-    private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
 
     @Autowired
     private ControllerDao controllerDao;
@@ -48,22 +50,22 @@ public class PhysicalClusterMetadataManager {
     @Autowired
     private ClusterService clusterService;
 
-    private final static Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
 
-    private final static Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
 
-    private final static Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
 
-    private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
 
-    private final static Map<Long, Map<String, Long>> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
 
-    private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
 
     /**
      * JXM连接, 延迟连接
      */
-    private final static Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
 
     /**
      * KafkaBroker版本, 延迟获取
@@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager {
 
     // 初始化topic-map
```
|
// 初始化topic-map
|
||||||
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
|
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
|
||||||
TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
|
TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
|
||||||
|
|
||||||
// 初始化cluster-map
|
// 初始化cluster-map
|
||||||
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
|
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
|
||||||
@@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager {
|
|||||||
KAFKA_VERSION_MAP.remove(clusterId);
|
KAFKA_VERSION_MAP.remove(clusterId);
|
||||||
|
|
||||||
TOPIC_METADATA_MAP.remove(clusterId);
|
TOPIC_METADATA_MAP.remove(clusterId);
|
||||||
TOPIC_RETENTION_TIME_MAP.remove(clusterId);
|
TOPIC_PROPERTIES_MAP.remove(clusterId);
|
||||||
CLUSTER_MAP.remove(clusterId);
|
CLUSTER_MAP.remove(clusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager {
|
|||||||
|
|
||||||
//---------------------------配置相关元信息--------------
|
//---------------------------配置相关元信息--------------
|
||||||
|
|
||||||
public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) {
|
public static void putTopicProperties(Long clusterId, String topicName, Properties properties) {
|
||||||
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
|
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) {
|
||||||
if (timeMap == null) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
timeMap.put(topicName, retentionTime);
|
|
||||||
|
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
|
||||||
|
if (ValidateUtils.isNull(propertiesMap)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
propertiesMap.put(topicName, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Long getTopicRetentionTime(Long clusterId, String topicName) {
|
public static Long getTopicRetentionTime(Long clusterId, String topicName) {
|
||||||
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
|
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
|
||||||
if (timeMap == null) {
|
if (ValidateUtils.isNull(propertiesMap)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return timeMap.get(topicName);
|
|
||||||
|
Properties properties = propertiesMap.get(topicName);
|
||||||
|
if (ValidateUtils.isNull(properties)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Long getTopicRetentionBytes(Long clusterId, String topicName) {
|
||||||
|
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
|
||||||
|
if (ValidateUtils.isNull(propertiesMap)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Properties properties = propertiesMap.get(topicName);
|
||||||
|
if (ValidateUtils.isNull(properties)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
//---------------------------Broker元信息相关--------------
|
//---------------------------Broker元信息相关--------------
|
||||||
|
|
||||||
@@ -375,7 +398,7 @@ public class PhysicalClusterMetadataManager {
|
|||||||
KafkaBrokerRoleEnum roleEnum) {
|
KafkaBrokerRoleEnum roleEnum) {
|
||||||
BrokerMetadata brokerMetadata =
|
BrokerMetadata brokerMetadata =
|
||||||
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
|
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
|
||||||
if (ValidateUtils.isNull(brokerMetadata)) {
|
if (brokerMetadata == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
|
String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
|
||||||
@@ -415,7 +438,7 @@ public class PhysicalClusterMetadataManager {
|
|||||||
KafkaBrokerRoleEnum roleEnum) {
|
KafkaBrokerRoleEnum roleEnum) {
|
||||||
BrokerMetadata brokerMetadata =
|
BrokerMetadata brokerMetadata =
|
||||||
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
|
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
|
||||||
if (ValidateUtils.isNull(brokerMetadata)) {
|
if (brokerMetadata == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
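
With the cache widened from a single retention value to the topic's full `Properties`, `getTopicRetentionTime` and the new `getTopicRetentionBytes` both parse their value on demand. A minimal sketch of that lookup, assuming `TopicCreationConstant`'s key names resolve to Kafka's per-topic config keys `retention.ms` and `retention.bytes`; the `string2Long` helper below is a stand-in for `NumberUtils.string2Long`:

```java
import java.util.Properties;

// Sketch of the retention lookup above: read a string-valued config key from
// the cached Properties and convert it to Long, returning null when absent.
public class RetentionLookupSketch {
    static Long string2Long(String s) {          // stand-in for NumberUtils.string2Long
        try {
            return s == null ? null : Long.valueOf(s.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Properties topicProps = new Properties(); // as cached per topic in TOPIC_PROPERTIES_MAP
        topicProps.setProperty("retention.ms", "604800000");   // 7 days

        Long retentionTime = string2Long(topicProps.getProperty("retention.ms"));
        Long retentionBytes = string2Long(topicProps.getProperty("retention.bytes"));
        System.out.println(retentionTime);   // 604800000
        System.out.println(retentionBytes);  // null -> key absent, callers must handle it
    }
}
```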
@@ -13,4 +13,12 @@ public interface TopicExpiredService {
     List<TopicExpiredData> getExpiredTopicDataList(String username);

     ResultStatus retainExpiredTopic(Long physicalClusterId, String topicName, Integer retainDays);
+
+    /**
+     * Delete by topic name
+     * @param clusterId cluster id
+     * @param topicName topic name
+     * @return int
+     */
+    int deleteByTopicName(Long clusterId, String topicName);
 }
@@ -43,6 +43,9 @@ public class AdminServiceImpl implements AdminService {
     @Autowired
     private TopicManagerService topicManagerService;

+    @Autowired
+    private TopicExpiredService topicExpiredService;
+
     @Autowired
     private TopicService topicService;

@@ -143,6 +146,7 @@ public class AdminServiceImpl implements AdminService {

         // 3. delete the topic from the database
         topicManagerService.deleteByTopicName(clusterDO.getId(), topicName);
+        topicExpiredService.deleteByTopicName(clusterDO.getId(), topicName);

         // 4. delete the topic's authority records from the database
         authorityService.deleteAuthorityByTopic(clusterDO.getId(), topicName);
@@ -19,6 +19,8 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
 import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
 import com.xiaojukeji.kafka.manager.service.service.*;
 import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -210,7 +212,7 @@ public class ClusterServiceImpl implements ClusterService {

         ZooKeeper zk = null;
         try {
-            zk = new ZooKeeper(zookeeper, 1000, null);
+            zk = new ZooKeeper(zookeeper, 1000, watchedEvent -> LOGGER.info(" receive event : " + watchedEvent.getType().name()));
             for (int i = 0; i < 15; ++i) {
                 if (zk.getState().isConnected()) {
                     // only a connected state proves the address is valid
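
Passing a real `Watcher` instead of `null` means connection events fired during the asynchronous handshake are logged rather than risking a null callback inside the client. A self-contained sketch of the same address-validation pattern — the 1000 ms session timeout and 15 attempts come from the diff; the 100 ms sleep and the `localhost:2181` address are assumptions:

```java
import org.apache.zookeeper.ZooKeeper;

// Sketch: validate a ZooKeeper address by connecting asynchronously and
// polling the client state, as ClusterServiceImpl does above.
public class ZkAddressCheckSketch {
    public static boolean isValid(String zkAddress) throws Exception {
        ZooKeeper zk = null;
        try {
            // the constructor returns immediately; connection happens in the background
            zk = new ZooKeeper(zkAddress, 1000,
                    event -> System.out.println("receive event: " + event.getType().name()));
            for (int i = 0; i < 15; ++i) {
                if (zk.getState().isConnected()) {
                    return true;   // only a connected state proves the address is valid
                }
                Thread.sleep(100); // assumption: short wait between state checks
            }
            return false;
        } finally {
            if (zk != null) {
                zk.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isValid("localhost:2181")); // placeholder address
    }
}
```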
@@ -75,4 +75,14 @@ public class TopicExpiredServiceImpl implements TopicExpiredService {
         }
         return ResultStatus.MYSQL_ERROR;
     }
+
+    @Override
+    public int deleteByTopicName(Long clusterId, String topicName) {
+        try {
+            return topicExpiredDao.deleteByName(clusterId, topicName);
+        } catch (Exception e) {
+            LOGGER.error("delete topic failed, clusterId:{} topicName:{}", clusterId, topicName, e);
+        }
+        return 0;
+    }
 }
@@ -210,7 +210,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
             }
         }

-        // add traffic information
+        // add traffic and description information
         Map<Long, Map<String, TopicMetrics>> metricMap = KafkaMetricsCache.getAllTopicMetricsFromCache();
         for (MineTopicSummary mineTopicSummary : summaryList) {
             TopicMetrics topicMetrics = getTopicMetricsFromCacheOrJmx(
@@ -219,6 +219,10 @@ public class TopicManagerServiceImpl implements TopicManagerService {
                     metricMap);
             mineTopicSummary.setBytesIn(topicMetrics.getSpecifiedMetrics("BytesInPerSecOneMinuteRate"));
             mineTopicSummary.setBytesOut(topicMetrics.getSpecifiedMetrics("BytesOutPerSecOneMinuteRate"));
+
+            // add the topic description
+            TopicDO topicDO = topicDao.getByTopicName(mineTopicSummary.getPhysicalClusterId(), mineTopicSummary.getTopicName());
+            mineTopicSummary.setDescription(topicDO.getDescription());
         }
         return summaryList;
     }
@@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService {
         basicDTO.setCreateTime(topicMetadata.getCreateTime());
         basicDTO.setModifyTime(topicMetadata.getModifyTime());
         basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName));
+        basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName));

         TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName);
         if (!ValidateUtils.isNull(topicDO)) {
@@ -648,10 +649,11 @@ public class TopicServiceImpl implements TopicService {
         List<String> dataList = new ArrayList<>();
         int currentSize = dataList.size();
         while (dataList.size() < maxMsgNum) {
+            if (remainingWaitMs <= 0) {
+                break;
+            }
+
             try {
-                if (remainingWaitMs <= 0) {
-                    break;
-                }
                 ConsumerRecords<String, String> records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS);
                 for (ConsumerRecord record : records) {
                     String value = (String) record.value();
@@ -661,20 +663,22 @@ public class TopicServiceImpl implements TopicService {
                             : value
                     );
                 }
-                // the current batch fetched nothing, so stop pulling
-                if (dataList.size() - currentSize == 0) {
-                    break;
-                }
-                currentSize = dataList.size();
-                // check for timeout
-                long elapsed = System.currentTimeMillis() - begin;
-                if (elapsed >= maxWaitMs) {
-                    break;
-                }
-                remainingWaitMs = maxWaitMs - elapsed;
             } catch (Exception e) {
                 LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
             }
+
+            // the current batch fetched nothing, so stop pulling
+            if (dataList.size() - currentSize == 0) {
+                break;
+            }
+            currentSize = dataList.size();
+
+            // check for timeout
+            long elapsed = System.currentTimeMillis() - begin;
+            if (elapsed >= maxWaitMs) {
+                break;
+            }
+            remainingWaitMs = maxWaitMs - elapsed;
         }
         return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
     }
@@ -698,14 +702,15 @@ public class TopicServiceImpl implements TopicService {
                             : value
                     );
                 }
-                if (System.currentTimeMillis() - timestamp > timeout
-                        || dataList.size() >= maxMsgNum) {
-                    break;
-                }
                 Thread.sleep(10);
             } catch (Exception e) {
                 LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
             }
+
+            if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) {
+                // timed out or enough data collected, return immediately
+                break;
+            }
         }
         return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
     }
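
Moving the empty-batch and timeout checks out of the `try` block is the substance of this change: a `poll()` that throws no longer skips the loop's exit conditions, so a persistently failing consumer cannot spin forever. A minimal sketch of the bounded sampling loop with the kafka-clients API — the 200 ms `Duration`-based poll (kafka-clients 2.0+) stands in for the project's `TopicSampleConstant.POLL_TIME_OUT_UNIT_MS`:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Sketch of the bounded sampling loop above: exit conditions are evaluated
// outside the try block so a throwing poll() still counts against the budget.
public class SampleLoopSketch {
    public static List<String> sample(KafkaConsumer<String, String> consumer,
                                      int maxMsgNum, long maxWaitMs) {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        List<String> dataList = new ArrayList<>();
        int currentSize = dataList.size();
        while (dataList.size() < maxMsgNum) {
            if (remainingWaitMs <= 0) {
                break;                       // overall budget exhausted before polling again
            }
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (ConsumerRecord<String, String> record : records) {
                    dataList.add(record.value());
                }
            } catch (Exception e) {
                // a failed poll no longer bypasses the checks below
            }
            if (dataList.size() - currentSize == 0) {
                break;                       // empty batch: nothing more to read
            }
            currentSize = dataList.size();
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs) {
                break;                       // overall deadline reached
            }
            remainingWaitMs = maxWaitMs - elapsed;
        }
        return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
    }
}
```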
@@ -17,4 +17,6 @@ public interface TopicExpiredDao {
     int replace(TopicExpiredDO expiredDO);

     TopicExpiredDO getByTopic(Long clusterId, String topicName);
+
+    int deleteByName(Long clusterId, String topicName);
 }
@@ -50,4 +50,12 @@ public class TopicExpiredDaoImpl implements TopicExpiredDao {
         params.put("topicName", topicName);
         return sqlSession.selectOne("TopicExpiredDao.getByTopic", params);
     }
+
+    @Override
+    public int deleteByName(Long clusterId, String topicName) {
+        Map<String, Object> params = new HashMap<>(2);
+        params.put("clusterId", clusterId);
+        params.put("topicName", topicName);
+        return sqlSession.delete("TopicExpiredDao.deleteByName", params);
+    }
 }
@@ -36,4 +36,8 @@
     <select id="getByTopic" parameterType="java.util.Map" resultMap="TopicExpiredMap">
         SELECT * FROM topic_expired WHERE cluster_id = #{clusterId} AND topic_name = #{topicName}
     </select>
+
+    <delete id="deleteByName" parameterType="java.util.Map">
+        DELETE FROM topic_expired WHERE cluster_id=#{clusterId} AND topic_name=#{topicName}
+    </delete>
 </mapper>
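
The new `<delete>` entry pairs with `TopicExpiredDaoImpl` above: MyBatis looks the statement up by the `TopicExpiredDao.deleteByName` id and binds the two map entries to `#{clusterId}` and `#{topicName}`. A minimal sketch of that call pattern, assuming the `SqlSessionTemplate` wiring matches the project's existing DAO setup:

```java
import java.util.HashMap;
import java.util.Map;

import org.mybatis.spring.SqlSessionTemplate;

// Sketch: invoking the new <delete> mapper statement through MyBatis.
public class TopicExpiredDeleteSketch {
    private final SqlSessionTemplate sqlSession;

    public TopicExpiredDeleteSketch(SqlSessionTemplate sqlSession) {
        this.sqlSession = sqlSession;
    }

    public int deleteByName(Long clusterId, String topicName) {
        Map<String, Object> params = new HashMap<>(2);
        params.put("clusterId", clusterId);   // bound to #{clusterId}
        params.put("topicName", topicName);   // bound to #{topicName}
        // returns the number of rows deleted from topic_expired
        return sqlSession.delete("TopicExpiredDao.deleteByName", params);
    }
}
```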
@@ -25,6 +25,7 @@
         WHERE cluster_id = #{clusterId}
         AND topic_name = #{topicName}
         AND gmt_create BETWEEN #{startTime} AND #{endTime}
+        ORDER BY gmt_create
         ]]>
     </select>

@@ -32,6 +33,7 @@
         <![CDATA[
         SELECT * FROM topic_metrics
         WHERE cluster_id = #{clusterId} AND #{afterTime} <= gmt_create
+        ORDER BY gmt_create
         ]]>
     </select>
@@ -75,11 +75,7 @@ public class LoginServiceImpl implements LoginService {
             return false;
         }

-        if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)
-                || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_PREFIX)
-                || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
-                || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_NORMAL_PREFIX)
-                || classRequestMappingValue.equals(ApiPrefix.GATEWAY_API_V1_PREFIX)) {
+        if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)) {
            // whitelisted endpoints return true directly
            return true;
        }
@@ -19,7 +19,6 @@ public class Converts {
         orderDO.setApprover("");
         orderDO.setOpinion("");
         orderDO.setExtensions(orderDTO.getExtensions());
-        orderDO.setType(orderDTO.getType());
         return orderDO;
     }
 }
@@ -30,16 +30,23 @@ public class CollectAndPublishCommunityTopicMetrics extends AbstractScheduledTas

     @Override
     protected List<ClusterDO> listAllTasks() {
+        // list the clusters whose metrics need collecting; they are split across multiple KM instances for execution
         return clusterService.list();
     }

     @Override
     public void processTask(ClusterDO clusterDO) {
+        // the topic-metrics collection logic for the clusterDO cluster lives here
+
+        // fetch the topic metrics
         List<TopicMetrics> metricsList = getTopicMetrics(clusterDO.getId());

+        // once the topic traffic metrics are fetched, publish an event
         SpringTool.publish(new TopicMetricsCollectedEvent(this, clusterDO.getId(), metricsList));
     }

     private List<TopicMetrics> getTopicMetrics(Long clusterId) {
+        // entry point for actually fetching the topic traffic metrics
         List<TopicMetrics> metricsList =
                 jmxService.getTopicMetrics(clusterId, KafkaMetricsCollections.TOPIC_METRICS_TO_DB, true);
         if (ValidateUtils.isEmptyList(metricsList)) {
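
The new comments spell out the pattern: `listAllTasks()` enumerates the work items, the scheduling framework shards them across KM instances, and `processTask()` handles one item and publishes a Spring event so that writers (DB, Elasticsearch, ...) can subscribe without the collector knowing about them. A minimal, self-contained sketch of the collect-then-publish half — `MetricsCollectedEvent`, the cluster ids, and the placeholder metric names are illustrative, and the sharding framework is reduced to plain methods:

```java
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;

import java.util.Arrays;
import java.util.List;

// Event carrying one cluster's collected metrics (stand-in for TopicMetricsCollectedEvent).
class MetricsCollectedEvent extends ApplicationEvent {
    final Long clusterId;
    final List<String> metrics; // stand-in for List<TopicMetrics>

    MetricsCollectedEvent(Object source, Long clusterId, List<String> metrics) {
        super(source);
        this.clusterId = clusterId;
        this.metrics = metrics;
    }
}

// Sketch of the scheduled-task pattern: collect per cluster, then hand the
// result off as a Spring event instead of writing it out directly.
public class CollectTaskSketch {
    private final ApplicationEventPublisher publisher;

    public CollectTaskSketch(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    // one work item = one cluster; the real framework shards these across KM instances
    protected List<Long> listAllTasks() {
        return Arrays.asList(1L, 2L);
    }

    public void processTask(Long clusterId) {
        List<String> metrics = Arrays.asList("BytesInPerSec", "BytesOutPerSec"); // placeholder collection
        publisher.publishEvent(new MetricsCollectedEvent(this, clusterId, metrics));
    }
}
```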
@@ -14,13 +14,14 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;

 import java.util.List;
+import java.util.Properties;

 /**
  * @author zengqiao
  * @date 20/7/23
  */
 @Component
-public class FlushTopicRetentionTime {
+public class FlushTopicProperties {
     private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);

     @Autowired
@@ -33,7 +34,7 @@ public class FlushTopicRetentionTime {
             try {
                 flush(clusterDO);
             } catch (Exception e) {
-                LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e);
+                LOGGER.error("flush topic properties failed, clusterId:{}.", clusterDO.getId(), e);
             }
         }
     }
@@ -41,22 +42,20 @@ public class FlushTopicRetentionTime {
     private void flush(ClusterDO clusterDO) {
         ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId());
         if (ValidateUtils.isNull(zkConfig)) {
-            LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId());
+            LOGGER.error("flush topic properties, get zk config failed, clusterId:{}.", clusterDO.getId());
             return;
         }

         for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
             try {
-                Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName);
-                if (retentionTime == null) {
-                    LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.",
-                            clusterDO.getId(), topicName);
+                Properties properties = KafkaZookeeperUtils.getTopicProperties(zkConfig, topicName);
+                if (ValidateUtils.isNull(properties)) {
+                    LOGGER.warn("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName);
                     continue;
                 }
-                PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime);
+                PhysicalClusterMetadataManager.putTopicProperties(clusterDO.getId(), topicName, properties);
             } catch (Exception e) {
-                LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.",
-                        clusterDO.getId(), topicName, e);
+                LOGGER.error("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
             }
         }
     }
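
The renamed task now pulls each topic's full config from ZooKeeper instead of a single retention value. A sketch of what `KafkaZookeeperUtils.getTopicProperties` presumably does: in ZooKeeper-backed Kafka, per-topic overrides live in the znode `/config/topics/<topic>` as JSON of the form `{"version":1,"config":{"retention.ms":"604800000"}}`, so the helper reads the znode and copies the `config` object into a `Properties` instance. Jackson is already on the project's classpath (see the pom below); the exact parsing in the real utility is an assumption:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.zookeeper.ZooKeeper;

import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

// Sketch: read a topic's config overrides from the ZooKeeper znode
// /config/topics/<topic> and expose them as java.util.Properties.
public class TopicPropertiesFromZkSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Properties getTopicProperties(ZooKeeper zk, String topicName) throws Exception {
        byte[] data = zk.getData("/config/topics/" + topicName, false, null);
        JsonNode config = MAPPER.readTree(data).path("config");

        Properties properties = new Properties();
        Iterator<Map.Entry<String, JsonNode>> it = config.fields();
        while (it.hasNext()) {
            Map.Entry<String, JsonNode> entry = it.next();
            properties.setProperty(entry.getKey(), entry.getValue().asText());
        }
        return properties;
    }
}
```

Caching the whole `Properties` object is what lets `PhysicalClusterMetadataManager` answer both retention-time and retention-bytes lookups from one flush cycle.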
@@ -19,7 +19,7 @@
     <springframework.boot.version>2.1.1.RELEASE</springframework.boot.version>
     <spring-version>5.1.3.RELEASE</spring-version>
     <failOnMissingWebXml>false</failOnMissingWebXml>
-    <tomcat.version>8.5.66</tomcat.version>
+    <tomcat.version>8.5.72</tomcat.version>
 </properties>

 <dependencies>
@@ -61,10 +61,7 @@ public class NormalTopicController {
     @ApiOperation(value = "Topic basic information", notes = "")
     @RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET)
     @ResponseBody
-    public Result<TopicBasicVO> getTopicBasic(
-            @PathVariable Long clusterId,
-            @PathVariable String topicName,
-            @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
+    public Result<TopicBasicVO> getTopicBasic(@PathVariable Long clusterId, @PathVariable String topicName, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
         Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
         if (ValidateUtils.isNull(physicalClusterId)) {
             return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
@@ -1,15 +1,16 @@
 package com.xiaojukeji.kafka.manager.web.converters;

-import com.xiaojukeji.kafka.manager.common.entity.ao.account.Account;
 import com.xiaojukeji.kafka.manager.bpm.common.OrderResult;
+import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum;
 import com.xiaojukeji.kafka.manager.bpm.common.entry.BaseOrderDetailData;
+import com.xiaojukeji.kafka.manager.common.entity.ao.account.Account;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO;
 import com.xiaojukeji.kafka.manager.common.entity.vo.common.AccountVO;
 import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.OrderResultVO;
 import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.OrderVO;
 import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.detail.OrderDetailBaseVO;
 import com.xiaojukeji.kafka.manager.common.utils.CopyUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO;

 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,7 +42,9 @@ public class OrderConverter {
         }
         OrderVO orderVO = new OrderVO();
         CopyUtils.copyProperties(orderVO, orderDO);
-        orderVO.setGmtTime(orderDO.getGmtCreate());
+        if (OrderStatusEnum.WAIT_DEAL.getCode().equals(orderDO.getStatus())) {
+            orderVO.setGmtHandle(null);
+        }
         return orderVO;
     }
@@ -29,6 +29,7 @@ public class TopicMineConverter {
         vo.setClusterName(data.getLogicalClusterName());
         vo.setBytesIn(data.getBytesIn());
         vo.setBytesOut(data.getBytesOut());
+        vo.setDescription(data.getDescription());
         voList.add(vo);
     }
     return voList;
@@ -31,6 +31,7 @@ public class TopicModelConverter {
     vo.setReplicaNum(dto.getReplicaNum());
     vo.setPrincipals(dto.getPrincipals());
     vo.setRetentionTime(dto.getRetentionTime());
+    vo.setRetentionBytes(dto.getRetentionBytes());
     vo.setCreateTime(dto.getCreateTime());
     vo.setModifyTime(dto.getModifyTime());
     vo.setScore(dto.getScore());
@@ -9,6 +9,8 @@ server:
 spring:
   application:
     name: kafkamanager
+  profiles:
+    active: dev
   datasource:
     kafka-manager:
       jdbc-url: jdbc:mysql://localhost:3306/logi_kafka_manager?characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
pom.xml
@@ -16,7 +16,7 @@
     </parent>

     <properties>
-        <kafka-manager.revision>2.4.2-SNAPSHOT</kafka-manager.revision>
+        <kafka-manager.revision>2.5</kafka-manager.revision>
         <swagger2.version>2.7.0</swagger2.version>
         <swagger.version>1.5.13</swagger.version>

@@ -26,7 +26,7 @@
         <java_target_version>1.8</java_target_version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <file_encoding>UTF-8</file_encoding>
-        <tomcat.version>8.5.66</tomcat.version>
+        <tomcat.version>8.5.72</tomcat.version>
         <maven-assembly-plugin.version>3.0.0</maven-assembly-plugin.version>

     </properties>
@@ -150,7 +150,7 @@
     <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-databind</artifactId>
-        <version>2.9.10.5</version>
+        <version>2.9.10.8</version>
     </dependency>

     <!-- commons -->