optimize code format by sonar-lint

Author: zengqiao
Date:   2021-07-13 10:39:28 +08:00
Parent: c9f7da84d0
Commit: 387d89d3af

6 changed files with 56 additions and 51 deletions

KafkaConsumerFactory.java

@@ -1,7 +1,7 @@
 package com.xiaojukeji.kafka.manager.common.utils.factory;
-import com.alibaba.fastjson.JSONObject;
 import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
@@ -16,7 +16,7 @@ import java.util.Properties;
  * @author zengqiao
  * @date 20/8/24
  */
-public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer> {
+public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer<String, String>> {
     private ClusterDO clusterDO;

     public KafkaConsumerFactory(ClusterDO clusterDO) {
@@ -25,17 +25,17 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
     @Override
     public KafkaConsumer create() {
-        return new KafkaConsumer(createKafkaConsumerProperties(clusterDO));
+        return new KafkaConsumer<String, String>(createKafkaConsumerProperties(clusterDO));
     }

     @Override
-    public PooledObject<KafkaConsumer> wrap(KafkaConsumer obj) {
-        return new DefaultPooledObject<KafkaConsumer>(obj);
+    public PooledObject<KafkaConsumer<String, String>> wrap(KafkaConsumer<String, String> obj) {
+        return new DefaultPooledObject<>(obj);
     }

     @Override
-    public void destroyObject(final PooledObject<KafkaConsumer> p) throws Exception {
-        KafkaConsumer kafkaConsumer = p.getObject();
+    public void destroyObject(final PooledObject<KafkaConsumer<String, String>> p) throws Exception {
+        KafkaConsumer<String, String> kafkaConsumer = p.getObject();
         if (ValidateUtils.isNull(kafkaConsumer)) {
             return;
         }
@@ -57,7 +57,7 @@ public class KafkaConsumerFactory extends BasePooledObjectFactory<KafkaConsumer>
         if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
             return properties;
         }
-        properties.putAll(JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class));
+        properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
         return properties;
     }
 }
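The generics added above flow through to callers: once the factory is declared as BasePooledObjectFactory<KafkaConsumer<String, String>>, a GenericObjectPool built on it hands out typed consumers without casts. A minimal usage sketch under stated assumptions (the pool size is illustrative, and the ClusterDO setter is assumed rather than taken from this diff):

    import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
    import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerPoolSketch {
        public static void main(String[] args) throws Exception {
            ClusterDO clusterDO = new ClusterDO();
            clusterDO.setBootstrapServers("localhost:9092"); // assumed setter, for illustration only

            // Illustrative pool size; the project itself configures 24/24/24 in KafkaClientPool.
            GenericObjectPoolConfig<KafkaConsumer<String, String>> config = new GenericObjectPoolConfig<>();
            config.setMaxTotal(4);

            GenericObjectPool<KafkaConsumer<String, String>> pool =
                    new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config);

            KafkaConsumer<String, String> consumer = pool.borrowObject(); // typed, no cast needed
            try {
                // ... use the consumer (poll, seek, endOffsets, ...) ...
            } finally {
                pool.returnObject(consumer); // always hand the client back, even on failure
            }
            pool.close();
        }
    }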

ConsumerMetadataCache.java

@@ -17,6 +17,9 @@ public class ConsumerMetadataCache {
     private static final Map<Long, ConsumerMetadata> CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>();

+    private ConsumerMetadataCache() {
+    }
+
     public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) {
         if (clusterId == null || consumerMetadata == null) {
             return;
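The three added lines are the standard SonarLint fix for utility classes (presumably rule java:S1118: classes with only static members should not be instantiable). The same pattern on a hypothetical helper:

    // Hypothetical example of the pattern, not project code.
    public final class DemoUtils {
        private DemoUtils() {
            // no instances: every member is static
        }

        public static boolean isBlank(String s) {
            return s == null || s.trim().isEmpty();
        }
    }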

KafkaClientPool.java

@@ -1,7 +1,7 @@
 package com.xiaojukeji.kafka.manager.service.cache;
-import com.alibaba.fastjson.JSONObject;
 import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
 import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
 import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
 import kafka.admin.AdminClient;
@@ -26,19 +26,22 @@ import java.util.concurrent.locks.ReentrantLock;
  * @date 19/12/24
  */
 public class KafkaClientPool {
-    private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);

     /**
      * AdminClient
      */
-    private static Map<Long, AdminClient> AdminClientMap = new ConcurrentHashMap<>();
+    private static final Map<Long, AdminClient> ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();

-    private static Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, KafkaProducer<String, String>> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();

-    private static Map<Long, GenericObjectPool<KafkaConsumer>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
+    private static final Map<Long, GenericObjectPool<KafkaConsumer<String, String>>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();

     private static ReentrantLock lock = new ReentrantLock();

+    private KafkaClientPool() {
+    }
+
     private static void initKafkaProducerMap(Long clusterId) {
         ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId);
         if (clusterDO == null) {
@@ -55,7 +58,7 @@ public class KafkaClientPool {
             properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
             properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
             properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
-            KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<String, String>(properties));
+            KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties));
         } catch (Exception e) {
             LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e);
         } finally {
@@ -77,25 +80,22 @@ public class KafkaClientPool {
         if (ValidateUtils.isNull(kafkaProducer)) {
             return false;
         }
-        kafkaProducer.send(new ProducerRecord<String, String>(topicName, data));
+        kafkaProducer.send(new ProducerRecord<>(topicName, data));
         return true;
     }

     private static void initKafkaConsumerPool(ClusterDO clusterDO) {
         lock.lock();
         try {
-            GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+            GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
             if (objectPool != null) {
                 return;
             }
-            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
+            GenericObjectPoolConfig<KafkaConsumer<String, String>> config = new GenericObjectPoolConfig<>();
             config.setMaxIdle(24);
             config.setMinIdle(24);
             config.setMaxTotal(24);
-            KAFKA_CONSUMER_POOL.put(
-                    clusterDO.getId(),
-                    new GenericObjectPool<KafkaConsumer>(new KafkaConsumerFactory(clusterDO), config)
-            );
+            KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config));
         } catch (Exception e) {
             LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e);
         } finally {
@@ -106,7 +106,7 @@ public class KafkaClientPool {
     public static void closeKafkaConsumerPool(Long clusterId) {
         lock.lock();
         try {
-            GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
+            GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
             if (objectPool == null) {
                 return;
             }
@@ -118,11 +118,11 @@ public class KafkaClientPool {
         }
     }

-    public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
+    public static KafkaConsumer<String, String> borrowKafkaConsumerClient(ClusterDO clusterDO) {
         if (ValidateUtils.isNull(clusterDO)) {
             return null;
         }
-        GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+        GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
         if (ValidateUtils.isNull(objectPool)) {
             initKafkaConsumerPool(clusterDO);
             objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
@@ -139,11 +139,11 @@ public class KafkaClientPool {
         return null;
     }

-    public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
+    public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer<String, String> kafkaConsumer) {
         if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) {
             return;
         }
-        GenericObjectPool<KafkaConsumer> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
+        GenericObjectPool<KafkaConsumer<String, String>> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
         if (ValidateUtils.isNull(objectPool)) {
             return;
         }
@@ -155,7 +155,7 @@ public class KafkaClientPool {
     }

     public static AdminClient getAdminClient(Long clusterId) {
-        AdminClient adminClient = AdminClientMap.get(clusterId);
+        AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId);
         if (adminClient != null) {
             return adminClient;
         }
@@ -166,26 +166,26 @@ public class KafkaClientPool {
         Properties properties = createProperties(clusterDO, false);
         lock.lock();
         try {
-            adminClient = AdminClientMap.get(clusterId);
+            adminClient = ADMIN_CLIENT_MAP.get(clusterId);
             if (adminClient != null) {
                 return adminClient;
             }
-            AdminClientMap.put(clusterId, AdminClient.create(properties));
+            ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties));
         } catch (Exception e) {
             LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e);
         } finally {
             lock.unlock();
         }
-        return AdminClientMap.get(clusterId);
+        return ADMIN_CLIENT_MAP.get(clusterId);
     }

     public static void closeAdminClient(ClusterDO cluster) {
-        if (AdminClientMap.containsKey(cluster.getId())) {
-            AdminClientMap.get(cluster.getId()).close();
+        if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) {
+            ADMIN_CLIENT_MAP.get(cluster.getId()).close();
         }
     }

-    public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) {
+    public static Properties createProperties(ClusterDO clusterDO, boolean serialize) {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers());
         if (serialize) {
@@ -198,8 +198,7 @@ public class KafkaClientPool {
         if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
             return properties;
         }
-        Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class);
-        properties.putAll(securityProperties);
+        properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
         return properties;
     }
 }
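getAdminClient above uses a check, lock, re-check sequence so that concurrent callers create at most one AdminClient per cluster: the fast path reads the ConcurrentHashMap without locking, and only a miss takes the ReentrantLock and re-checks before creating. The idiom in isolation, with a hypothetical Client type standing in for AdminClient:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;

    public class LazyClientRegistry {
        static class Client {} // hypothetical stand-in for an expensive client

        private static final Map<Long, Client> CLIENT_MAP = new ConcurrentHashMap<>();
        private static final ReentrantLock LOCK = new ReentrantLock();

        public static Client get(Long id) {
            Client client = CLIENT_MAP.get(id); // fast path: no lock on the hot read
            if (client != null) {
                return client;
            }
            LOCK.lock();
            try {
                client = CLIENT_MAP.get(id);    // re-check: another thread may have won
                if (client == null) {
                    client = new Client();      // expensive creation runs at most once
                    CLIENT_MAP.put(id, client);
                }
            } finally {
                LOCK.unlock();
            }
            return client;
        }
    }

ConcurrentHashMap.computeIfAbsent could collapse this into a single call; the explicit lock, as in the diff, keeps creation failures easy to log without caching a broken client.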

KafkaMetricsCache.java

@@ -14,7 +14,10 @@ public class KafkaMetricsCache {
     /**
      * <clusterId, Metrics List>
      */
-    private static Map<Long, Map<String, TopicMetrics>> TopicMetricsMap = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<String, TopicMetrics>> TOPIC_METRICS_MAP = new ConcurrentHashMap<>();

+    private KafkaMetricsCache() {
+    }
+
     public static void putTopicMetricsToCache(Long clusterId, List<TopicMetrics> dataList) {
         if (clusterId == null || dataList == null) {
@@ -24,22 +27,22 @@ public class KafkaMetricsCache {
         for (TopicMetrics topicMetrics : dataList) {
             subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics);
         }
-        TopicMetricsMap.put(clusterId, subMetricsMap);
+        TOPIC_METRICS_MAP.put(clusterId, subMetricsMap);
     }

     public static Map<String, TopicMetrics> getTopicMetricsFromCache(Long clusterId) {
-        return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+        return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
     }

     public static Map<Long, Map<String, TopicMetrics>> getAllTopicMetricsFromCache() {
-        return TopicMetricsMap;
+        return TOPIC_METRICS_MAP;
     }

     public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) {
         if (clusterId == null || topicName == null) {
             return null;
         }
-        Map<String, TopicMetrics> subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+        Map<String, TopicMetrics> subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
         return subMap.get(topicName);
     }
 }
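The renames in this file apply the SonarLint convention that static final fields are constants and belong in CONSTANT_CASE; adding final also guarantees the map reference itself can never be reassigned. Before and after on a hypothetical field:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class NamingSketch {
        // Before: reads like a mutable instance field, and the reference could be swapped out.
        private static Map<String, Long> CounterMap = new ConcurrentHashMap<>();

        // After: final reference in CONSTANT_CASE, the shape applied throughout this commit.
        private static final Map<String, Long> COUNTER_MAP = new ConcurrentHashMap<>();
    }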

LogicalClusterMetadataManager.java

@@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager {
     public void flush() {
         List<LogicalClusterDO> logicalClusterDOList = logicalClusterService.listAll();
         if (ValidateUtils.isNull(logicalClusterDOList)) {
-            logicalClusterDOList = Collections.EMPTY_LIST;
+            logicalClusterDOList = Collections.emptyList();
         }
         Set<Long> inDbLogicalClusterIds = logicalClusterDOList.stream()
                 .map(LogicalClusterDO::getId)
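Collections.EMPTY_LIST is a raw java.util.List, so assigning it to List<LogicalClusterDO> compiles only with an unchecked warning; Collections.emptyList() infers the element type and is warning-free. Both return the same immutable empty instance:

    import java.util.Collections;
    import java.util.List;

    public class EmptyListSketch {
        @SuppressWarnings("unchecked")
        static List<String> rawEmpty() {
            return Collections.EMPTY_LIST;  // raw constant: needs the suppression
        }

        static List<String> typedEmpty() {
            return Collections.emptyList(); // element type inferred, no warning
        }
    }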

PhysicalClusterMetadataManager.java

@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @Service
 public class PhysicalClusterMetadataManager {
-    private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);

     @Autowired
     private ControllerDao controllerDao;
@@ -50,22 +50,22 @@ public class PhysicalClusterMetadataManager {
     @Autowired
     private ClusterService clusterService;

-    private final static Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, ClusterDO> CLUSTER_MAP = new ConcurrentHashMap<>();

-    private final static Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, ControllerData> CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();

-    private final static Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, ZkConfigImpl> ZK_CONFIG_MAP = new ConcurrentHashMap<>();

-    private final static Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<String, TopicMetadata>> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();

-    private final static Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<String, Properties>> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();

-    private final static Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<Integer, BrokerMetadata>> BROKER_METADATA_MAP = new ConcurrentHashMap<>();

     /**
      * JMX connections, established lazily
      */
-    private final static Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
+    private static final Map<Long, Map<Integer, JmxConnectorWrap>> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();

     /**
      * KafkaBroker version, fetched lazily
@@ -398,7 +398,7 @@
                                          KafkaBrokerRoleEnum roleEnum) {
         BrokerMetadata brokerMetadata =
                 PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
-        if (ValidateUtils.isNull(brokerMetadata)) {
+        if (brokerMetadata == null) {
             return;
         }
         String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
@@ -438,7 +438,7 @@
                                            KafkaBrokerRoleEnum roleEnum) {
         BrokerMetadata brokerMetadata =
                 PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
-        if (ValidateUtils.isNull(brokerMetadata)) {
+        if (brokerMetadata == null) {
             return;
         }