From 387d89d3af35ef9b6985c41148ed8f5d84005085 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 13 Jul 2021 10:39:28 +0800 Subject: [PATCH] optimize code format by sonar-lint --- .../utils/factory/KafkaConsumerFactory.java | 16 +++--- .../service/cache/ConsumerMetadataCache.java | 3 ++ .../service/cache/KafkaClientPool.java | 53 +++++++++---------- .../service/cache/KafkaMetricsCache.java | 13 +++-- .../cache/LogicalClusterMetadataManager.java | 2 +- .../cache/PhysicalClusterMetadataManager.java | 20 +++---- 6 files changed, 56 insertions(+), 51 deletions(-) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java index 68109779..5964d162 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/factory/KafkaConsumerFactory.java @@ -1,7 +1,7 @@ package com.xiaojukeji.kafka.manager.common.utils.factory; -import com.alibaba.fastjson.JSONObject; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; @@ -16,7 +16,7 @@ import java.util.Properties; * @author zengqiao * @date 20/8/24 */ -public class KafkaConsumerFactory extends BasePooledObjectFactory { +public class KafkaConsumerFactory extends BasePooledObjectFactory> { private ClusterDO clusterDO; public KafkaConsumerFactory(ClusterDO clusterDO) { @@ -25,17 +25,17 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory @Override public KafkaConsumer create() { - return new KafkaConsumer(createKafkaConsumerProperties(clusterDO)); + return new KafkaConsumer(createKafkaConsumerProperties(clusterDO)); } @Override - public PooledObject wrap(KafkaConsumer obj) { - return new DefaultPooledObject(obj); + public PooledObject> wrap(KafkaConsumer obj) { + return new DefaultPooledObject<>(obj); } @Override - public void destroyObject(final PooledObject p) throws Exception { - KafkaConsumer kafkaConsumer = p.getObject(); + public void destroyObject(final PooledObject> p) throws Exception { + KafkaConsumer kafkaConsumer = p.getObject(); if (ValidateUtils.isNull(kafkaConsumer)) { return; } @@ -57,7 +57,7 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) { return properties; } - properties.putAll(JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class)); + properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class)); return properties; } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java index 41fd0092..3fd6aaac 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java @@ -17,6 +17,9 @@ public class ConsumerMetadataCache { private static final Map CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>(); + private ConsumerMetadataCache() { + } + public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) { if (clusterId == null || consumerMetadata == null) { return; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java index 921b13ba..56e17ae5 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java @@ -1,7 +1,7 @@ package com.xiaojukeji.kafka.manager.service.cache; -import com.alibaba.fastjson.JSONObject; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory; import kafka.admin.AdminClient; @@ -26,19 +26,22 @@ import java.util.concurrent.locks.ReentrantLock; * @date 19/12/24 */ public class KafkaClientPool { - private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class); + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class); /** * AdminClient */ - private static Map AdminClientMap = new ConcurrentHashMap<>(); + private static final Map ADMIN_CLIENT_MAP = new ConcurrentHashMap<>(); - private static Map> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>(); + private static final Map> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>(); - private static Map> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>(); + private static final Map>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>(); private static ReentrantLock lock = new ReentrantLock(); + private KafkaClientPool() { + } + private static void initKafkaProducerMap(Long clusterId) { ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId); if (clusterDO == null) { @@ -55,7 +58,7 @@ public class KafkaClientPool { properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10"); properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3"); - KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer(properties)); + KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties)); } catch (Exception e) { LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e); } finally { @@ -77,25 +80,22 @@ public class KafkaClientPool { if (ValidateUtils.isNull(kafkaProducer)) { return false; } - kafkaProducer.send(new ProducerRecord(topicName, data)); + kafkaProducer.send(new ProducerRecord<>(topicName, data)); return true; } private static void initKafkaConsumerPool(ClusterDO clusterDO) { lock.lock(); try { - GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); + GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); if (objectPool != null) { return; } - GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>(); config.setMaxIdle(24); config.setMinIdle(24); config.setMaxTotal(24); - KAFKA_CONSUMER_POOL.put( - clusterDO.getId(), - new GenericObjectPool(new KafkaConsumerFactory(clusterDO), config) - ); + KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config)); } catch (Exception e) { LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e); } finally { @@ -106,7 +106,7 @@ public class KafkaClientPool { public static void closeKafkaConsumerPool(Long clusterId) { lock.lock(); try { - GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.remove(clusterId); + GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId); if (objectPool == null) { return; } @@ -118,11 +118,11 @@ public class KafkaClientPool { } } - public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { + public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { if (ValidateUtils.isNull(clusterDO)) { return null; } - GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); + GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); if (ValidateUtils.isNull(objectPool)) { initKafkaConsumerPool(clusterDO); objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); @@ -139,11 +139,11 @@ public class KafkaClientPool { return null; } - public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) { + public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) { if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) { return; } - GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId); + GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId); if (ValidateUtils.isNull(objectPool)) { return; } @@ -155,7 +155,7 @@ public class KafkaClientPool { } public static AdminClient getAdminClient(Long clusterId) { - AdminClient adminClient = AdminClientMap.get(clusterId); + AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId); if (adminClient != null) { return adminClient; } @@ -166,26 +166,26 @@ public class KafkaClientPool { Properties properties = createProperties(clusterDO, false); lock.lock(); try { - adminClient = AdminClientMap.get(clusterId); + adminClient = ADMIN_CLIENT_MAP.get(clusterId); if (adminClient != null) { return adminClient; } - AdminClientMap.put(clusterId, AdminClient.create(properties)); + ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties)); } catch (Exception e) { LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e); } finally { lock.unlock(); } - return AdminClientMap.get(clusterId); + return ADMIN_CLIENT_MAP.get(clusterId); } public static void closeAdminClient(ClusterDO cluster) { - if (AdminClientMap.containsKey(cluster.getId())) { - AdminClientMap.get(cluster.getId()).close(); + if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) { + ADMIN_CLIENT_MAP.get(cluster.getId()).close(); } } - public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) { + public static Properties createProperties(ClusterDO clusterDO, boolean serialize) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers()); if (serialize) { @@ -198,8 +198,7 @@ public class KafkaClientPool { if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) { return properties; } - Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class); - properties.putAll(securityProperties); + properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class)); return properties; } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java index 011bc1e6..7ba1e304 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java @@ -14,7 +14,10 @@ public class KafkaMetricsCache { /** * */ - private static Map> TopicMetricsMap = new ConcurrentHashMap<>(); + private static final Map> TOPIC_METRICS_MAP = new ConcurrentHashMap<>(); + + private KafkaMetricsCache() { + } public static void putTopicMetricsToCache(Long clusterId, List dataList) { if (clusterId == null || dataList == null) { @@ -24,22 +27,22 @@ public class KafkaMetricsCache { for (TopicMetrics topicMetrics : dataList) { subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics); } - TopicMetricsMap.put(clusterId, subMetricsMap); + TOPIC_METRICS_MAP.put(clusterId, subMetricsMap); } public static Map getTopicMetricsFromCache(Long clusterId) { - return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap()); + return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap()); } public static Map> getAllTopicMetricsFromCache() { - return TopicMetricsMap; + return TOPIC_METRICS_MAP; } public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) { if (clusterId == null || topicName == null) { return null; } - Map subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap()); + Map subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap()); return subMap.get(topicName); } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java index 5cd81581..744101ef 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java @@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager { public void flush() { List logicalClusterDOList = logicalClusterService.listAll(); if (ValidateUtils.isNull(logicalClusterDOList)) { - logicalClusterDOList = Collections.EMPTY_LIST; + logicalClusterDOList = Collections.emptyList(); } Set inDbLogicalClusterIds = logicalClusterDOList.stream() .map(LogicalClusterDO::getId) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index a7142fa9..c5f09820 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap; */ @Service public class PhysicalClusterMetadataManager { - private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class); @Autowired private ControllerDao controllerDao; @@ -50,22 +50,22 @@ public class PhysicalClusterMetadataManager { @Autowired private ClusterService clusterService; - private final static Map CLUSTER_MAP = new ConcurrentHashMap<>(); + private static final Map CLUSTER_MAP = new ConcurrentHashMap<>(); - private final static Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); + private static final Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); - private final static Map ZK_CONFIG_MAP = new ConcurrentHashMap<>(); + private static final Map ZK_CONFIG_MAP = new ConcurrentHashMap<>(); - private final static Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>(); + private static final Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>(); - private final static Map> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>(); + private static final Map> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>(); - private final static Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>(); + private static final Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>(); /** * JXM连接, 延迟连接 */ - private final static Map> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>(); + private static final Map> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>(); /** * KafkaBroker版本, 延迟获取 @@ -398,7 +398,7 @@ public class PhysicalClusterMetadataManager { KafkaBrokerRoleEnum roleEnum) { BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); - if (ValidateUtils.isNull(brokerMetadata)) { + if (brokerMetadata == null) { return; } String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, ""); @@ -438,7 +438,7 @@ public class PhysicalClusterMetadataManager { KafkaBrokerRoleEnum roleEnum) { BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); - if (ValidateUtils.isNull(brokerMetadata)) { + if (brokerMetadata == null) { return; }