diff --git a/README.md b/README.md
index e69e1688..739b2309 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,13 @@
---
-
+
**一站式`Apache Kafka`集群指标监控与运维管控平台**
-阅读本README文档,您可以了解到滴滴Logi-KafkaManager的用户群体、产品定位等信息,并通过体验地址,快速体验Kafka集群指标监控与运维管控的全流程。
若滴滴Logi-KafkaManager已在贵司的生产环境进行使用,并想要获得官方更好地支持和指导,可以通过[`OCE认证`](http://obsuite.didiyun.com/open/openAuth),加入官方交流平台。
+阅读本README文档,您可以了解到滴滴Logi-KafkaManager的用户群体、产品定位等信息,并通过体验地址,快速体验Kafka集群指标监控与运维管控的全流程。
## 1 产品简介
@@ -73,15 +73,17 @@

-微信加群:关注公众号 Obsuite(官方公众号) 回复 "Logi加群"
+微信加群:添加mike_zhangliang的微信号备注Logi加群或关注公众号 云原生可观测性 回复 "Logi加群"
-
-钉钉群ID:32821440
+## 4 知识星球
+
-## 4 OCE认证
-OCE是一个认证机制和交流平台,为滴滴Logi-KafkaManager生产用户量身打造,我们会为OCE企业提供更好的技术支持,比如专属的技术沙龙、企业一对一的交流机会、专属的答疑群等,如果贵司Logi-KafkaManager上了生产,[快来加入吧](http://obsuite.didiyun.com/open/openAuth)
-
+✅知识星球首个【Kafka中文社区】,内测期免费加入~https://z.didi.cn/5gSF9
+有问必答~!
+互动有礼~!
+1100+群友一起共建国内最专业的【Kafka中文社区】
+PS:提问请尽量把问题一次性描述清楚,并告知环境信息情况哦~!如使用版本、操作步骤、报错/警告信息等,方便嘉宾们快速解答~
## 5 项目成员
@@ -97,4 +99,4 @@ OCE是一个认证机制和交流平台,为滴滴Logi-KafkaManager生产用户
## 6 协议
-`kafka-manager`基于`Apache-2.0`协议进行分发和使用,更多信息参见[协议文件](./LICENSE)
+`LogiKM`基于`Apache-2.0`协议进行分发和使用,更多信息参见[协议文件](./LICENSE)
diff --git a/container/dockerfiles/Dockerfile b/container/dockerfiles/Dockerfile
index 1ffe27e4..fa1850e3 100644
--- a/container/dockerfiles/Dockerfile
+++ b/container/dockerfiles/Dockerfile
@@ -1,43 +1,28 @@
FROM openjdk:16-jdk-alpine3.13
-LABEL author="yangvipguang"
-
-ENV VERSION 2.3.1
-
-RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
-RUN apk add --no-cache --virtual .build-deps \
- font-adobe-100dpi \
- ttf-dejavu \
- fontconfig \
- curl \
- apr \
- apr-util \
- apr-dev \
- tomcat-native \
- && apk del .build-deps
-
-RUN apk add --no-cache tini
-
-
+LABEL author="fengxsong"
+RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && apk add --no-cache tini
+ENV VERSION 2.4.2
+WORKDIR /opt/
ENV AGENT_HOME /opt/agent/
-
-WORKDIR /tmp
-
-COPY $JAR_PATH/kafka-manager.jar app.jar
-# COPY application.yml application.yml ##默认使用helm 挂载,防止敏感配置泄露
-
COPY docker-depends/config.yaml $AGENT_HOME
COPY docker-depends/jmx_prometheus_javaagent-0.15.0.jar $AGENT_HOME
ENV JAVA_AGENT="-javaagent:$AGENT_HOME/jmx_prometheus_javaagent-0.15.0.jar=9999:$AGENT_HOME/config.yaml"
ENV JAVA_HEAP_OPTS="-Xms1024M -Xmx1024M -Xmn100M "
ENV JAVA_OPTS="-verbose:gc \
- -XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
- -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"
+ -XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
+ -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"
+
+RUN wget https://github.com/didi/Logi-KafkaManager/releases/download/v${VERSION}/kafka-manager-${VERSION}.tar.gz && \
+ tar xvf kafka-manager-${VERSION}.tar.gz && \
+ mv kafka-manager-${VERSION}/kafka-manager.jar /opt/app.jar && \
+ rm -rf kafka-manager-${VERSION}*
+
EXPOSE 8080 9999
ENTRYPOINT ["tini", "--"]
-CMD ["sh","-c","java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
+CMD [ "sh", "-c", "java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
\ No newline at end of file
diff --git a/container/helm/Chart.lock b/container/helm/Chart.lock
new file mode 100644
index 00000000..04958b2d
--- /dev/null
+++ b/container/helm/Chart.lock
@@ -0,0 +1,6 @@
+dependencies:
+- name: mysql
+ repository: https://charts.bitnami.com/bitnami
+ version: 8.6.3
+digest: sha256:d250c463c1d78ba30a24a338a06a551503c7a736621d974fe4999d2db7f6143e
+generated: "2021-06-24T11:34:54.625217+08:00"
diff --git a/container/helm/Chart.yaml b/container/helm/Chart.yaml
index 7161f735..088abfb0 100644
--- a/container/helm/Chart.yaml
+++ b/container/helm/Chart.yaml
@@ -1,6 +1,6 @@
apiVersion: v2
name: didi-km
-description: A Helm chart for Kubernetes
+description: Logi-KafkaManager
# A chart can be either an 'application' or a 'library' chart.
#
@@ -21,4 +21,9 @@ version: 0.1.0
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
-appVersion: "1.16.0"
+appVersion: "2.4.2"
+dependencies:
+ - condition: mysql.enabled
+ name: mysql
+ repository: https://charts.bitnami.com/bitnami
+ version: 8.x.x
diff --git a/container/helm/charts/mysql-8.6.3.tgz b/container/helm/charts/mysql-8.6.3.tgz
new file mode 100644
index 00000000..c5fde140
Binary files /dev/null and b/container/helm/charts/mysql-8.6.3.tgz differ
diff --git a/container/helm/templates/configmap.yaml b/container/helm/templates/configmap.yaml
index ffc75ec5..b487f2bd 100644
--- a/container/helm/templates/configmap.yaml
+++ b/container/helm/templates/configmap.yaml
@@ -1,7 +1,17 @@
+{{- define "datasource.mysql" -}}
+{{- if .Values.mysql.enabled }}
+ {{- printf "%s-mysql" (include "didi-km.fullname" .) -}}
+{{- else -}}
+ {{- printf "%s" .Values.externalDatabase.host -}}
+{{- end -}}
+{{- end -}}
+
apiVersion: v1
kind: ConfigMap
metadata:
- name: km-cm
+ name: {{ include "didi-km.fullname" . }}-configs
+ labels:
+ {{- include "didi-km.labels" . | nindent 4 }}
data:
application.yml: |
server:
@@ -17,9 +27,9 @@ data:
name: kafkamanager
datasource:
kafka-manager:
- jdbc-url: jdbc:mysql://xxxxx:3306/kafka-manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
- username: admin
- password: admin
+ jdbc-url: jdbc:mysql://{{ include "datasource.mysql" . }}:3306/{{ .Values.mysql.auth.database }}?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
+ username: {{ .Values.mysql.auth.username }}
+ password: {{ .Values.mysql.auth.password }}
driver-class-name: com.mysql.jdbc.Driver
main:
allow-bean-definition-overriding: true
@@ -54,7 +64,10 @@ data:
sync-topic-enabled: false # 未落盘的Topic定期同步到DB中
account:
+ # ldap settings
ldap:
+ enabled: false
+ authUserRegistration: false
kcm:
enabled: false
diff --git a/container/helm/templates/deployment.yaml b/container/helm/templates/deployment.yaml
index 4754b53e..80ab2c29 100644
--- a/container/helm/templates/deployment.yaml
+++ b/container/helm/templates/deployment.yaml
@@ -42,6 +42,10 @@ spec:
protocol: TCP
resources:
{{- toYaml .Values.resources | nindent 12 }}
+ volumeMounts:
+ - name: configs
+ mountPath: /tmp/application.yml
+ subPath: application.yml
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
@@ -54,3 +58,7 @@ spec:
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
+ volumes:
+ - name: configs
+ configMap:
+ name: {{ include "didi-km.fullname" . }}-configs
diff --git a/container/helm/values.yaml b/container/helm/values.yaml
index a5f49e40..cbb6f3d4 100644
--- a/container/helm/values.yaml
+++ b/container/helm/values.yaml
@@ -5,13 +5,14 @@
replicaCount: 1
image:
- repository: docker.io/yangvipguang/km
+ repository: docker.io/fengxsong/logi-kafka-manager
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
- tag: "v18"
+ tag: "v2.4.2"
imagePullSecrets: []
nameOverride: ""
+# fullnameOverride must set same as release name
fullnameOverride: "km"
serviceAccount:
@@ -59,10 +60,10 @@ resources:
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
limits:
- cpu: 50m
+ cpu: 500m
memory: 2048Mi
requests:
- cpu: 10m
+ cpu: 100m
memory: 200Mi
autoscaling:
@@ -77,3 +78,16 @@ nodeSelector: {}
tolerations: []
affinity: {}
+
+# more configurations are set with configmap in file template/configmap.yaml
+externalDatabase:
+ host: ""
+mysql:
+ # if enabled is set to false, then you should manually specified externalDatabase.host
+ enabled: true
+ architecture: standalone
+ auth:
+ rootPassword: "s3cretR00t"
+ database: "logi_kafka_manager"
+ username: "logi_kafka_manager"
+ password: "n0tp@55w0rd"
diff --git a/docs/assets/images/common/dingding_group.jpg b/docs/assets/images/common/dingding_group.jpg
deleted file mode 100644
index ee597d87..00000000
Binary files a/docs/assets/images/common/dingding_group.jpg and /dev/null differ
diff --git a/docs/开源版与商业版特性对比.md b/docs/开源版与商业版特性对比.md
index e9d9dbd4..853efcdb 100644
--- a/docs/开源版与商业版特性对比.md
+++ b/docs/开源版与商业版特性对比.md
@@ -19,9 +19,9 @@
| 模块 |对比指标 |底层依赖 |开源版 |商业版 |备注 |
| --- | --- | --- | --- | --- | --- |
-| 服务发现 | bootstrap地址变更对客户端无影响 | | | 是| |
-| 安全管控 | 身份鉴权(appID+password) | | | 是 | |
-| | 权限鉴权(Topic+appID) | | | 是 | |
+| 服务发现 | bootstrap地址变更对客户端无影响 | Gateway | | 是| |
+| 安全管控 | 身份鉴权(appID+password) | Gateway | | 是 | |
+| | 权限鉴权(Topic+appID) | Gateway | | 是 | |
| 指标监控 | Topic实时流量、历史流量 | | 是 | 是 | |
| | Broker实时耗时、历史耗时 | 引擎 | | 是 | |
| | 分区落盘 | 引擎 | | 是 | |
@@ -49,7 +49,7 @@
**总结**
-Logi-KafkaManager的商业特性体现在在滴滴Kafka Gateway、滴滴Kafka引擎、内部沉淀出的资源治理专家经验、可定制化的健康分算法。
+滴滴LogiKM的商业特性体现在滴滴Kafka Gateway、滴滴Kafka引擎、内部沉淀出的资源治理专家经验、可定制化的健康分算法。
从场景来看,滴滴Logi-KafkaManager的开源版本在kafka集群运维、的Topic管理、监控告警、资源治理等kafka核心场景都充分开源用户的使用需求并且有着出色的表现。而商业版相较于开源版在安全管控、流量管控、更丰富的指标监控、资源治理专家经验的具有明显提升,更加符合企业业务需求。
除此之外,商业版还可根据企业实际需求对平台源码进行定制化改造,并提供运维保障,稳定性保障,运营保障等服务。
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java
index 3a6dd478..4d569907 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java
@@ -25,6 +25,8 @@ public class TopicCreationConstant {
public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms";
+ public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes";
+
public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L;
public static Properties createNewProperties(Long retentionTime) {
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java
index e3ea08ed..9150569b 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java
@@ -37,6 +37,8 @@ public class TopicBasicDTO {
private Long retentionTime;
+ private Long retentionBytes;
+
public Long getClusterId() {
return clusterId;
}
@@ -157,6 +159,14 @@ public class TopicBasicDTO {
this.retentionTime = retentionTime;
}
+ public Long getRetentionBytes() {
+ return retentionBytes;
+ }
+
+ public void setRetentionBytes(Long retentionBytes) {
+ this.retentionBytes = retentionBytes;
+ }
+
@Override
public String toString() {
return "TopicBasicDTO{" +
@@ -166,7 +176,7 @@ public class TopicBasicDTO {
", principals='" + principals + '\'' +
", topicName='" + topicName + '\'' +
", description='" + description + '\'' +
- ", regionNameList='" + regionNameList + '\'' +
+ ", regionNameList=" + regionNameList +
", score=" + score +
", topicCodeC='" + topicCodeC + '\'' +
", partitionNum=" + partitionNum +
@@ -175,6 +185,7 @@ public class TopicBasicDTO {
", modifyTime=" + modifyTime +
", createTime=" + createTime +
", retentionTime=" + retentionTime +
+ ", retentionBytes=" + retentionBytes +
'}';
}
}
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java
index 946a9997..b200a150 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java
@@ -33,6 +33,9 @@ public class TopicBasicVO {
@ApiModelProperty(value = "存储时间(ms)")
private Long retentionTime;
+ @ApiModelProperty(value = "单分区数据保存大小(Byte)")
+ private Long retentionBytes;
+
@ApiModelProperty(value = "创建时间")
private Long createTime;
@@ -62,12 +65,20 @@ public class TopicBasicVO {
this.clusterId = clusterId;
}
- public String getTopicCodeC() {
- return topicCodeC;
+ public String getAppId() {
+ return appId;
}
- public void setTopicCodeC(String topicCodeC) {
- this.topicCodeC = topicCodeC;
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public void setAppName(String appName) {
+ this.appName = appName;
}
public Integer getPartitionNum() {
@@ -86,22 +97,6 @@ public class TopicBasicVO {
this.replicaNum = replicaNum;
}
- public Long getModifyTime() {
- return modifyTime;
- }
-
- public void setModifyTime(Long modifyTime) {
- this.modifyTime = modifyTime;
- }
-
- public Long getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Long createTime) {
- this.createTime = createTime;
- }
-
public String getPrincipals() {
return principals;
}
@@ -110,30 +105,6 @@ public class TopicBasicVO {
this.principals = principals;
}
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public void setAppId(String appId) {
- this.appId = appId;
- }
-
- public void setBootstrapServers(String bootstrapServers) {
- this.bootstrapServers = bootstrapServers;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
public Long getRetentionTime() {
return retentionTime;
}
@@ -142,12 +113,28 @@ public class TopicBasicVO {
this.retentionTime = retentionTime;
}
- public String getAppName() {
- return appName;
+ public Long getRetentionBytes() {
+ return retentionBytes;
}
- public void setAppName(String appName) {
- this.appName = appName;
+ public void setRetentionBytes(Long retentionBytes) {
+ this.retentionBytes = retentionBytes;
+ }
+
+ public Long getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Long createTime) {
+ this.createTime = createTime;
+ }
+
+ public Long getModifyTime() {
+ return modifyTime;
+ }
+
+ public void setModifyTime(Long modifyTime) {
+ this.modifyTime = modifyTime;
}
public Integer getScore() {
@@ -158,6 +145,30 @@ public class TopicBasicVO {
this.score = score;
}
+ public String getTopicCodeC() {
+ return topicCodeC;
+ }
+
+ public void setTopicCodeC(String topicCodeC) {
+ this.topicCodeC = topicCodeC;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
public List getRegionNameList() {
return regionNameList;
}
@@ -176,6 +187,7 @@ public class TopicBasicVO {
", replicaNum=" + replicaNum +
", principals='" + principals + '\'' +
", retentionTime=" + retentionTime +
+ ", retentionBytes=" + retentionBytes +
", createTime=" + createTime +
", modifyTime=" + modifyTime +
", score=" + score +
diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java
index 68109779..5964d162 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java
@@ -1,7 +1,7 @@
package com.xiaojukeji.kafka.manager.common.utils.factory;
-import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
@@ -16,7 +16,7 @@ import java.util.Properties;
* @author zengqiao
* @date 20/8/24
*/
-public class KafkaConsumerFactory extends BasePooledObjectFactory {
+public class KafkaConsumerFactory extends BasePooledObjectFactory> {
private ClusterDO clusterDO;
public KafkaConsumerFactory(ClusterDO clusterDO) {
@@ -25,17 +25,17 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory
@Override
public KafkaConsumer create() {
- return new KafkaConsumer(createKafkaConsumerProperties(clusterDO));
+ return new KafkaConsumer(createKafkaConsumerProperties(clusterDO));
}
@Override
- public PooledObject wrap(KafkaConsumer obj) {
- return new DefaultPooledObject(obj);
+ public PooledObject> wrap(KafkaConsumer obj) {
+ return new DefaultPooledObject<>(obj);
}
@Override
- public void destroyObject(final PooledObject p) throws Exception {
- KafkaConsumer kafkaConsumer = p.getObject();
+ public void destroyObject(final PooledObject> p) throws Exception {
+ KafkaConsumer kafkaConsumer = p.getObject();
if (ValidateUtils.isNull(kafkaConsumer)) {
return;
}
@@ -57,7 +57,7 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
return properties;
}
- properties.putAll(JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class));
+ properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
return properties;
}
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java
index 41fd0092..3fd6aaac 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java
@@ -17,6 +17,9 @@ public class ConsumerMetadataCache {
private static final Map CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>();
+ private ConsumerMetadataCache() {
+ }
+
public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) {
if (clusterId == null || consumerMetadata == null) {
return;
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
index 921b13ba..56e17ae5 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
@@ -1,7 +1,7 @@
package com.xiaojukeji.kafka.manager.service.cache;
-import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
import kafka.admin.AdminClient;
@@ -26,19 +26,22 @@ import java.util.concurrent.locks.ReentrantLock;
* @date 19/12/24
*/
public class KafkaClientPool {
- private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
/**
* AdminClient
*/
- private static Map AdminClientMap = new ConcurrentHashMap<>();
+ private static final Map ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();
- private static Map> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
+ private static final Map> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
- private static Map> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
+ private static final Map>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
private static ReentrantLock lock = new ReentrantLock();
+ private KafkaClientPool() {
+ }
+
private static void initKafkaProducerMap(Long clusterId) {
ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId);
if (clusterDO == null) {
@@ -55,7 +58,7 @@ public class KafkaClientPool {
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
- KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer(properties));
+ KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties));
} catch (Exception e) {
LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e);
} finally {
@@ -77,25 +80,22 @@ public class KafkaClientPool {
if (ValidateUtils.isNull(kafkaProducer)) {
return false;
}
- kafkaProducer.send(new ProducerRecord(topicName, data));
+ kafkaProducer.send(new ProducerRecord<>(topicName, data));
return true;
}
private static void initKafkaConsumerPool(ClusterDO clusterDO) {
lock.lock();
try {
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
if (objectPool != null) {
return;
}
- GenericObjectPoolConfig config = new GenericObjectPoolConfig();
+ GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>();
config.setMaxIdle(24);
config.setMinIdle(24);
config.setMaxTotal(24);
- KAFKA_CONSUMER_POOL.put(
- clusterDO.getId(),
- new GenericObjectPool(new KafkaConsumerFactory(clusterDO), config)
- );
+ KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config));
} catch (Exception e) {
LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e);
} finally {
@@ -106,7 +106,7 @@ public class KafkaClientPool {
public static void closeKafkaConsumerPool(Long clusterId) {
lock.lock();
try {
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
if (objectPool == null) {
return;
}
@@ -118,11 +118,11 @@ public class KafkaClientPool {
}
}
- public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
+ public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
if (ValidateUtils.isNull(clusterDO)) {
return null;
}
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
if (ValidateUtils.isNull(objectPool)) {
initKafkaConsumerPool(clusterDO);
objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
@@ -139,11 +139,11 @@ public class KafkaClientPool {
return null;
}
- public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
+ public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) {
return;
}
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
if (ValidateUtils.isNull(objectPool)) {
return;
}
@@ -155,7 +155,7 @@ public class KafkaClientPool {
}
public static AdminClient getAdminClient(Long clusterId) {
- AdminClient adminClient = AdminClientMap.get(clusterId);
+ AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId);
if (adminClient != null) {
return adminClient;
}
@@ -166,26 +166,26 @@ public class KafkaClientPool {
Properties properties = createProperties(clusterDO, false);
lock.lock();
try {
- adminClient = AdminClientMap.get(clusterId);
+ adminClient = ADMIN_CLIENT_MAP.get(clusterId);
if (adminClient != null) {
return adminClient;
}
- AdminClientMap.put(clusterId, AdminClient.create(properties));
+ ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties));
} catch (Exception e) {
LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e);
} finally {
lock.unlock();
}
- return AdminClientMap.get(clusterId);
+ return ADMIN_CLIENT_MAP.get(clusterId);
}
public static void closeAdminClient(ClusterDO cluster) {
- if (AdminClientMap.containsKey(cluster.getId())) {
- AdminClientMap.get(cluster.getId()).close();
+ if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) {
+ ADMIN_CLIENT_MAP.get(cluster.getId()).close();
}
}
- public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) {
+ public static Properties createProperties(ClusterDO clusterDO, boolean serialize) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers());
if (serialize) {
@@ -198,8 +198,7 @@ public class KafkaClientPool {
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
return properties;
}
- Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class);
- properties.putAll(securityProperties);
+ properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
return properties;
}
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java
index 011bc1e6..7ba1e304 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java
@@ -14,7 +14,10 @@ public class KafkaMetricsCache {
/**
*
*/
- private static Map> TopicMetricsMap = new ConcurrentHashMap<>();
+ private static final Map> TOPIC_METRICS_MAP = new ConcurrentHashMap<>();
+
+ private KafkaMetricsCache() {
+ }
public static void putTopicMetricsToCache(Long clusterId, List dataList) {
if (clusterId == null || dataList == null) {
@@ -24,22 +27,22 @@ public class KafkaMetricsCache {
for (TopicMetrics topicMetrics : dataList) {
subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics);
}
- TopicMetricsMap.put(clusterId, subMetricsMap);
+ TOPIC_METRICS_MAP.put(clusterId, subMetricsMap);
}
public static Map getTopicMetricsFromCache(Long clusterId) {
- return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+ return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
}
public static Map> getAllTopicMetricsFromCache() {
- return TopicMetricsMap;
+ return TOPIC_METRICS_MAP;
}
public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) {
if (clusterId == null || topicName == null) {
return null;
}
- Map subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+ Map subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
return subMap.get(topicName);
}
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
index 5cd81581..744101ef 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
@@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager {
public void flush() {
List logicalClusterDOList = logicalClusterService.listAll();
if (ValidateUtils.isNull(logicalClusterDOList)) {
- logicalClusterDOList = Collections.EMPTY_LIST;
+ logicalClusterDOList = Collections.emptyList();
}
Set inDbLogicalClusterIds = logicalClusterDOList.stream()
.map(LogicalClusterDO::getId)
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
index 631b254f..c5f09820 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
@@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
+import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
+import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
@@ -37,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Service
public class PhysicalClusterMetadataManager {
- private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
@Autowired
private ControllerDao controllerDao;
@@ -48,22 +50,22 @@ public class PhysicalClusterMetadataManager {
@Autowired
private ClusterService clusterService;
- private final static Map CLUSTER_MAP = new ConcurrentHashMap<>();
+ private static final Map CLUSTER_MAP = new ConcurrentHashMap<>();
- private final static Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
+ private static final Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
- private final static Map ZK_CONFIG_MAP = new ConcurrentHashMap<>();
+ private static final Map ZK_CONFIG_MAP = new ConcurrentHashMap<>();
- private final static Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
+ private static final Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
- private final static Map> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>();
+ private static final Map> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
- private final static Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
+ private static final Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
/**
* JXM连接, 延迟连接
*/
- private final static Map> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
+ private static final Map> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
/**
* KafkaBroker版本, 延迟获取
@@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager {
// 初始化topic-map
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
- TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
+ TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
// 初始化cluster-map
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
@@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager {
KAFKA_VERSION_MAP.remove(clusterId);
TOPIC_METADATA_MAP.remove(clusterId);
- TOPIC_RETENTION_TIME_MAP.remove(clusterId);
+ TOPIC_PROPERTIES_MAP.remove(clusterId);
CLUSTER_MAP.remove(clusterId);
}
@@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager {
//---------------------------配置相关元信息--------------
- public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) {
- Map timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
- if (timeMap == null) {
+ public static void putTopicProperties(Long clusterId, String topicName, Properties properties) {
+ if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) {
return;
}
- timeMap.put(topicName, retentionTime);
+
+ Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
+ if (ValidateUtils.isNull(propertiesMap)) {
+ return;
+ }
+ propertiesMap.put(topicName, properties);
}
public static Long getTopicRetentionTime(Long clusterId, String topicName) {
- Map timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
- if (timeMap == null) {
+ Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
+ if (ValidateUtils.isNull(propertiesMap)) {
return null;
}
- return timeMap.get(topicName);
+
+ Properties properties = propertiesMap.get(topicName);
+ if (ValidateUtils.isNull(properties)) {
+ return null;
+ }
+
+ return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME));
}
+ public static Long getTopicRetentionBytes(Long clusterId, String topicName) {
+ Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
+ if (ValidateUtils.isNull(propertiesMap)) {
+ return null;
+ }
+ Properties properties = propertiesMap.get(topicName);
+ if (ValidateUtils.isNull(properties)) {
+ return null;
+ }
+ return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME));
+ }
//---------------------------Broker元信息相关--------------
@@ -375,7 +398,7 @@ public class PhysicalClusterMetadataManager {
KafkaBrokerRoleEnum roleEnum) {
BrokerMetadata brokerMetadata =
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
- if (ValidateUtils.isNull(brokerMetadata)) {
+ if (brokerMetadata == null) {
return;
}
String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
@@ -415,7 +438,7 @@ public class PhysicalClusterMetadataManager {
KafkaBrokerRoleEnum roleEnum) {
BrokerMetadata brokerMetadata =
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
- if (ValidateUtils.isNull(brokerMetadata)) {
+ if (brokerMetadata == null) {
return;
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
index 63191888..154faf77 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
@@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService {
basicDTO.setCreateTime(topicMetadata.getCreateTime());
basicDTO.setModifyTime(topicMetadata.getModifyTime());
basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName));
+ basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName));
TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName);
if (!ValidateUtils.isNull(topicDO)) {
@@ -648,10 +649,11 @@ public class TopicServiceImpl implements TopicService {
List dataList = new ArrayList<>();
int currentSize = dataList.size();
while (dataList.size() < maxMsgNum) {
+ if (remainingWaitMs <= 0) {
+ break;
+ }
+
try {
- if (remainingWaitMs <= 0) {
- break;
- }
ConsumerRecords records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS);
for (ConsumerRecord record : records) {
String value = (String) record.value();
@@ -661,20 +663,22 @@ public class TopicServiceImpl implements TopicService {
: value
);
}
- // 当前批次一条数据都没拉取到,则结束拉取
- if (dataList.size() - currentSize == 0) {
- break;
- }
- currentSize = dataList.size();
- // 检查是否超时
- long elapsed = System.currentTimeMillis() - begin;
- if (elapsed >= maxWaitMs) {
- break;
- }
- remainingWaitMs = maxWaitMs - elapsed;
} catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}
+
+ // 当前批次一条数据都没拉取到,则结束拉取
+ if (dataList.size() - currentSize == 0) {
+ break;
+ }
+ currentSize = dataList.size();
+
+ // 检查是否超时
+ long elapsed = System.currentTimeMillis() - begin;
+ if (elapsed >= maxWaitMs) {
+ break;
+ }
+ remainingWaitMs = maxWaitMs - elapsed;
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}
@@ -698,14 +702,15 @@ public class TopicServiceImpl implements TopicService {
: value
);
}
- if (System.currentTimeMillis() - timestamp > timeout
- || dataList.size() >= maxMsgNum) {
- break;
- }
Thread.sleep(10);
} catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}
+
+ if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) {
+ // 超时或者是数据已采集足够时, 直接返回
+ break;
+ }
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}
diff --git a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java
index 8f079fde..f49f7dca 100644
--- a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java
+++ b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java
@@ -75,11 +75,7 @@ public class LoginServiceImpl implements LoginService {
return false;
}
- if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_NORMAL_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.GATEWAY_API_V1_PREFIX)) {
+ if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)) {
// 白名单接口直接true
return true;
}
diff --git a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/Converts.java b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/Converts.java
index 956491b2..39d80369 100644
--- a/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/Converts.java
+++ b/kafka-manager-extends/kafka-manager-bpm/src/main/java/com/xiaojukeji/kafka/manager/bpm/Converts.java
@@ -19,7 +19,6 @@ public class Converts {
orderDO.setApprover("");
orderDO.setOpinion("");
orderDO.setExtensions(orderDTO.getExtensions());
- orderDO.setType(orderDTO.getType());
return orderDO;
}
}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicRetentionTime.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java
similarity index 68%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicRetentionTime.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java
index 225f8393..41a8bde4 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicRetentionTime.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java
@@ -14,13 +14,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
+import java.util.Properties;
/**
* @author zengqiao
* @date 20/7/23
*/
@Component
-public class FlushTopicRetentionTime {
+public class FlushTopicProperties {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
@@ -33,7 +34,7 @@ public class FlushTopicRetentionTime {
try {
flush(clusterDO);
} catch (Exception e) {
- LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e);
+ LOGGER.error("flush topic properties failed, clusterId:{}.", clusterDO.getId(), e);
}
}
}
@@ -41,22 +42,20 @@ public class FlushTopicRetentionTime {
private void flush(ClusterDO clusterDO) {
ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId());
if (ValidateUtils.isNull(zkConfig)) {
- LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId());
+ LOGGER.error("flush topic properties, get zk config failed, clusterId:{}.", clusterDO.getId());
return;
}
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
try {
- Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName);
- if (retentionTime == null) {
- LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.",
- clusterDO.getId(), topicName);
+ Properties properties = KafkaZookeeperUtils.getTopicProperties(zkConfig, topicName);
+ if (ValidateUtils.isNull(properties)) {
+ LOGGER.warn("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName);
continue;
}
- PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime);
+ PhysicalClusterMetadataManager.putTopicProperties(clusterDO.getId(), topicName, properties);
} catch (Exception e) {
- LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.",
- clusterDO.getId(), topicName, e);
+ LOGGER.error("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
}
}
}
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java
index aaac290f..bb42cadd 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java
@@ -61,10 +61,7 @@ public class NormalTopicController {
@ApiOperation(value = "Topic基本信息", notes = "")
@RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET)
@ResponseBody
- public Result getTopicBasic(
- @PathVariable Long clusterId,
- @PathVariable String topicName,
- @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
+ public Result getTopicBasic(@PathVariable Long clusterId, @PathVariable String topicName, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
if (ValidateUtils.isNull(physicalClusterId)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java
index 4e28ca8b..c7364cb5 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java
@@ -31,6 +31,7 @@ public class TopicModelConverter {
vo.setReplicaNum(dto.getReplicaNum());
vo.setPrincipals(dto.getPrincipals());
vo.setRetentionTime(dto.getRetentionTime());
+ vo.setRetentionBytes(dto.getRetentionBytes());
vo.setCreateTime(dto.getCreateTime());
vo.setModifyTime(dto.getModifyTime());
vo.setScore(dto.getScore());
diff --git a/pom.xml b/pom.xml
index f94e109f..6a841081 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,7 @@
com.fasterxml.jackson.core
jackson-databind
- 2.9.10.5
+ 2.9.10.8