From 1ed933b7ada59b01ef65f8fed588a65e5395449d Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 23 Feb 2021 16:34:21 +0800 Subject: [PATCH] support dynamic change auth --- .../manager/common/entity/pojo/ClusterDO.java | 19 +++++ .../service/cache/KafkaClientPool.java | 23 +++++- .../cache/PhysicalClusterMetadataManager.java | 28 ++++--- .../metadata/FlushClusterMetadata.java | 77 ++++++++++++++----- 4 files changed, 116 insertions(+), 31 deletions(-) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java index 04ee265d..5ebebc75 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ClusterDO.java @@ -1,6 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.pojo; import java.util.Date; +import java.util.Objects; /** * @author zengqiao @@ -116,4 +117,22 @@ public class ClusterDO implements Comparable { public int compareTo(ClusterDO clusterDO) { return this.id.compareTo(clusterDO.id); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterDO clusterDO = (ClusterDO) o; + return Objects.equals(id, clusterDO.id) + && Objects.equals(clusterName, clusterDO.clusterName) + && Objects.equals(zookeeper, clusterDO.zookeeper) + && Objects.equals(bootstrapServers, clusterDO.bootstrapServers) + && Objects.equals(securityProperties, clusterDO.securityProperties) + && Objects.equals(jmxProperties, clusterDO.jmxProperties); + } + + @Override + public int hashCode() { + return Objects.hash(id, clusterName, zookeeper, bootstrapServers, securityProperties, jmxProperties); + } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java index ce0753e4..921b13ba 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java @@ -1,8 +1,8 @@ package com.xiaojukeji.kafka.manager.service.cache; import com.alibaba.fastjson.JSONObject; -import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory; import kafka.admin.AdminClient; import org.apache.commons.pool2.impl.GenericObjectPool; @@ -103,6 +103,21 @@ public class KafkaClientPool { } } + public static void closeKafkaConsumerPool(Long clusterId) { + lock.lock(); + try { + GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.remove(clusterId); + if (objectPool == null) { + return; + } + objectPool.close(); + } catch (Exception e) { + LOGGER.error("close kafka consumer pool failed, clusterId:{}.", clusterId, e); + } finally { + lock.unlock(); + } + } + public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { if (ValidateUtils.isNull(clusterDO)) { return null; @@ -132,7 +147,11 @@ public class KafkaClientPool { if (ValidateUtils.isNull(objectPool)) { return; } - objectPool.returnObject(kafkaConsumer); + try { + objectPool.returnObject(kafkaConsumer); + } catch (Exception e) { + LOGGER.error("return kafka consumer client failed, clusterId:{}", physicalClusterId, e); + } } public static AdminClient getAdminClient(Long clusterId) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index e3b8f23f..631b254f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -4,21 +4,23 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ListUtils; -import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig; -import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; -import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData; -import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; -import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl; -import com.xiaojukeji.kafka.manager.dao.ControllerDao; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap; -import com.xiaojukeji.kafka.manager.service.service.JmxService; -import com.xiaojukeji.kafka.manager.service.zookeeper.*; -import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl; import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; +import com.xiaojukeji.kafka.manager.dao.ControllerDao; +import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.service.service.JmxService; +import com.xiaojukeji.kafka.manager.service.zookeeper.BrokerStateListener; +import com.xiaojukeji.kafka.manager.service.zookeeper.ControllerStateListener; +import com.xiaojukeji.kafka.manager.service.zookeeper.TopicStateListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -160,8 +162,12 @@ public class PhysicalClusterMetadataManager { CLUSTER_MAP.remove(clusterId); } - public Set getClusterIdSet() { - return CLUSTER_MAP.keySet(); + public static Map getClusterMap() { + return CLUSTER_MAP; + } + + public static void updateClusterMap(ClusterDO clusterDO) { + CLUSTER_MAP.put(clusterDO.getId(), clusterDO); } public static ClusterDO getClusterFromCache(Long clusterId) { diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java index e2c63e06..e88ad696 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushClusterMetadata.java @@ -1,15 +1,17 @@ package com.xiaojukeji.kafka.manager.task.schedule.metadata; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.ClusterService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; /** * @author zengqiao @@ -25,24 +27,63 @@ public class FlushClusterMetadata { @Scheduled(cron="0/30 * * * * ?") public void flush() { - List doList = clusterService.list(); + Map dbClusterMap = clusterService.list().stream().collect(Collectors.toMap(ClusterDO::getId, Function.identity(), (key1, key2) -> key2)); - Set newClusterIdSet = new HashSet<>(); - Set oldClusterIdSet = physicalClusterMetadataManager.getClusterIdSet(); - for (ClusterDO clusterDO: doList) { - newClusterIdSet.add(clusterDO.getId()); + Map cacheClusterMap = PhysicalClusterMetadataManager.getClusterMap(); - // 添加集群 - physicalClusterMetadataManager.addNew(clusterDO); - } + // 新增的集群 + for (ClusterDO clusterDO: dbClusterMap.values()) { + if (cacheClusterMap.containsKey(clusterDO.getId())) { + // 已经存在 + continue; + } + add(clusterDO); + } - for (Long clusterId: oldClusterIdSet) { - if (newClusterIdSet.contains(clusterId)) { - continue; - } + // 移除的集群 + for (ClusterDO clusterDO: cacheClusterMap.values()) { + if (dbClusterMap.containsKey(clusterDO.getId())) { + // 已经存在 + continue; + } + remove(clusterDO.getId()); + } - // 移除集群 - physicalClusterMetadataManager.remove(clusterId); - } + // 被修改配置的集群 + for (ClusterDO dbClusterDO: dbClusterMap.values()) { + ClusterDO cacheClusterDO = cacheClusterMap.get(dbClusterDO.getId()); + if (ValidateUtils.anyNull(cacheClusterDO) || dbClusterDO.equals(cacheClusterDO)) { + // 不存在 || 相等 + continue; + } + modifyConfig(dbClusterDO); + } } + + private void add(ClusterDO clusterDO) { + if (ValidateUtils.anyNull(clusterDO)) { + return; + } + physicalClusterMetadataManager.addNew(clusterDO); + } + + private void modifyConfig(ClusterDO clusterDO) { + if (ValidateUtils.anyNull(clusterDO)) { + return; + } + PhysicalClusterMetadataManager.updateClusterMap(clusterDO); + KafkaClientPool.closeKafkaConsumerPool(clusterDO.getId()); + } + + private void remove(Long clusterId) { + if (ValidateUtils.anyNull(clusterId)) { + return; + } + // 移除缓存信息 + physicalClusterMetadataManager.remove(clusterId); + + // 清除客户端池子 + KafkaClientPool.closeKafkaConsumerPool(clusterId); + } + } \ No newline at end of file