Mirror of https://github.com/didi/KnowStreaming.git (synced 2026-01-03 19:38:20 +08:00)
support dynamic change auth
ClusterDO.java (com.xiaojukeji.kafka.manager.common.entity.pojo)

@@ -1,6 +1,7 @@
 package com.xiaojukeji.kafka.manager.common.entity.pojo;
 
 import java.util.Date;
+import java.util.Objects;
 
 /**
  * @author zengqiao
@@ -116,4 +117,22 @@ public class ClusterDO implements Comparable<ClusterDO> {
     public int compareTo(ClusterDO clusterDO) {
         return this.id.compareTo(clusterDO.id);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClusterDO clusterDO = (ClusterDO) o;
+        return Objects.equals(id, clusterDO.id)
+                && Objects.equals(clusterName, clusterDO.clusterName)
+                && Objects.equals(zookeeper, clusterDO.zookeeper)
+                && Objects.equals(bootstrapServers, clusterDO.bootstrapServers)
+                && Objects.equals(securityProperties, clusterDO.securityProperties)
+                && Objects.equals(jmxProperties, clusterDO.jmxProperties);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, clusterName, zookeeper, bootstrapServers, securityProperties, jmxProperties);
+    }
 }
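Note on the equals/hashCode added above: it gives ClusterDO value semantics over id, clusterName, zookeeper, bootstrapServers, securityProperties and jmxProperties, which is what the flush task (FlushClusterMetadata, below) relies on to detect that a cluster's stored auth/security configuration no longer matches the cached copy. A minimal usage sketch; the setter names are assumed (standard POJO accessors not shown in this diff):

import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;

// Usage sketch only; setId/setSecurityProperties are assumed, not taken from this diff.
public class ClusterDoEqualsSketch {
    public static void main(String[] args) {
        ClusterDO cached = new ClusterDO();
        cached.setId(1L);
        cached.setSecurityProperties("{\"security.protocol\":\"PLAINTEXT\"}");

        ClusterDO fromDb = new ClusterDO();
        fromDb.setId(1L);
        fromDb.setSecurityProperties("{\"security.protocol\":\"SASL_PLAINTEXT\"}");

        // Before this commit ClusterDO inherited Object#equals (identity), so the two
        // instances above would never compare equal; with the field-wise equals they
        // differ only because securityProperties changed, i.e. "config modified".
        System.out.println(fromDb.equals(cached));   // prints: false
    }
}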
KafkaClientPool.java (com.xiaojukeji.kafka.manager.service.cache)

@@ -1,8 +1,8 @@
 package com.xiaojukeji.kafka.manager.service.cache;
 
 import com.alibaba.fastjson.JSONObject;
-import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
 import kafka.admin.AdminClient;
 import org.apache.commons.pool2.impl.GenericObjectPool;
@@ -103,6 +103,21 @@ public class KafkaClientPool {
         }
     }
 
+    public static void closeKafkaConsumerPool(Long clusterId) {
+        lock.lock();
+        try {
+            GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
+            if (objectPool == null) {
+                return;
+            }
+            objectPool.close();
+        } catch (Exception e) {
+            LOGGER.error("close kafka consumer pool failed, clusterId:{}.", clusterId, e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
         if (ValidateUtils.isNull(clusterDO)) {
             return null;
@@ -132,7 +147,11 @@ public class KafkaClientPool {
         if (ValidateUtils.isNull(objectPool)) {
             return;
         }
-        objectPool.returnObject(kafkaConsumer);
+        try {
+            objectPool.returnObject(kafkaConsumer);
+        } catch (Exception e) {
+            LOGGER.error("return kafka consumer client failed, clusterId:{}", physicalClusterId, e);
+        }
     }
 
     public static AdminClient getAdminClient(Long clusterId) {
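Note on closeKafkaConsumerPool and the guarded returnObject above: removing and closing a cluster's pool is how a config change reaches pooled clients, since the next borrowKafkaConsumerClient presumably rebuilds the pool (and its consumers) from the cluster's current properties; and once a pool can disappear concurrently, a consumer may be returned to a pool it no longer belongs to, which commons-pool2 can reject with an IllegalStateException, hence the try/catch. A standalone sketch of the close-and-recreate pattern with commons-pool2, using hypothetical names and plain strings in place of KafkaConsumer:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;

// Standalone sketch of the close-and-recreate pattern (not project code).
public class PoolRefreshSketch {
    private static final Map<Long, GenericObjectPool<String>> POOLS = new ConcurrentHashMap<>();

    private static GenericObjectPool<String> createPool(String securityConfig) {
        return new GenericObjectPool<>(new BasePooledObjectFactory<String>() {
            @Override
            public String create() {
                return "client built with " + securityConfig;
            }
            @Override
            public PooledObject<String> wrap(String obj) {
                return new DefaultPooledObject<>(obj);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        POOLS.put(1L, createPool("PLAINTEXT"));
        System.out.println(POOLS.get(1L).borrowObject());   // client built with PLAINTEXT

        // Auth changed: drop the old pool so its clients are discarded...
        GenericObjectPool<String> old = POOLS.remove(1L);
        if (old != null) {
            old.close();
        }

        // ...and lazily rebuild it with the new settings on next use.
        POOLS.computeIfAbsent(1L, id -> createPool("SASL_PLAINTEXT"));
        System.out.println(POOLS.get(1L).borrowObject());   // client built with SASL_PLAINTEXT
    }
}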
PhysicalClusterMetadataManager.java (com.xiaojukeji.kafka.manager.service.cache)

@@ -4,21 +4,23 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum;
 import com.xiaojukeji.kafka.manager.common.constant.Constant;
 import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
 import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
 import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig;
-import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
-import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData;
-import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
-import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
-import com.xiaojukeji.kafka.manager.dao.ControllerDao;
 import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap;
-import com.xiaojukeji.kafka.manager.service.service.JmxService;
-import com.xiaojukeji.kafka.manager.service.zookeeper.*;
-import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl;
 import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil;
+import com.xiaojukeji.kafka.manager.common.zookeeper.znode.ControllerData;
+import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
+import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
+import com.xiaojukeji.kafka.manager.dao.ControllerDao;
+import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+import com.xiaojukeji.kafka.manager.service.service.JmxService;
+import com.xiaojukeji.kafka.manager.service.zookeeper.BrokerStateListener;
+import com.xiaojukeji.kafka.manager.service.zookeeper.ControllerStateListener;
+import com.xiaojukeji.kafka.manager.service.zookeeper.TopicStateListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -160,8 +162,12 @@ public class PhysicalClusterMetadataManager {
         CLUSTER_MAP.remove(clusterId);
     }
 
-    public Set<Long> getClusterIdSet() {
-        return CLUSTER_MAP.keySet();
+    public static Map<Long, ClusterDO> getClusterMap() {
+        return CLUSTER_MAP;
+    }
+
+    public static void updateClusterMap(ClusterDO clusterDO) {
+        CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
     }
 
     public static ClusterDO getClusterFromCache(Long clusterId) {
FlushClusterMetadata.java (com.xiaojukeji.kafka.manager.task.schedule.metadata)

@@ -1,15 +1,17 @@
 package com.xiaojukeji.kafka.manager.task.schedule.metadata;
 
 import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool;
 import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
 import com.xiaojukeji.kafka.manager.service.service.ClusterService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * @author zengqiao
@@ -25,24 +27,63 @@ public class FlushClusterMetadata {
 
     @Scheduled(cron="0/30 * * * * ?")
     public void flush() {
-        List<ClusterDO> doList = clusterService.list();
+        Map<Long, ClusterDO> dbClusterMap = clusterService.list().stream().collect(Collectors.toMap(ClusterDO::getId, Function.identity(), (key1, key2) -> key2));
 
-        Set<Long> newClusterIdSet = new HashSet<>();
-        Set<Long> oldClusterIdSet = physicalClusterMetadataManager.getClusterIdSet();
-        for (ClusterDO clusterDO: doList) {
-            newClusterIdSet.add(clusterDO.getId());
+        Map<Long, ClusterDO> cacheClusterMap = PhysicalClusterMetadataManager.getClusterMap();
 
-            // add the cluster
-            physicalClusterMetadataManager.addNew(clusterDO);
-        }
+        // newly added clusters
+        for (ClusterDO clusterDO: dbClusterMap.values()) {
+            if (cacheClusterMap.containsKey(clusterDO.getId())) {
+                // already cached
+                continue;
+            }
+            add(clusterDO);
+        }
 
-        for (Long clusterId: oldClusterIdSet) {
-            if (newClusterIdSet.contains(clusterId)) {
-                continue;
-            }
+        // removed clusters
+        for (ClusterDO clusterDO: cacheClusterMap.values()) {
+            if (dbClusterMap.containsKey(clusterDO.getId())) {
+                // still present in the DB
+                continue;
+            }
+            remove(clusterDO.getId());
+        }
 
-            // remove the cluster
-            physicalClusterMetadataManager.remove(clusterId);
-        }
+        // clusters whose config was modified
+        for (ClusterDO dbClusterDO: dbClusterMap.values()) {
+            ClusterDO cacheClusterDO = cacheClusterMap.get(dbClusterDO.getId());
+            if (ValidateUtils.anyNull(cacheClusterDO) || dbClusterDO.equals(cacheClusterDO)) {
+                // not cached || unchanged
+                continue;
+            }
+            modifyConfig(dbClusterDO);
+        }
     }
+
+    private void add(ClusterDO clusterDO) {
+        if (ValidateUtils.anyNull(clusterDO)) {
+            return;
+        }
+        physicalClusterMetadataManager.addNew(clusterDO);
+    }
+
+    private void modifyConfig(ClusterDO clusterDO) {
+        if (ValidateUtils.anyNull(clusterDO)) {
+            return;
+        }
+        PhysicalClusterMetadataManager.updateClusterMap(clusterDO);
+        KafkaClientPool.closeKafkaConsumerPool(clusterDO.getId());
+    }
+
+    private void remove(Long clusterId) {
+        if (ValidateUtils.anyNull(clusterId)) {
+            return;
+        }
+        // remove the cached metadata
+        physicalClusterMetadataManager.remove(clusterId);
+
+        // clear the client pool
+        KafkaClientPool.closeKafkaConsumerPool(clusterId);
+    }
+
 }
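Note on the rewritten flush() above: it is a three-pass reconcile of the database rows against the in-memory cache, keyed by cluster id, with the new value-based ClusterDO.equals deciding the "config modified" case. A standalone sketch of that pattern reduced to plain maps (hypothetical names, not project code):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Standalone sketch (not project code): the reconcile that flush() performs,
// reduced to plain maps so the three passes are easy to see.
public class ReconcileSketch {
    static <K, V> void reconcile(Map<K, V> db, Map<K, V> cache) {
        // pass 1: in DB but not cached -> newly added
        db.forEach((id, v) -> { if (!cache.containsKey(id)) System.out.println("add " + id); });
        // pass 2: cached but no longer in DB -> removed
        cache.forEach((id, v) -> { if (!db.containsKey(id)) System.out.println("remove " + id); });
        // pass 3: in both but value-unequal -> config modified
        db.forEach((id, v) -> {
            V cached = cache.get(id);
            if (cached != null && !Objects.equals(v, cached)) System.out.println("modify " + id);
        });
    }

    public static void main(String[] args) {
        Map<Long, String> db = new HashMap<>(Map.of(1L, "SASL_PLAINTEXT", 2L, "PLAINTEXT"));
        Map<Long, String> cache = new HashMap<>(Map.of(2L, "SASL_SSL", 3L, "PLAINTEXT"));
        reconcile(db, cache);   // prints: add 1, remove 3, modify 2 (one line per pass)
    }
}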