mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
@@ -1,7 +1,7 @@
|
||||
package com.xiaojukeji.kafka.manager.common.utils.factory;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import org.apache.commons.pool2.BasePooledObjectFactory;
|
||||
import org.apache.commons.pool2.PooledObject;
|
||||
@@ -16,7 +16,7 @@ import java.util.Properties;
|
||||
* @author zengqiao
|
||||
* @date 20/8/24
|
||||
*/
|
||||
public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer> {
|
||||
public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer<String, String>> {
|
||||
private ClusterDO clusterDO;
|
||||
|
||||
public KafkaConsumerFactory(ClusterDO clusterDO) {
|
||||
@@ -25,17 +25,17 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
|
||||
|
||||
@Override
|
||||
public KafkaConsumer create() {
|
||||
return new KafkaConsumer(createKafkaConsumerProperties(clusterDO));
|
||||
return new KafkaConsumer<String, String>(createKafkaConsumerProperties(clusterDO));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PooledObject<KafkaConsumer> wrap(KafkaConsumer obj) {
|
||||
return new DefaultPooledObject<KafkaConsumer>(obj);
|
||||
public PooledObject<KafkaConsumer<String, String>> wrap(KafkaConsumer<String, String> obj) {
|
||||
return new DefaultPooledObject<>(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroyObject(final PooledObject<KafkaConsumer> p) throws Exception {
|
||||
KafkaConsumer kafkaConsumer = p.getObject();
|
||||
public void destroyObject(final PooledObject<KafkaConsumer<String, String>> p) throws Exception {
|
||||
KafkaConsumer<String, String> kafkaConsumer = p.getObject();
|
||||
if (ValidateUtils.isNull(kafkaConsumer)) {
|
||||
return;
|
||||
}
|
||||
@@ -57,7 +57,7 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
|
||||
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
|
||||
return properties;
|
||||
}
|
||||
properties.putAll(JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class));
|
||||
properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,9 @@ public class ConsumerMetadataCache {
|
||||
|
||||
private static final Map<Long, ConsumerMetadata> CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private ConsumerMetadataCache() {
|
||||
}
|
||||
|
||||
public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) {
|
||||
if (clusterId == null || consumerMetadata == null) {
|
||||
return;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.xiaojukeji.kafka.manager.service.cache;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
|
||||
import kafka.admin.AdminClient;
|
||||
@@ -26,19 +26,22 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
* @date 19/12/24
|
||||
*/
|
||||
public class KafkaClientPool {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
|
||||
|
||||
/**
|
||||
* AdminClient
|
||||
*/
|
||||
private static Map<Long, AdminClient> AdminClientMap = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, AdminClient> ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private static Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private static Map<Long, GenericObjectPool<KafkaConsumer>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, GenericObjectPool<KafkaConsumer<String, String>>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
|
||||
|
||||
private static ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private KafkaClientPool() {
|
||||
}
|
||||
|
||||
private static void initKafkaProducerMap(Long clusterId) {
|
||||
ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId);
|
||||
if (clusterDO == null) {
|
||||
@@ -55,7 +58,7 @@ public class KafkaClientPool {
|
||||
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
|
||||
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
|
||||
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
|
||||
KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<String, String>(properties));
|
||||
KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e);
|
||||
} finally {
|
||||
@@ -77,25 +80,22 @@ public class KafkaClientPool {
|
||||
if (ValidateUtils.isNull(kafkaProducer)) {
|
||||
return false;
|
||||
}
|
||||
kafkaProducer.send(new ProducerRecord<String, String>(topicName, data));
|
||||
kafkaProducer.send(new ProducerRecord<>(topicName, data));
|
||||
return true;
|
||||
}
|
||||
|
||||
private static void initKafkaConsumerPool(ClusterDO clusterDO) {
|
||||
lock.lock();
|
||||
try {
|
||||
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
|
||||
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
|
||||
if (objectPool != null) {
|
||||
return;
|
||||
}
|
||||
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
|
||||
GenericObjectPoolConfig<KafkaConsumer<String, String>> config = new GenericObjectPoolConfig<>();
|
||||
config.setMaxIdle(24);
|
||||
config.setMinIdle(24);
|
||||
config.setMaxTotal(24);
|
||||
KAFKA_CONSUMER_POOL.put(
|
||||
clusterDO.getId(),
|
||||
new GenericObjectPool<KafkaConsumer>(new KafkaConsumerFactory(clusterDO), config)
|
||||
);
|
||||
KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e);
|
||||
} finally {
|
||||
@@ -106,7 +106,7 @@ public class KafkaClientPool {
|
||||
public static void closeKafkaConsumerPool(Long clusterId) {
|
||||
lock.lock();
|
||||
try {
|
||||
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
|
||||
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
|
||||
if (objectPool == null) {
|
||||
return;
|
||||
}
|
||||
@@ -118,11 +118,11 @@ public class KafkaClientPool {
|
||||
}
|
||||
}
|
||||
|
||||
public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
|
||||
public static KafkaConsumer<String, String> borrowKafkaConsumerClient(ClusterDO clusterDO) {
|
||||
if (ValidateUtils.isNull(clusterDO)) {
|
||||
return null;
|
||||
}
|
||||
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
|
||||
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
|
||||
if (ValidateUtils.isNull(objectPool)) {
|
||||
initKafkaConsumerPool(clusterDO);
|
||||
objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
|
||||
@@ -139,11 +139,11 @@ public class KafkaClientPool {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
|
||||
public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer<String, String> kafkaConsumer) {
|
||||
if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) {
|
||||
return;
|
||||
}
|
||||
GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
|
||||
GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
|
||||
if (ValidateUtils.isNull(objectPool)) {
|
||||
return;
|
||||
}
|
||||
@@ -155,7 +155,7 @@ public class KafkaClientPool {
|
||||
}
|
||||
|
||||
public static AdminClient getAdminClient(Long clusterId) {
|
||||
AdminClient adminClient = AdminClientMap.get(clusterId);
|
||||
AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId);
|
||||
if (adminClient != null) {
|
||||
return adminClient;
|
||||
}
|
||||
@@ -166,26 +166,26 @@ public class KafkaClientPool {
|
||||
Properties properties = createProperties(clusterDO, false);
|
||||
lock.lock();
|
||||
try {
|
||||
adminClient = AdminClientMap.get(clusterId);
|
||||
adminClient = ADMIN_CLIENT_MAP.get(clusterId);
|
||||
if (adminClient != null) {
|
||||
return adminClient;
|
||||
}
|
||||
AdminClientMap.put(clusterId, AdminClient.create(properties));
|
||||
ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return AdminClientMap.get(clusterId);
|
||||
return ADMIN_CLIENT_MAP.get(clusterId);
|
||||
}
|
||||
|
||||
public static void closeAdminClient(ClusterDO cluster) {
|
||||
if (AdminClientMap.containsKey(cluster.getId())) {
|
||||
AdminClientMap.get(cluster.getId()).close();
|
||||
if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) {
|
||||
ADMIN_CLIENT_MAP.get(cluster.getId()).close();
|
||||
}
|
||||
}
|
||||
|
||||
public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) {
|
||||
public static Properties createProperties(ClusterDO clusterDO, boolean serialize) {
|
||||
Properties properties = new Properties();
|
||||
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers());
|
||||
if (serialize) {
|
||||
@@ -198,8 +198,7 @@ public class KafkaClientPool {
|
||||
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
|
||||
return properties;
|
||||
}
|
||||
Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class);
|
||||
properties.putAll(securityProperties);
|
||||
properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
@@ -14,7 +14,10 @@ public class KafkaMetricsCache {
|
||||
/**
|
||||
* <clusterId, Metrics List>
|
||||
*/
|
||||
private static Map<Long, Map<String, TopicMetrics>> TopicMetricsMap = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, Map<String, TopicMetrics>> TOPIC_METRICS_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private KafkaMetricsCache() {
|
||||
}
|
||||
|
||||
public static void putTopicMetricsToCache(Long clusterId, List<TopicMetrics> dataList) {
|
||||
if (clusterId == null || dataList == null) {
|
||||
@@ -24,22 +27,22 @@ public class KafkaMetricsCache {
|
||||
for (TopicMetrics topicMetrics : dataList) {
|
||||
subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics);
|
||||
}
|
||||
TopicMetricsMap.put(clusterId, subMetricsMap);
|
||||
TOPIC_METRICS_MAP.put(clusterId, subMetricsMap);
|
||||
}
|
||||
|
||||
public static Map<String, TopicMetrics> getTopicMetricsFromCache(Long clusterId) {
|
||||
return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
|
||||
return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
|
||||
}
|
||||
|
||||
public static Map<Long, Map<String, TopicMetrics>> getAllTopicMetricsFromCache() {
|
||||
return TopicMetricsMap;
|
||||
return TOPIC_METRICS_MAP;
|
||||
}
|
||||
|
||||
public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) {
|
||||
if (clusterId == null || topicName == null) {
|
||||
return null;
|
||||
}
|
||||
Map<String, TopicMetrics> subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
|
||||
Map<String, TopicMetrics> subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
|
||||
return subMap.get(topicName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager {
|
||||
public void flush() {
|
||||
List<LogicalClusterDO> logicalClusterDOList = logicalClusterService.listAll();
|
||||
if (ValidateUtils.isNull(logicalClusterDOList)) {
|
||||
logicalClusterDOList = Collections.EMPTY_LIST;
|
||||
logicalClusterDOList = Collections.emptyList();
|
||||
}
|
||||
Set<Long> inDbLogicalClusterIds = logicalClusterDOList.stream()
|
||||
.map(LogicalClusterDO::getId)
|
||||
|
||||
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
*/
|
||||
@Service
|
||||
public class PhysicalClusterMetadataManager {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
|
||||
|
||||
@Autowired
|
||||
private ControllerDao controllerDao;
|
||||
@@ -50,22 +50,22 @@ public class PhysicalClusterMetadataManager {
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
private final static Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* JXM连接, 延迟连接
|
||||
*/
|
||||
private final static Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
|
||||
private static final Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* KafkaBroker版本, 延迟获取
|
||||
@@ -398,7 +398,7 @@ public class PhysicalClusterMetadataManager {
|
||||
KafkaBrokerRoleEnum roleEnum) {
|
||||
BrokerMetadata brokerMetadata =
|
||||
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
|
||||
if (ValidateUtils.isNull(brokerMetadata)) {
|
||||
if (brokerMetadata == null) {
|
||||
return;
|
||||
}
|
||||
String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
|
||||
@@ -438,7 +438,7 @@ public class PhysicalClusterMetadataManager {
|
||||
KafkaBrokerRoleEnum roleEnum) {
|
||||
BrokerMetadata brokerMetadata =
|
||||
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
|
||||
if (ValidateUtils.isNull(brokerMetadata)) {
|
||||
if (brokerMetadata == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user