Merge remote-tracking branch 'origin/master' into dev_v2.5.0

This commit is contained in:
shirenchuang
2021-11-15 11:45:22 +08:00
27 changed files with 284 additions and 200 deletions

View File

@@ -1,13 +1,13 @@
---
![kafka-manager-logo](./docs/assets/images/common/logo_name.png)
![logikm_logo](https://user-images.githubusercontent.com/71620349/125024570-9e07a100-e0b3-11eb-8ebc-22e73e056771.png)
**一站式`Apache Kafka`集群指标监控与运维管控平台**
阅读本README文档您可以了解到滴滴Logi-KafkaManager的用户群体、产品定位等信息并通过体验地址快速体验Kafka集群指标监控与运维管控的全流程。<br>若滴滴Logi-KafkaManager已在贵司的生产环境进行使用并想要获得官方更好地支持和指导可以通过[`OCE认证`](http://obsuite.didiyun.com/open/openAuth),加入官方交流平台。
阅读本README文档您可以了解到滴滴Logi-KafkaManager的用户群体、产品定位等信息并通过体验地址快速体验Kafka集群指标监控与运维管控的全流程。
## 1 产品简介
@@ -73,15 +73,17 @@
![image](https://user-images.githubusercontent.com/5287750/111266722-e531d800-8665-11eb-9242-3484da5a3099.png)
微信加群:关注公众号 Obsuite(官方公众号) 回复 "Logi加群"
微信加群:添加mike_zhangliang的微信号备注Logi加群或关注公众号 云原生可观测性 回复 "Logi加群"
![dingding_group](./docs/assets/images/common/dingding_group.jpg)
钉钉群ID32821440
## 4 知识星球
![image](https://user-images.githubusercontent.com/51046167/140718512-5ab1b336-5c48-46c0-90bd-44b5c7e168c8.png)
## 4 OCE认证
OCE是一个认证机制和交流平台为滴滴Logi-KafkaManager生产用户量身打造我们会为OCE企业提供更好的技术支持比如专属的技术沙龙、企业一对一的交流机会、专属的答疑群等如果贵司Logi-KafkaManager上了生产[快来加入吧](http://obsuite.didiyun.com/open/openAuth)
✅知识星球首个【Kafka中文社区】内测期免费加入https://z.didi.cn/5gSF9
有问必答~
互动有礼~
1100+群友一起共建国内最专业的【Kafka中文社区】
PS:提问请尽量把问题一次性描述清楚,并告知环境信息情况哦~!如使用版本、操作步骤、报错/警告信息等,方便嘉宾们快速解答~
## 5 项目成员
@@ -97,4 +99,4 @@ OCE是一个认证机制和交流平台为滴滴Logi-KafkaManager生产用户
## 6 协议
`kafka-manager`基于`Apache-2.0`协议进行分发和使用,更多信息参见[协议文件](./LICENSE)
`LogiKM`基于`Apache-2.0`协议进行分发和使用,更多信息参见[协议文件](./LICENSE)

View File

@@ -1,43 +1,28 @@
FROM openjdk:16-jdk-alpine3.13
LABEL author="yangvipguang"
ENV VERSION 2.3.1
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk add --no-cache --virtual .build-deps \
font-adobe-100dpi \
ttf-dejavu \
fontconfig \
curl \
apr \
apr-util \
apr-dev \
tomcat-native \
&& apk del .build-deps
RUN apk add --no-cache tini
LABEL author="fengxsong"
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && apk add --no-cache tini
ENV VERSION 2.4.2
WORKDIR /opt/
ENV AGENT_HOME /opt/agent/
WORKDIR /tmp
COPY $JAR_PATH/kafka-manager.jar app.jar
# COPY application.yml application.yml ##默认使用helm 挂载,防止敏感配置泄露
COPY docker-depends/config.yaml $AGENT_HOME
COPY docker-depends/jmx_prometheus_javaagent-0.15.0.jar $AGENT_HOME
ENV JAVA_AGENT="-javaagent:$AGENT_HOME/jmx_prometheus_javaagent-0.15.0.jar=9999:$AGENT_HOME/config.yaml"
ENV JAVA_HEAP_OPTS="-Xms1024M -Xmx1024M -Xmn100M "
ENV JAVA_OPTS="-verbose:gc \
-XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
-XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"
-XX:MaxMetaspaceSize=256M -XX:+DisableExplicitGC -XX:+UseStringDeduplication \
-XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:-UseContainerSupport"
RUN wget https://github.com/didi/Logi-KafkaManager/releases/download/v${VERSION}/kafka-manager-${VERSION}.tar.gz && \
tar xvf kafka-manager-${VERSION}.tar.gz && \
mv kafka-manager-${VERSION}/kafka-manager.jar /opt/app.jar && \
rm -rf kafka-manager-${VERSION}*
EXPOSE 8080 9999
ENTRYPOINT ["tini", "--"]
CMD ["sh","-c","java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]
CMD [ "sh", "-c", "java -jar $JAVA_AGENT $JAVA_HEAP_OPTS $JAVA_OPTS app.jar --spring.config.location=application.yml"]

View File

@@ -0,0 +1,6 @@
dependencies:
- name: mysql
repository: https://charts.bitnami.com/bitnami
version: 8.6.3
digest: sha256:d250c463c1d78ba30a24a338a06a551503c7a736621d974fe4999d2db7f6143e
generated: "2021-06-24T11:34:54.625217+08:00"

View File

@@ -1,6 +1,6 @@
apiVersion: v2
name: didi-km
description: A Helm chart for Kubernetes
description: Logi-KafkaManager
# A chart can be either an 'application' or a 'library' chart.
#
@@ -21,4 +21,9 @@ version: 0.1.0
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"
appVersion: "2.4.2"
dependencies:
- condition: mysql.enabled
name: mysql
repository: https://charts.bitnami.com/bitnami
version: 8.x.x

Binary file not shown.

View File

@@ -1,7 +1,17 @@
{{- define "datasource.mysql" -}}
{{- if .Values.mysql.enabled }}
{{- printf "%s-mysql" (include "didi-km.fullname" .) -}}
{{- else -}}
{{- printf "%s" .Values.externalDatabase.host -}}
{{- end -}}
{{- end -}}
apiVersion: v1
kind: ConfigMap
metadata:
name: km-cm
name: {{ include "didi-km.fullname" . }}-configs
labels:
{{- include "didi-km.labels" . | nindent 4 }}
data:
application.yml: |
server:
@@ -17,9 +27,9 @@ data:
name: kafkamanager
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://xxxxx:3306/kafka-manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
username: admin
password: admin
jdbc-url: jdbc:mysql://{{ include "datasource.mysql" . }}:3306/{{ .Values.mysql.auth.database }}?characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false
username: {{ .Values.mysql.auth.username }}
password: {{ .Values.mysql.auth.password }}
driver-class-name: com.mysql.jdbc.Driver
main:
allow-bean-definition-overriding: true
@@ -54,7 +64,10 @@ data:
sync-topic-enabled: false # 未落盘的Topic定期同步到DB中
account:
# ldap settings
ldap:
enabled: false
authUserRegistration: false
kcm:
enabled: false

View File

@@ -42,6 +42,10 @@ spec:
protocol: TCP
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:
- name: configs
mountPath: /tmp/application.yml
subPath: application.yml
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
@@ -54,3 +58,7 @@ spec:
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
volumes:
- name: configs
configMap:
name: {{ include "didi-km.fullname" . }}-configs

View File

@@ -5,13 +5,14 @@
replicaCount: 1
image:
repository: docker.io/yangvipguang/km
repository: docker.io/fengxsong/logi-kafka-manager
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "v18"
tag: "v2.4.2"
imagePullSecrets: []
nameOverride: ""
# fullnameOverride must set same as release name
fullnameOverride: "km"
serviceAccount:
@@ -59,10 +60,10 @@ resources:
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
limits:
cpu: 50m
cpu: 500m
memory: 2048Mi
requests:
cpu: 10m
cpu: 100m
memory: 200Mi
autoscaling:
@@ -77,3 +78,16 @@ nodeSelector: {}
tolerations: []
affinity: {}
# more configurations are set with configmap in file template/configmap.yaml
externalDatabase:
host: ""
mysql:
# if enabled is set to false, then you should manually specified externalDatabase.host
enabled: true
architecture: standalone
auth:
rootPassword: "s3cretR00t"
database: "logi_kafka_manager"
username: "logi_kafka_manager"
password: "n0tp@55w0rd"

Binary file not shown.

Before

Width:  |  Height:  |  Size: 20 KiB

View File

@@ -19,9 +19,9 @@
| 模块 |对比指标 |底层依赖 |开源版 |商业版 |备注 |
| --- | --- | --- | --- | --- | --- |
| 服务发现 | bootstrap地址变更对客户端无影响 | | | 是| |
| 安全管控 | 身份鉴权appID+password | | | 是 | |
| | 权限鉴权Topic+appID | | | 是 | |
| 服务发现 | bootstrap地址变更对客户端无影响 | Gateway | | 是| |
| 安全管控 | 身份鉴权appID+password | Gateway | | 是 | |
| | 权限鉴权Topic+appID | Gateway | | 是 | |
| 指标监控 | Topic实时流量、历史流量 | | 是 | 是 | |
| | Broker实时耗时、历史耗时 | 引擎 | | 是 | |
| | 分区落盘 | 引擎 | | 是 | |
@@ -49,7 +49,7 @@
**总结**
Logi-KafkaManager的商业特性体现在滴滴Kafka Gateway、滴滴Kafka引擎、内部沉淀出的资源治理专家经验、可定制化的健康分算法。
滴滴LogiKM的商业特性体现在滴滴Kafka Gateway、滴滴Kafka引擎、内部沉淀出的资源治理专家经验、可定制化的健康分算法。
从场景来看滴滴Logi-KafkaManager的开源版本在kafka集群运维、的Topic管理、监控告警、资源治理等kafka核心场景都充分开源用户的使用需求并且有着出色的表现。而商业版相较于开源版在安全管控、流量管控、更丰富的指标监控、资源治理专家经验的具有明显提升更加符合企业业务需求。
除此之外,商业版还可根据企业实际需求对平台源码进行定制化改造,并提供运维保障,稳定性保障,运营保障等服务。

View File

@@ -25,6 +25,8 @@ public class TopicCreationConstant {
public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms";
public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes";
public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L;
public static Properties createNewProperties(Long retentionTime) {

View File

@@ -37,6 +37,8 @@ public class TopicBasicDTO {
private Long retentionTime;
private Long retentionBytes;
public Long getClusterId() {
return clusterId;
}
@@ -157,6 +159,14 @@ public class TopicBasicDTO {
this.retentionTime = retentionTime;
}
public Long getRetentionBytes() {
return retentionBytes;
}
public void setRetentionBytes(Long retentionBytes) {
this.retentionBytes = retentionBytes;
}
@Override
public String toString() {
return "TopicBasicDTO{" +
@@ -166,7 +176,7 @@ public class TopicBasicDTO {
", principals='" + principals + '\'' +
", topicName='" + topicName + '\'' +
", description='" + description + '\'' +
", regionNameList='" + regionNameList + '\'' +
", regionNameList=" + regionNameList +
", score=" + score +
", topicCodeC='" + topicCodeC + '\'' +
", partitionNum=" + partitionNum +
@@ -175,6 +185,7 @@ public class TopicBasicDTO {
", modifyTime=" + modifyTime +
", createTime=" + createTime +
", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
'}';
}
}

View File

@@ -33,6 +33,9 @@ public class TopicBasicVO {
@ApiModelProperty(value = "存储时间(ms)")
private Long retentionTime;
@ApiModelProperty(value = "单分区数据保存大小(Byte)")
private Long retentionBytes;
@ApiModelProperty(value = "创建时间")
private Long createTime;
@@ -62,12 +65,20 @@ public class TopicBasicVO {
this.clusterId = clusterId;
}
public String getTopicCodeC() {
return topicCodeC;
public String getAppId() {
return appId;
}
public void setTopicCodeC(String topicCodeC) {
this.topicCodeC = topicCodeC;
public void setAppId(String appId) {
this.appId = appId;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public Integer getPartitionNum() {
@@ -86,22 +97,6 @@ public class TopicBasicVO {
this.replicaNum = replicaNum;
}
public Long getModifyTime() {
return modifyTime;
}
public void setModifyTime(Long modifyTime) {
this.modifyTime = modifyTime;
}
public Long getCreateTime() {
return createTime;
}
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}
public String getPrincipals() {
return principals;
}
@@ -110,30 +105,6 @@ public class TopicBasicVO {
this.principals = principals;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public void setAppId(String appId) {
this.appId = appId;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getAppId() {
return appId;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public Long getRetentionTime() {
return retentionTime;
}
@@ -142,12 +113,28 @@ public class TopicBasicVO {
this.retentionTime = retentionTime;
}
public String getAppName() {
return appName;
public Long getRetentionBytes() {
return retentionBytes;
}
public void setAppName(String appName) {
this.appName = appName;
public void setRetentionBytes(Long retentionBytes) {
this.retentionBytes = retentionBytes;
}
public Long getCreateTime() {
return createTime;
}
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}
public Long getModifyTime() {
return modifyTime;
}
public void setModifyTime(Long modifyTime) {
this.modifyTime = modifyTime;
}
public Integer getScore() {
@@ -158,6 +145,30 @@ public class TopicBasicVO {
this.score = score;
}
public String getTopicCodeC() {
return topicCodeC;
}
public void setTopicCodeC(String topicCodeC) {
this.topicCodeC = topicCodeC;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public List<String> getRegionNameList() {
return regionNameList;
}
@@ -176,6 +187,7 @@ public class TopicBasicVO {
", replicaNum=" + replicaNum +
", principals='" + principals + '\'' +
", retentionTime=" + retentionTime +
", retentionBytes=" + retentionBytes +
", createTime=" + createTime +
", modifyTime=" + modifyTime +
", score=" + score +

View File

@@ -1,7 +1,7 @@
package com.xiaojukeji.kafka.manager.common.utils.factory;
import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
@@ -16,7 +16,7 @@ import java.util.Properties;
* @author zengqiao
* @date 20/8/24
*/
public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer> {
public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer<String, String>> {
private ClusterDO clusterDO;
public KafkaConsumerFactory(ClusterDO clusterDO) {
@@ -25,17 +25,17 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
@Override
public KafkaConsumer create() {
return new KafkaConsumer(createKafkaConsumerProperties(clusterDO));
return new KafkaConsumer<String, String>(createKafkaConsumerProperties(clusterDO));
}
@Override
public PooledObject<KafkaConsumer> wrap(KafkaConsumer obj) {
return new DefaultPooledObject<KafkaConsumer>(obj);
public PooledObject<KafkaConsumer<String, String>> wrap(KafkaConsumer<String, String> obj) {
return new DefaultPooledObject<>(obj);
}
@Override
public void destroyObject(final PooledObject<KafkaConsumer> p) throws Exception {
KafkaConsumer kafkaConsumer = p.getObject();
public void destroyObject(final PooledObject<KafkaConsumer<String, String>> p) throws Exception {
KafkaConsumer<String, String> kafkaConsumer = p.getObject();
if (ValidateUtils.isNull(kafkaConsumer)) {
return;
}
@@ -57,7 +57,7 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
return properties;
}
properties.putAll(JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class));
properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
return properties;
}
}

View File

@@ -79,7 +79,8 @@ public class JmxConnectorWrap {
try {
Map<String, Object> environment = new HashMap<String, Object>();
if (!ValidateUtils.isBlank(this.jmxConfig.getUsername()) && !ValidateUtils.isBlank(this.jmxConfig.getPassword())) {
environment.put(JMXConnector.CREDENTIALS, Arrays.asList(this.jmxConfig.getUsername(), this.jmxConfig.getPassword()));
// fixed by riyuetianmu
environment.put(JMXConnector.CREDENTIALS, new String[]{this.jmxConfig.getUsername(), this.jmxConfig.getPassword()});
}
if (jmxConfig.isOpenSSL() != null && this.jmxConfig.isOpenSSL()) {
environment.put(Context.SECURITY_PROTOCOL, "ssl");

View File

@@ -17,6 +17,9 @@ public class ConsumerMetadataCache {
private static final Map<Long, ConsumerMetadata> CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>();
private ConsumerMetadataCache() {
}
public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) {
if (clusterId == null || consumerMetadata == null) {
return;

View File

@@ -1,7 +1,7 @@
package com.xiaojukeji.kafka.manager.service.cache;
import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
import kafka.admin.AdminClient;
@@ -26,19 +26,22 @@ import java.util.concurrent.locks.ReentrantLock;
* @date 19/12/24
*/
public class KafkaClientPool {
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
/**
* AdminClient
*/
private static Map<Long, AdminClient> AdminClientMap = new ConcurrentHashMap<>();
private static final Map<Long, AdminClient> ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();
private static Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
private static final Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
private static Map<Long, GenericObjectPool<KafkaConsumer>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
private static final Map<Long, GenericObjectPool<KafkaConsumer<String, String>>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
private static ReentrantLock lock = new ReentrantLock();
private KafkaClientPool() {
}
private static void initKafkaProducerMap(Long clusterId) {
ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId);
if (clusterDO == null) {
@@ -55,7 +58,7 @@ public class KafkaClientPool {
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<String, String>(properties));
KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties));
} catch (Exception e) {
LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e);
} finally {
@@ -77,25 +80,22 @@ public class KafkaClientPool {
if (ValidateUtils.isNull(kafkaProducer)) {
return false;
}
kafkaProducer.send(new ProducerRecord<String, String>(topicName, data));
kafkaProducer.send(new ProducerRecord<>(topicName, data));
return true;
}
private static void initKafkaConsumerPool(ClusterDO clusterDO) {
lock.lock();
try {
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
if (objectPool != null) {
return;
}
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
GenericObjectPoolConfig<KafkaConsumer<String, String>> config = new GenericObjectPoolConfig<>();
config.setMaxIdle(24);
config.setMinIdle(24);
config.setMaxTotal(24);
KAFKA_CONSUMER_POOL.put(
clusterDO.getId(),
new GenericObjectPool<KafkaConsumer>(new KafkaConsumerFactory(clusterDO), config)
);
KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config));
} catch (Exception e) {
LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e);
} finally {
@@ -106,7 +106,7 @@ public class KafkaClientPool {
public static void closeKafkaConsumerPool(Long clusterId) {
lock.lock();
try {
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
if (objectPool == null) {
return;
}
@@ -118,11 +118,11 @@ public class KafkaClientPool {
}
}
public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
public static KafkaConsumer<String, String> borrowKafkaConsumerClient(ClusterDO clusterDO) {
if (ValidateUtils.isNull(clusterDO)) {
return null;
}
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
if (ValidateUtils.isNull(objectPool)) {
initKafkaConsumerPool(clusterDO);
objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
@@ -139,11 +139,11 @@ public class KafkaClientPool {
return null;
}
public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer<String, String> kafkaConsumer) {
if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) {
return;
}
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
if (ValidateUtils.isNull(objectPool)) {
return;
}
@@ -155,7 +155,7 @@ public class KafkaClientPool {
}
public static AdminClient getAdminClient(Long clusterId) {
AdminClient adminClient = AdminClientMap.get(clusterId);
AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId);
if (adminClient != null) {
return adminClient;
}
@@ -166,26 +166,26 @@ public class KafkaClientPool {
Properties properties = createProperties(clusterDO, false);
lock.lock();
try {
adminClient = AdminClientMap.get(clusterId);
adminClient = ADMIN_CLIENT_MAP.get(clusterId);
if (adminClient != null) {
return adminClient;
}
AdminClientMap.put(clusterId, AdminClient.create(properties));
ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties));
} catch (Exception e) {
LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e);
} finally {
lock.unlock();
}
return AdminClientMap.get(clusterId);
return ADMIN_CLIENT_MAP.get(clusterId);
}
public static void closeAdminClient(ClusterDO cluster) {
if (AdminClientMap.containsKey(cluster.getId())) {
AdminClientMap.get(cluster.getId()).close();
if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) {
ADMIN_CLIENT_MAP.get(cluster.getId()).close();
}
}
public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) {
public static Properties createProperties(ClusterDO clusterDO, boolean serialize) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers());
if (serialize) {
@@ -198,8 +198,7 @@ public class KafkaClientPool {
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
return properties;
}
Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class);
properties.putAll(securityProperties);
properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
return properties;
}
}

View File

@@ -14,7 +14,10 @@ public class KafkaMetricsCache {
/**
* <clusterId, Metrics List>
*/
private static Map<Long, Map<String, TopicMetrics>> TopicMetricsMap = new ConcurrentHashMap<>();
private static final Map<Long, Map<String, TopicMetrics>> TOPIC_METRICS_MAP = new ConcurrentHashMap<>();
private KafkaMetricsCache() {
}
public static void putTopicMetricsToCache(Long clusterId, List<TopicMetrics> dataList) {
if (clusterId == null || dataList == null) {
@@ -24,22 +27,22 @@ public class KafkaMetricsCache {
for (TopicMetrics topicMetrics : dataList) {
subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics);
}
TopicMetricsMap.put(clusterId, subMetricsMap);
TOPIC_METRICS_MAP.put(clusterId, subMetricsMap);
}
public static Map<String, TopicMetrics> getTopicMetricsFromCache(Long clusterId) {
return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
}
public static Map<Long, Map<String, TopicMetrics>> getAllTopicMetricsFromCache() {
return TopicMetricsMap;
return TOPIC_METRICS_MAP;
}
public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) {
if (clusterId == null || topicName == null) {
return null;
}
Map<String, TopicMetrics> subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
Map<String, TopicMetrics> subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
return subMap.get(topicName);
}
}

View File

@@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager {
public void flush() {
List<LogicalClusterDO> logicalClusterDOList = logicalClusterService.listAll();
if (ValidateUtils.isNull(logicalClusterDOList)) {
logicalClusterDOList = Collections.EMPTY_LIST;
logicalClusterDOList = Collections.emptyList();
}
Set<Long> inDbLogicalClusterIds = logicalClusterDOList.stream()
.map(LogicalClusterDO::getId)

View File

@@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
@@ -37,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Service
public class PhysicalClusterMetadataManager {
private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
@Autowired
private ControllerDao controllerDao;
@@ -48,22 +50,22 @@ public class PhysicalClusterMetadataManager {
@Autowired
private ClusterService clusterService;
private final static Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
private static final Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
private final static Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
private static final Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
private final static Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
private static final Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
private final static Map<Long, Map<String, Long>> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
/**
* JXM连接, 延迟连接
*/
private final static Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
/**
* KafkaBroker版本, 延迟获取
@@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager {
// 初始化topic-map
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
// 初始化cluster-map
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
@@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager {
KAFKA_VERSION_MAP.remove(clusterId);
TOPIC_METADATA_MAP.remove(clusterId);
TOPIC_RETENTION_TIME_MAP.remove(clusterId);
TOPIC_PROPERTIES_MAP.remove(clusterId);
CLUSTER_MAP.remove(clusterId);
}
@@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager {
//---------------------------配置相关元信息--------------
public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) {
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
if (timeMap == null) {
public static void putTopicProperties(Long clusterId, String topicName, Properties properties) {
if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) {
return;
}
timeMap.put(topicName, retentionTime);
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return;
}
propertiesMap.put(topicName, properties);
}
public static Long getTopicRetentionTime(Long clusterId, String topicName) {
Map<String, Long> timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId);
if (timeMap == null) {
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return null;
}
return timeMap.get(topicName);
Properties properties = propertiesMap.get(topicName);
if (ValidateUtils.isNull(properties)) {
return null;
}
return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME));
}
public static Long getTopicRetentionBytes(Long clusterId, String topicName) {
Map<String, Properties> propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId);
if (ValidateUtils.isNull(propertiesMap)) {
return null;
}
Properties properties = propertiesMap.get(topicName);
if (ValidateUtils.isNull(properties)) {
return null;
}
return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME));
}
//---------------------------Broker元信息相关--------------
@@ -375,7 +398,7 @@ public class PhysicalClusterMetadataManager {
KafkaBrokerRoleEnum roleEnum) {
BrokerMetadata brokerMetadata =
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
if (ValidateUtils.isNull(brokerMetadata)) {
if (brokerMetadata == null) {
return;
}
String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
@@ -415,7 +438,7 @@ public class PhysicalClusterMetadataManager {
KafkaBrokerRoleEnum roleEnum) {
BrokerMetadata brokerMetadata =
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
if (ValidateUtils.isNull(brokerMetadata)) {
if (brokerMetadata == null) {
return;
}

View File

@@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService {
basicDTO.setCreateTime(topicMetadata.getCreateTime());
basicDTO.setModifyTime(topicMetadata.getModifyTime());
basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName));
basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName));
TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName);
if (!ValidateUtils.isNull(topicDO)) {
@@ -648,10 +649,11 @@ public class TopicServiceImpl implements TopicService {
List<String> dataList = new ArrayList<>();
int currentSize = dataList.size();
while (dataList.size() < maxMsgNum) {
if (remainingWaitMs <= 0) {
break;
}
try {
if (remainingWaitMs <= 0) {
break;
}
ConsumerRecords<String, String> records = kafkaConsumer.poll(TopicSampleConstant.POLL_TIME_OUT_UNIT_MS);
for (ConsumerRecord record : records) {
String value = (String) record.value();
@@ -661,20 +663,22 @@ public class TopicServiceImpl implements TopicService {
: value
);
}
// 当前批次一条数据都没拉取到,则结束拉取
if (dataList.size() - currentSize == 0) {
break;
}
currentSize = dataList.size();
// 检查是否超时
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) {
break;
}
remainingWaitMs = maxWaitMs - elapsed;
} catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}
// 当前批次一条数据都没拉取到,则结束拉取
if (dataList.size() - currentSize == 0) {
break;
}
currentSize = dataList.size();
// 检查是否超时
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) {
break;
}
remainingWaitMs = maxWaitMs - elapsed;
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}
@@ -698,14 +702,15 @@ public class TopicServiceImpl implements TopicService {
: value
);
}
if (System.currentTimeMillis() - timestamp > timeout
|| dataList.size() >= maxMsgNum) {
break;
}
Thread.sleep(10);
} catch (Exception e) {
LOGGER.error("fetch topic data failed, TopicPartitions:{}.", kafkaConsumer.assignment(), e);
}
if (System.currentTimeMillis() - timestamp > timeout || dataList.size() >= maxMsgNum) {
// 超时或者是数据已采集足够时, 直接返回
break;
}
}
return dataList.subList(0, Math.min(dataList.size(), maxMsgNum));
}

View File

@@ -75,11 +75,7 @@ public class LoginServiceImpl implements LoginService {
return false;
}
if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)
|| classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_PREFIX)
|| classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
|| classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_NORMAL_PREFIX)
|| classRequestMappingValue.equals(ApiPrefix.GATEWAY_API_V1_PREFIX)) {
if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)) {
// 白名单接口直接true
return true;
}

View File

@@ -19,7 +19,6 @@ public class Converts {
orderDO.setApprover("");
orderDO.setOpinion("");
orderDO.setExtensions(orderDTO.getExtensions());
orderDO.setType(orderDTO.getType());
return orderDO;
}
}

View File

@@ -14,13 +14,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Properties;
/**
* @author zengqiao
* @date 20/7/23
*/
@Component
public class FlushTopicRetentionTime {
public class FlushTopicProperties {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
@@ -33,7 +34,7 @@ public class FlushTopicRetentionTime {
try {
flush(clusterDO);
} catch (Exception e) {
LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e);
LOGGER.error("flush topic properties failed, clusterId:{}.", clusterDO.getId(), e);
}
}
}
@@ -41,22 +42,20 @@ public class FlushTopicRetentionTime {
private void flush(ClusterDO clusterDO) {
ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId());
if (ValidateUtils.isNull(zkConfig)) {
LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId());
LOGGER.error("flush topic properties, get zk config failed, clusterId:{}.", clusterDO.getId());
return;
}
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
try {
Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName);
if (retentionTime == null) {
LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.",
clusterDO.getId(), topicName);
Properties properties = KafkaZookeeperUtils.getTopicProperties(zkConfig, topicName);
if (ValidateUtils.isNull(properties)) {
LOGGER.warn("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName);
continue;
}
PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime);
PhysicalClusterMetadataManager.putTopicProperties(clusterDO.getId(), topicName, properties);
} catch (Exception e) {
LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.",
clusterDO.getId(), topicName, e);
LOGGER.error("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e);
}
}
}

View File

@@ -61,10 +61,7 @@ public class NormalTopicController {
@ApiOperation(value = "Topic基本信息", notes = "")
@RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET)
@ResponseBody
public Result<TopicBasicVO> getTopicBasic(
@PathVariable Long clusterId,
@PathVariable String topicName,
@RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
public Result<TopicBasicVO> getTopicBasic(@PathVariable Long clusterId, @PathVariable String topicName, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) {
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
if (ValidateUtils.isNull(physicalClusterId)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);

View File

@@ -31,6 +31,7 @@ public class TopicModelConverter {
vo.setReplicaNum(dto.getReplicaNum());
vo.setPrincipals(dto.getPrincipals());
vo.setRetentionTime(dto.getRetentionTime());
vo.setRetentionBytes(dto.getRetentionBytes());
vo.setCreateTime(dto.getCreateTime());
vo.setModifyTime(dto.getModifyTime());
vo.setScore(dto.getScore());

View File

@@ -150,7 +150,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10.5</version>
<version>2.9.10.8</version>
</dependency>
<!-- commons -->