Mirror of https://github.com/didi/KnowStreaming.git (synced 2025-12-24 03:42:07 +08:00)
Merge remote-tracking branch 'origin/master' into v2.4.3

20 README.md
@@ -1,13 +1,13 @@
---

**A one-stop platform for `Apache Kafka` cluster metrics monitoring and operations management**

Reading this README will give you an overview of DiDi Logi-KafkaManager's target users, product positioning, and more; through the demo site you can quickly experience the full workflow of Kafka cluster metrics monitoring and operations management.<br>If DiDi Logi-KafkaManager is already running in your company's production environment and you would like better official support and guidance, you can join the official community via [`OCE certification`](http://obsuite.didiyun.com/open/openAuth).
Reading this README will give you an overview of DiDi Logi-KafkaManager's target users, product positioning, and more; through the demo site you can quickly experience the full workflow of Kafka cluster metrics monitoring and operations management.

## 1 Product Overview
@@ -73,15 +73,17 @@
WeChat group: follow the official account Obsuite and reply "Logi加群"
WeChat group: add mike_zhangliang on WeChat with the note "Logi加群", or follow the official account 云原生可观测性 (Cloud-Native Observability) and reply "Logi加群"

DingTalk group ID: 32821440
## 4 Knowledge Planet

## 4 OCE Certification
OCE is a certification mechanism and exchange platform tailored for production users of DiDi Logi-KafkaManager. We provide OCE enterprises with better technical support, such as dedicated technical salons, one-on-one enterprise exchanges, and an exclusive Q&A group. If your company runs Logi-KafkaManager in production, [come and join](http://obsuite.didiyun.com/open/openAuth)

✅ The first [Kafka Chinese Community] on Knowledge Planet, free to join during the beta period: https://z.didi.cn/5gSF9
Every question gets an answer!
Interact and win gifts!
1100+ members building the most professional [Kafka Chinese Community] in China together
PS: when asking a question, please describe it fully in one go and include environment details (version in use, steps taken, error/warning messages, etc.) so the experts can answer quickly

## 5 Project Members
@@ -97,4 +99,4 @@ OCE is a certification mechanism and exchange platform tailored for production users of DiDi Logi-KafkaManager
## 6 License

`kafka-manager` is distributed and used under the `Apache-2.0` license; see the [license file](./LICENSE) for more information
`LogiKM` is distributed and used under the `Apache-2.0` license; see the [license file](./LICENSE) for more information
@@ -1,43 +1,28 @@
FROM openjdk:16-jdk-alpine3.13

LABEL author="yangvipguang"

ENV VERSION 2.3.1

RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk add --no-cache --virtual .build-deps \
    font-adobe-100dpi \
    ttf-dejavu \
    fontconfig \
    curl \
    apr \
    apr-util \
    apr-dev \
    tomcat-native \
    && apk del .build-deps

RUN apk add --no-cache tini

LABEL author="fengxsong"
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && apk add --no-cache tini

ENV VERSION 2.4.2
WORKDIR /opt/

ENV AGENT_HOME /opt/agent/

WORKDIR /tmp

COPY $JAR_PATH/kafka-manager.jar app.jar
# COPY application.yml application.yml ## mounted via helm by default, to avoid leaking sensitive configuration

COPY docker-depends/config.yaml $AGENT_HOME
COPY docker-depends/jmx_prometheus_javaagent-0.15.0.jar $AGENT_HOME

ENV JAVA_AGENT="-javaagent:$AGENT_HOME/jmx_prometheus_javaagent-0.15.0.jar=9999:$AGENT_HOME/config.yaml"
ENV JAVA_HEAP_OPTS="-Xms1024M -Xmx1024M -Xmn100M "
ENV JAVA_OPTS="-verbose:gc \
    -XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
    -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"
    -XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
    -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"

RUN wget https://github.com/didi/Logi-KafkaManager/releases/download/v${VERSION}/kafka-manager-${VERSION}.tar.gz && \
    tar xvf kafka-manager-${VERSION}.tar.gz && \
    mv kafka-manager-${VERSION}/kafka-manager.jar /opt/app.jar && \
    rm -rf kafka-manager-${VERSION}*

EXPOSE 8080 9999

ENTRYPOINT ["tini", "--"]

CMD ["sh","-c","java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
CMD [ "sh", "-c", "java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
6 container/helm/Chart.lock (new file)
@@ -0,0 +1,6 @@
dependencies:
- name: mysql
  repository: https://charts.bitnami.com/bitnami
  version: 8.6.3
digest: sha256:d250c463c1d78ba30a24a338a06a551503c7a736621d974fe4999d2db7f6143e
generated: "2021-06-24T11:34:54.625217+08:00"
@@ -1,6 +1,6 @@
apiVersion: v2
name: didi-km
description: A Helm chart for Kubernetes
description: Logi-KafkaManager

# A chart can be either an 'application' or a 'library' chart.
#
@@ -21,4 +21,9 @@ version: 0.1.0
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"
appVersion: "2.4.2"
dependencies:
- condition: mysql.enabled
  name: mysql
  repository: https://charts.bitnami.com/bitnami
  version: 8.x.x

BIN container/helm/charts/mysql-8.6.3.tgz (new file, binary not shown)
@@ -1,7 +1,17 @@
{{- define "datasource.mysql" -}}
{{- if .Values.mysql.enabled }}
{{- printf "%s-mysql" (include "didi-km.fullname" .) -}}
{{- else -}}
{{- printf "%s" .Values.externalDatabase.host -}}
{{- end -}}
{{- end -}}

apiVersion: v1
kind: ConfigMap
metadata:
  name: km-cm
  name: {{ include "didi-km.fullname" . }}-configs
  labels:
    {{- include "didi-km.labels" . | nindent 4 }}
data:
  application.yml: |
    server:
@@ -17,9 +27,9 @@ data:
        name: kafkamanager
      datasource:
        kafka-manager:
          jdbc-url: jdbc:mysql://xxxxx:3306/kafka-manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
          username: admin
          password: admin
          jdbc-url: jdbc:mysql://{{ include "datasource.mysql" . }}:3306/{{ .Values.mysql.auth.database }}?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
          username: {{ .Values.mysql.auth.username }}
          password: {{ .Values.mysql.auth.password }}
          driver-class-name: com.mysql.jdbc.Driver
      main:
        allow-bean-definition-overriding: true
@@ -54,7 +64,10 @@ data:
      sync-topic-enabled: false # periodically sync Topics that have not yet been persisted into the DB

    account:
      # ldap settings
      ldap:
        enabled: false
        authUserRegistration: false

    kcm:
      enabled: false
@@ -42,6 +42,10 @@ spec:
          protocol: TCP
        resources:
          {{- toYaml .Values.resources | nindent 12 }}
        volumeMounts:
          - name: configs
            mountPath: /tmp/application.yml
            subPath: application.yml
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
@@ -54,3 +58,7 @@ spec:
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      volumes:
        - name: configs
          configMap:
            name: {{ include "didi-km.fullname" . }}-configs
@@ -5,13 +5,14 @@
replicaCount: 1

image:
  repository: docker.io/yangvipguang/km
  repository: docker.io/fengxsong/logi-kafka-manager
  pullPolicy: IfNotPresent
  # Overrides the image tag whose default is the chart appVersion.
  tag: "v18"
  tag: "v2.4.2"

imagePullSecrets: []
nameOverride: ""
# fullnameOverride must be set to the same value as the release name
fullnameOverride: "km"

serviceAccount:
@@ -59,10 +60,10 @@ resources:
  # resources, such as Minikube. If you do want to specify resources, uncomment the following
  # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
  limits:
    cpu: 50m
    cpu: 500m
    memory: 2048Mi
  requests:
    cpu: 10m
    cpu: 100m
    memory: 200Mi

autoscaling:
@@ -77,3 +78,16 @@ nodeSelector: {}
tolerations: []

affinity: {}

# more configuration is set via the configmap in file template/configmap.yaml
externalDatabase:
  host: ""
mysql:
  # if enabled is set to false, you must manually specify externalDatabase.host
  enabled: true
  architecture: standalone
  auth:
    rootPassword: "s3cretR00t"
    database: "logi_kafka_manager"
    username: "logi_kafka_manager"
    password: "n0tp@55w0rd"

Binary file not shown. (image diff; before: 20 KiB)
@@ -19,9 +19,9 @@
| Module | Comparison Item | Underlying Dependency | Open-Source | Commercial | Notes |
| --- | --- | --- | --- | --- | --- |
| Service Discovery | bootstrap address changes are transparent to clients | | | Yes | |
| Security Control | identity authentication (appID+password) | | | Yes | |
| | permission authorization (Topic+appID) | | | Yes | |
| Service Discovery | bootstrap address changes are transparent to clients | Gateway | | Yes | |
| Security Control | identity authentication (appID+password) | Gateway | | Yes | |
| | permission authorization (Topic+appID) | Gateway | | Yes | |
| Metrics Monitoring | Topic real-time and historical traffic | | Yes | Yes | |
| | Broker real-time and historical latency | Engine | | Yes | |
| | partition disk flushing | Engine | | Yes | |
@@ -49,7 +49,7 @@
**Summary**

Logi-KafkaManager's commercial features rest on the DiDi Kafka Gateway, the DiDi Kafka engine, internally accumulated resource-governance expertise, and a customizable health-score algorithm.
DiDi LogiKM's commercial features rest on the DiDi Kafka Gateway, the DiDi Kafka engine, internally accumulated resource-governance expertise, and a customizable health-score algorithm.
In terms of scenarios, the open-source edition of DiDi Logi-KafkaManager fully meets users' needs in core Kafka scenarios such as cluster operations, Topic management, monitoring and alerting, and resource governance, and performs excellently. Compared with the open-source edition, the commercial edition brings clear improvements in security control, traffic control, richer metrics monitoring, and resource-governance expertise, making it a better fit for enterprise business needs.
Beyond that, the commercial edition can also tailor the platform source code to an enterprise's actual needs, and provides services such as operations assurance, stability assurance, and operational support.
@@ -25,6 +25,8 @@ public class TopicCreationConstant {
public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms";

public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes";

public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L;

public static Properties createNewProperties(Long retentionTime) {
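These two key names are Kafka's own per-topic config keys for time- and size-based retention, which is what lets the metadata manager further down serve both values out of one cached `Properties` object. Since the hunk cuts off before the method body, here is a hedged sketch of what `createNewProperties` plausibly does (an assumption, not the project's confirmed implementation):

```java
// Hypothetical reconstruction of the truncated method body.
public static Properties createNewProperties(Long retentionTime) {
    Properties properties = new Properties();
    // "retention.ms" is Kafka's per-topic time-based retention config.
    properties.setProperty(TOPIC_RETENTION_TIME_KEY_NAME, String.valueOf(retentionTime));
    return properties;
}
```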
@@ -37,6 +37,8 @@ public class TopicBasicDTO {
private Long retentionTime;

private Long retentionBytes;

public Long getClusterId() {
    return clusterId;
}
@@ -157,6 +159,14 @@ public class TopicBasicDTO {
    this.retentionTime = retentionTime;
}

public Long getRetentionBytes() {
    return retentionBytes;
}

public void setRetentionBytes(Long retentionBytes) {
    this.retentionBytes = retentionBytes;
}

@Override
public String toString() {
    return "TopicBasicDTO{" +
@@ -166,7 +176,7 @@ public class TopicBasicDTO {
", principals='" + principals + '\'' +
", topicName='" + topicName + '\'' +
", description='" + description + '\'' +
", regionNameList='" + regionNameList + '\'' +
", regionNameList=" + regionNameList +
", score=" + score +
", topicCodeC='" + topicCodeC + '\'' +
", partitionNum=" + partitionNum +
@@ -175,6 +185,7 @@ public class TopicBasicDTO {
", modifyTime=" + modifyTime +
", createTime=" + createTime +
", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
'}';
}
}
@@ -33,6 +33,9 @@ public class TopicBasicVO {
@ApiModelProperty(value = "retention time (ms)")
private Long retentionTime;

@ApiModelProperty(value = "per-partition data retention size (Byte)")
private Long retentionBytes;

@ApiModelProperty(value = "creation time")
private Long createTime;
@@ -62,12 +65,20 @@ public class TopicBasicVO {
    this.clusterId = clusterId;
}

public String getTopicCodeC() {
    return topicCodeC;
public String getAppId() {
    return appId;
}

public void setTopicCodeC(String topicCodeC) {
    this.topicCodeC = topicCodeC;
public void setAppId(String appId) {
    this.appId = appId;
}

public String getAppName() {
    return appName;
}

public void setAppName(String appName) {
    this.appName = appName;
}

public Integer getPartitionNum() {
@@ -86,22 +97,6 @@ public class TopicBasicVO {
    this.replicaNum = replicaNum;
}

public Long getModifyTime() {
    return modifyTime;
}

public void setModifyTime(Long modifyTime) {
    this.modifyTime = modifyTime;
}

public Long getCreateTime() {
    return createTime;
}

public void setCreateTime(Long createTime) {
    this.createTime = createTime;
}

public String getPrincipals() {
    return principals;
}
@@ -110,30 +105,6 @@ public class TopicBasicVO {
    this.principals = principals;
}

public String getDescription() {
    return description;
}

public void setDescription(String description) {
    this.description = description;
}

public void setAppId(String appId) {
    this.appId = appId;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public String getAppId() {
    return appId;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public Long getRetentionTime() {
    return retentionTime;
}
@@ -142,12 +113,28 @@ public class TopicBasicVO {
    this.retentionTime = retentionTime;
}

public String getAppName() {
    return appName;
public Long getRetentionBytes() {
    return retentionBytes;
}

public void setAppName(String appName) {
    this.appName = appName;
public void setRetentionBytes(Long retentionBytes) {
    this.retentionBytes = retentionBytes;
}

public Long getCreateTime() {
    return createTime;
}

public void setCreateTime(Long createTime) {
    this.createTime = createTime;
}

public Long getModifyTime() {
    return modifyTime;
}

public void setModifyTime(Long modifyTime) {
    this.modifyTime = modifyTime;
}

public Integer getScore() {
@@ -158,6 +145,30 @@ public class TopicBasicVO {
    this.score = score;
}

public String getTopicCodeC() {
    return topicCodeC;
}

public void setTopicCodeC(String topicCodeC) {
    this.topicCodeC = topicCodeC;
}

public String getDescription() {
    return description;
}

public void setDescription(String description) {
    this.description = description;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public List<String> getRegionNameList() {
    return regionNameList;
}
@@ -176,6 +187,7 @@ public class TopicBasicVO {
", replicaNum=" + replicaNum +
", principals='" + principals + '\'' +
", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
", createTime=" + createTime +
", modifyTime=" + modifyTime +
", score=" + score +
@@ -1,7 +1,7 @@
package com.xiaojukeji.kafka.manager.common.utils.factory;

import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
@@ -16,7 +16,7 @@ import java.util.Properties;
 * @author zengqiao
 * @date 20/8/24
 */
public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer> {
public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer<String, String>> {
    private ClusterDO clusterDO;

    public KafkaConsumerFactory(ClusterDO clusterDO) {
@@ -25,17 +25,17 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
@Override
public KafkaConsumer create() {
    return new KafkaConsumer(createKafkaConsumerProperties(clusterDO));
    return new KafkaConsumer<String, String>(createKafkaConsumerProperties(clusterDO));
}

@Override
public PooledObject<KafkaConsumer> wrap(KafkaConsumer obj) {
    return new DefaultPooledObject<KafkaConsumer>(obj);
public PooledObject<KafkaConsumer<String, String>> wrap(KafkaConsumer<String, String> obj) {
    return new DefaultPooledObject<>(obj);
}

@Override
public void destroyObject(final PooledObject<KafkaConsumer> p) throws Exception {
    KafkaConsumer kafkaConsumer = p.getObject();
public void destroyObject(final PooledObject<KafkaConsumer<String, String>> p) throws Exception {
    KafkaConsumer<String, String> kafkaConsumer = p.getObject();
    if (ValidateUtils.isNull(kafkaConsumer)) {
        return;
    }
@@ -57,7 +57,7 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
    return properties;
}
properties.putAll(JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class));
properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
return properties;
}
}
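This factory is designed to be plugged into a commons-pool2 `GenericObjectPool`, which is exactly what `KafkaClientPool` does further down in this commit. A minimal standalone sketch of that pattern (the `clusterDO` value and pool size are illustrative assumptions):

```java
// A minimal sketch, assuming clusterDO was loaded from the cluster metadata cache.
GenericObjectPoolConfig<KafkaConsumer<String, String>> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(24); // mirrors the sizes KafkaClientPool uses below

GenericObjectPool<KafkaConsumer<String, String>> pool =
        new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config);

KafkaConsumer<String, String> consumer = pool.borrowObject();
try {
    // ... poll records with the borrowed consumer ...
} finally {
    pool.returnObject(consumer); // always return, or the pool leaks capacity
}
```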
@@ -17,6 +17,9 @@ public class ConsumerMetadataCache {
private static final Map<Long, ConsumerMetadata> CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>();

private ConsumerMetadataCache() {
}

public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) {
    if (clusterId == null || consumerMetadata == null) {
        return;
@@ -1,7 +1,7 @@
package com.xiaojukeji.kafka.manager.service.cache;

import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
import kafka.admin.AdminClient;
@@ -26,19 +26,22 @@ import java.util.concurrent.locks.ReentrantLock;
 * @date 19/12/24
 */
public class KafkaClientPool {
    private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);

    /**
     * AdminClient
     */
    private static Map<Long, AdminClient> AdminClientMap = new ConcurrentHashMap<>();
    private static final Map<Long, AdminClient> ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();

    private static Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
    private static final Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();

    private static Map<Long, GenericObjectPool<KafkaConsumer>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
    private static final Map<Long, GenericObjectPool<KafkaConsumer<String, String>>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();

    private static ReentrantLock lock = new ReentrantLock();

    private KafkaClientPool() {
    }

    private static void initKafkaProducerMap(Long clusterId) {
        ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId);
        if (clusterDO == null) {
@@ -55,7 +58,7 @@ public class KafkaClientPool {
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<String, String>(properties));
KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties));
} catch (Exception e) {
    LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e);
} finally {
@@ -77,25 +80,22 @@ public class KafkaClientPool {
if (ValidateUtils.isNull(kafkaProducer)) {
    return false;
}
kafkaProducer.send(new ProducerRecord<String, String>(topicName, data));
kafkaProducer.send(new ProducerRecord<>(topicName, data));
return true;
}

private static void initKafkaConsumerPool(ClusterDO clusterDO) {
    lock.lock();
    try {
        GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
        GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
        if (objectPool != null) {
            return;
        }
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        GenericObjectPoolConfig<KafkaConsumer<String, String>> config = new GenericObjectPoolConfig<>();
        config.setMaxIdle(24);
        config.setMinIdle(24);
        config.setMaxTotal(24);
        KAFKA_CONSUMER_POOL.put(
                clusterDO.getId(),
                new GenericObjectPool<KafkaConsumer>(new KafkaConsumerFactory(clusterDO), config)
        );
        KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config));
    } catch (Exception e) {
        LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e);
    } finally {
@@ -106,7 +106,7 @@ public class KafkaClientPool {
public static void closeKafkaConsumerPool(Long clusterId) {
    lock.lock();
    try {
        GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
        GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
        if (objectPool == null) {
            return;
        }
@@ -118,11 +118,11 @@ public class KafkaClientPool {
    }
}

public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
public static KafkaConsumer<String, String> borrowKafkaConsumerClient(ClusterDO clusterDO) {
    if (ValidateUtils.isNull(clusterDO)) {
        return null;
    }
    GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
    GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
    if (ValidateUtils.isNull(objectPool)) {
        initKafkaConsumerPool(clusterDO);
        objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
@@ -139,11 +139,11 @@ public class KafkaClientPool {
    return null;
}

public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer<String, String> kafkaConsumer) {
    if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) {
        return;
    }
    GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
    GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
    if (ValidateUtils.isNull(objectPool)) {
        return;
    }
@@ -155,7 +155,7 @@ public class KafkaClientPool {
}

public static AdminClient getAdminClient(Long clusterId) {
    AdminClient adminClient = AdminClientMap.get(clusterId);
    AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId);
    if (adminClient != null) {
        return adminClient;
    }
@@ -166,26 +166,26 @@ public class KafkaClientPool {
Properties properties = createProperties(clusterDO, false);
lock.lock();
try {
    adminClient = AdminClientMap.get(clusterId);
    adminClient = ADMIN_CLIENT_MAP.get(clusterId);
    if (adminClient != null) {
        return adminClient;
    }
    AdminClientMap.put(clusterId, AdminClient.create(properties));
    ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties));
} catch (Exception e) {
    LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e);
} finally {
    lock.unlock();
}
return AdminClientMap.get(clusterId);
return ADMIN_CLIENT_MAP.get(clusterId);
}

public static void closeAdminClient(ClusterDO cluster) {
    if (AdminClientMap.containsKey(cluster.getId())) {
        AdminClientMap.get(cluster.getId()).close();
    if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) {
        ADMIN_CLIENT_MAP.get(cluster.getId()).close();
    }
}

public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) {
public static Properties createProperties(ClusterDO clusterDO, boolean serialize) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers());
    if (serialize) {
@@ -198,8 +198,7 @@ public class KafkaClientPool {
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
    return properties;
}
Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class);
properties.putAll(securityProperties);
properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
return properties;
}
}
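Callers of this pool are expected to follow a strict borrow/try/finally-return discipline; otherwise the fixed-size pool (maxTotal=24) is silently exhausted. A hedged usage sketch (cluster lookup, topic, and partition are illustrative assumptions):

```java
// A minimal sketch, assuming clusterDO was fetched from PhysicalClusterMetadataManager.
KafkaConsumer<String, String> consumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO);
if (consumer == null) {
    return; // pool could not be created, e.g. unknown cluster
}
try {
    consumer.assign(Collections.singletonList(new TopicPartition("demo-topic", 0)));
    ConsumerRecords<String, String> records = consumer.poll(200L);
    // ... process the sampled records ...
} finally {
    KafkaClientPool.returnKafkaConsumerClient(clusterDO.getId(), consumer);
}
```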
@@ -14,7 +14,10 @@ public class KafkaMetricsCache {
/**
 * <clusterId, Metrics List>
 */
private static Map<Long, Map<String, TopicMetrics>> TopicMetricsMap = new ConcurrentHashMap<>();
private static final Map<Long, Map<String, TopicMetrics>> TOPIC_METRICS_MAP = new ConcurrentHashMap<>();

private KafkaMetricsCache() {
}

public static void putTopicMetricsToCache(Long clusterId, List<TopicMetrics> dataList) {
    if (clusterId == null || dataList == null) {
@@ -24,22 +27,22 @@ public class KafkaMetricsCache {
for (TopicMetrics topicMetrics : dataList) {
    subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics);
}
TopicMetricsMap.put(clusterId, subMetricsMap);
TOPIC_METRICS_MAP.put(clusterId, subMetricsMap);
}

public static Map<String, TopicMetrics> getTopicMetricsFromCache(Long clusterId) {
    return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
    return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
}

public static Map<Long, Map<String, TopicMetrics>> getAllTopicMetricsFromCache() {
    return TopicMetricsMap;
    return TOPIC_METRICS_MAP;
}

public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) {
    if (clusterId == null || topicName == null) {
        return null;
    }
    Map<String, TopicMetrics> subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
    Map<String, TopicMetrics> subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
    return subMap.get(topicName);
}
}
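The cache is a plain two-level `ConcurrentHashMap`, keyed by cluster id and then topic name, with empty-map fallbacks so a miss returns `null` instead of throwing. A tiny usage sketch (the `TopicMetrics` setter is an assumption; only `getTopicName()` is visible in this diff):

```java
// A minimal sketch; setTopicName(...) is hypothetical, inferred from getTopicName().
TopicMetrics metrics = new TopicMetrics();
metrics.setTopicName("demo-topic");
KafkaMetricsCache.putTopicMetricsToCache(1L, Collections.singletonList(metrics));

TopicMetrics cached = KafkaMetricsCache.getTopicMetricsFromCache(1L, "demo-topic"); // non-null
TopicMetrics miss = KafkaMetricsCache.getTopicMetricsFromCache(2L, "demo-topic");   // null, no exception
```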
@@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager {
public void flush() {
    List<LogicalClusterDO> logicalClusterDOList = logicalClusterService.listAll();
    if (ValidateUtils.isNull(logicalClusterDOList)) {
        logicalClusterDOList = Collections.EMPTY_LIST;
        logicalClusterDOList = Collections.emptyList();
    }
    Set<Long> inDbLogicalClusterIds = logicalClusterDOList.stream()
            .map(LogicalClusterDO::getId)
@@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
@@ -37,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
 */
@Service
public class PhysicalClusterMetadataManager {
    private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
    private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);

    @Autowired
    private ControllerDao controllerDao;
@@ -48,22 +50,22 @@ public class PhysicalClusterMetadataManager {
@Autowired
private ClusterService clusterService;

private final static Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
private static final Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();

private final static Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
private static final Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();

private final static Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
private static final Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();

private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();

private final static Map<Long, Map<String, Long>> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();

private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();

/**
 * JMX connections, established lazily
 */
private final static Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();

/**
 * KafkaBroker version, fetched lazily
@@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager {
// initialize topic maps
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());

// initialize cluster map
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
@@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager {
KAFKA_VERSION_MAP.remove(clusterId);

TOPIC_METADATA_MAP.remove(clusterId);
TOPIC_RETENTION_TIME_MAP.remove(clusterId);
TOPIC_PROPERTIES_MAP.remove(clusterId);
CLUSTER_MAP.remove(clusterId);
}
@@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager {
//--------------------------- config-related metadata --------------

public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) {
    Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
    if (timeMap == null) {
public static void putTopicProperties(Long clusterId, String topicName, Properties properties) {
    if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) {
        return;
    }
    timeMap.put(topicName, retentionTime);

    Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
    if (ValidateUtils.isNull(propertiesMap)) {
        return;
    }
    propertiesMap.put(topicName, properties);
}

public static Long getTopicRetentionTime(Long clusterId, String topicName) {
    Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
    if (timeMap == null) {
    Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
    if (ValidateUtils.isNull(propertiesMap)) {
        return null;
    }
    return timeMap.get(topicName);

    Properties properties = propertiesMap.get(topicName);
    if (ValidateUtils.isNull(properties)) {
        return null;
    }

    return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME));
}

public static Long getTopicRetentionBytes(Long clusterId, String topicName) {
    Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
    if (ValidateUtils.isNull(propertiesMap)) {
        return null;
    }

    Properties properties = propertiesMap.get(topicName);
    if (ValidateUtils.isNull(properties)) {
        return null;
    }

    return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME));
}

//--------------------------- Broker metadata --------------
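The refactor swaps the retention-time-only `TOPIC_RETENTION_TIME_MAP` for a per-topic `Properties` cache, so additional topic configs (here `retention.bytes`) can be served without adding new maps. A small illustrative round trip (cluster id and topic name are hypothetical, and the cluster's inner map must already have been initialized by the metadata manager):

```java
// A minimal sketch, assuming cluster 1 is already registered so its inner map exists.
Properties props = new Properties();
props.setProperty("retention.ms", "86400000");      // TOPIC_RETENTION_TIME_KEY_NAME
props.setProperty("retention.bytes", "1073741824"); // TOPIC_RETENTION_BYTES_KEY_NAME
PhysicalClusterMetadataManager.putTopicProperties(1L, "demo-topic", props);

Long ms = PhysicalClusterMetadataManager.getTopicRetentionTime(1L, "demo-topic");     // 86400000
Long bytes = PhysicalClusterMetadataManager.getTopicRetentionBytes(1L, "demo-topic"); // 1073741824
```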
@@ -375,7 +398,7 @@ public class PhysicalClusterMetadataManager {
                                KafkaBrokerRoleEnum roleEnum) {
    BrokerMetadata brokerMetadata =
            PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
    if (ValidateUtils.isNull(brokerMetadata)) {
    if (brokerMetadata == null) {
        return;
    }
    String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
@@ -415,7 +438,7 @@ public class PhysicalClusterMetadataManager {
                                KafkaBrokerRoleEnum roleEnum) {
    BrokerMetadata brokerMetadata =
            PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
    if (ValidateUtils.isNull(brokerMetadata)) {
    if (brokerMetadata == null) {
        return;
    }
@@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService {
basicDTO.setCreateTime(topicMetadata.getCreateTime());
basicDTO.setModifyTime(topicMetadata.getModifyTime());
basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName));
basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName));

TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName);
if (!ValidateUtils.isNull(topicDO)) {
@@ -648,10 +649,11 @@ public class TopicServiceImpl implements TopicService {
List<String> dataList = new ArrayList<>();
int currentSize = dataList.size();
while (dataList.size() < maxMsgNum) {
    if (remainingWaitMs <= 0) {
        break;
    }

    try {
        if (remainingWaitMs <= 0) {
            break;
        }
        ConsumerRecords<String, String> records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS);
        for (ConsumerRecord record : records) {
            String value = (String) record.value();
@@ -661,20 +663,22 @@ public class TopicServiceImpl implements TopicService {
                : value
        );
    }
    // if the current batch pulled no data at all, stop fetching
    if (dataList.size() - currentSize == 0) {
        break;
    }
    currentSize = dataList.size();
    // check whether the overall wait has timed out
    long elapsed = System.currentTimeMillis() - begin;
    if (elapsed >= maxWaitMs) {
        break;
    }
    remainingWaitMs = maxWaitMs - elapsed;
} catch (Exception e) {
    LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}

// if the current batch pulled no data at all, stop fetching
if (dataList.size() - currentSize == 0) {
    break;
}
currentSize = dataList.size();

// check whether the overall wait has timed out
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) {
    break;
}
remainingWaitMs = maxWaitMs - elapsed;
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}
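The restructuring above moves the empty-batch and timeout checks out of the `try` block, so a poll exception can no longer skip them and spin the loop past its deadline. A self-contained sketch of the resulting control flow (`kafkaConsumer`, `maxWaitMs`, and `maxMsgNum` are assumed to be supplied by the caller):

```java
// A minimal sketch of the bounded sampling loop after the fix.
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
List<String> dataList = new ArrayList<>();
int currentSize = dataList.size();
while (dataList.size() < maxMsgNum) {
    try {
        if (remainingWaitMs <= 0) {
            break; // overall deadline reached
        }
        ConsumerRecords<String, String> records = kafkaConsumer.poll(200L);
        for (ConsumerRecord<String, String> record : records) {
            dataList.add(record.value());
        }
    } catch (Exception e) {
        // log and fall through; the checks below still bound the loop
    }

    if (dataList.size() - currentSize == 0) {
        break; // this batch fetched nothing, stop sampling
    }
    currentSize = dataList.size();

    long elapsed = System.currentTimeMillis() - begin;
    if (elapsed >= maxWaitMs) {
        break;
    }
    remainingWaitMs = maxWaitMs - elapsed;
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
```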
@@ -698,14 +702,15 @@ public class TopicServiceImpl implements TopicService {
            : value
    );
    }
    if (System.currentTimeMillis() - timestamp > timeout
            || dataList.size() >= maxMsgNum) {
        break;
    }
    Thread.sleep(10);
} catch (Exception e) {
    LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}

if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) {
    // timed out or enough data collected: return immediately
    break;
}
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}
@@ -75,11 +75,7 @@ public class LoginServiceImpl implements LoginService {
return false;
}

if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)
        || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_PREFIX)
        || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
        || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_NORMAL_PREFIX)
        || classRequestMappingValue.equals(ApiPrefix.GATEWAY_API_V1_PREFIX)) {
if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)) {
    // whitelisted endpoints return true directly
    return true;
}
@@ -19,7 +19,6 @@ public class Converts {
orderDO.setApprover("");
orderDO.setOpinion("");
orderDO.setExtensions(orderDTO.getExtensions());
orderDO.setType(orderDTO.getType());
return orderDO;
}
}
@@ -14,13 +14,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Properties;

/**
 * @author zengqiao
 * @date 20/7/23
 */
@Component
public class FlushTopicRetentionTime {
public class FlushTopicProperties {
    private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);

    @Autowired
@@ -33,7 +34,7 @@ public class FlushTopicRetentionTime {
try {
    flush(clusterDO);
} catch (Exception e) {
    LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e);
    LOGGER.error("flush topic properties failed, clusterId:{}.", clusterDO.getId(), e);
}
}
}
@@ -41,22 +42,20 @@ public class FlushTopicRetentionTime {
private void flush(ClusterDO clusterDO) {
    ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId());
    if (ValidateUtils.isNull(zkConfig)) {
        LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId());
        LOGGER.error("flush topic properties, get zk config failed, clusterId:{}.", clusterDO.getId());
        return;
    }

    for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
        try {
            Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName);
            if (retentionTime == null) {
                LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.",
                        clusterDO.getId(), topicName);
            Properties properties = KafkaZookeeperUtils.getTopicProperties(zkConfig, topicName);
            if (ValidateUtils.isNull(properties)) {
                LOGGER.warn("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName);
                continue;
            }
            PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime);
            PhysicalClusterMetadataManager.putTopicProperties(clusterDO.getId(), topicName, properties);
        } catch (Exception e) {
            LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.",
                    clusterDO.getId(), topicName, e);
            LOGGER.error("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
        }
    }
}
@@ -61,10 +61,7 @@ public class NormalTopicController {
@ApiOperation(value = "Topic basic info", notes = "")
@RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET)
@ResponseBody
public Result<TopicBasicVO> getTopicBasic(
        @PathVariable Long clusterId,
        @PathVariable String topicName,
        @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
public Result<TopicBasicVO> getTopicBasic(@PathVariable Long clusterId, @PathVariable String topicName, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
    Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
    if (ValidateUtils.isNull(physicalClusterId)) {
        return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
@@ -31,6 +31,7 @@ public class TopicModelConverter {
vo.setReplicaNum(dto.getReplicaNum());
vo.setPrincipals(dto.getPrincipals());
vo.setRetentionTime(dto.getRetentionTime());
vo.setRetentionBytes(dto.getRetentionBytes());
vo.setCreateTime(dto.getCreateTime());
vo.setModifyTime(dto.getModifyTime());
vo.setScore(dto.getScore());