From c3d47d309336bf7ebd04da7b2ee2f0cbc9ff98f5 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 6 Dec 2022 16:46:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B1=A0=E5=8C=96KafkaAdminClient=EF=BC=8C?= =?UTF-8?q?=E9=81=BF=E5=85=8DKafkaAdminClient=E5=87=BA=E7=8E=B0=E6=80=A7?= =?UTF-8?q?=E8=83=BD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../persistence/kafka/KafkaAdminClient.java | 79 ++++++++++++++----- km-rest/src/main/resources/application.yml | 3 +- 2 files changed, 60 insertions(+), 22 deletions(-) diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java index 46fb1dae..20447798 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java @@ -1,31 +1,39 @@ package com.xiaojukeji.know.streaming.km.persistence.kafka; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.persistence.AbstractClusterLoadedChangedHandler; import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; -@Slf4j @Component public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { - private static final Map KAFKA_ADMIN_CLIENT_MAP = new ConcurrentHashMap<>(); + private static final ILog LOGGER = LogFactory.getLog(KafkaAdminClient.class); + + @Value("${client-pool.kafka-admin.client-cnt:1}") + private Integer clientCnt; + + private static final Map> KAFKA_ADMIN_CLIENT_MAP = new ConcurrentHashMap<>(); public AdminClient getClient(Long clusterPhyId) throws NotExistException { - AdminClient adminClient = KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId); - if (adminClient != null) { - return adminClient; + List adminClientList = KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId); + if (adminClientList != null) { + return adminClientList.get((int)(System.currentTimeMillis() % clientCnt)); } - adminClient = this.createKafkaAdminClient(clusterPhyId); + AdminClient adminClient = this.createKafkaAdminClient(clusterPhyId); if (adminClient == null) { throw new NotExistException("kafka admin-client not exist due to create failed"); } @@ -61,18 +69,20 @@ public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { try { modifyClientMapLock.lock(); - AdminClient adminClient = KAFKA_ADMIN_CLIENT_MAP.remove(clusterPhyId); - if (adminClient == null) { + List adminClientList = KAFKA_ADMIN_CLIENT_MAP.remove(clusterPhyId); + if (adminClientList == null) { return; } - log.info("close kafka AdminClient starting, clusterPhyId:{}", clusterPhyId); + LOGGER.info("close kafka AdminClient starting, clusterPhyId:{}", clusterPhyId); - adminClient.close(); + boolean allSuccess = this.closeAdminClientList(adminClientList); - log.info("close kafka AdminClient success, clusterPhyId:{}", clusterPhyId); + if (allSuccess) { + LOGGER.info("close kafka AdminClient success, clusterPhyId:{}", clusterPhyId); + } } catch (Exception e) { - log.error("close kafka AdminClient failed, clusterPhyId:{}", clusterPhyId, e); + LOGGER.error("close kafka AdminClient failed, clusterPhyId:{}", clusterPhyId, e); } finally { modifyClientMapLock.unlock(); } @@ -88,29 +98,56 @@ public class KafkaAdminClient extends AbstractClusterLoadedChangedHandler { } private AdminClient createKafkaAdminClient(Long clusterPhyId, String bootstrapServers, Properties props) { + List adminClientList = null; try { modifyClientMapLock.lock(); - AdminClient adminClient = KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId); - if (adminClient != null) { - return adminClient; + adminClientList = KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId); + if (adminClientList != null) { + return adminClientList.get((int)(System.currentTimeMillis() % clientCnt)); } - log.debug("create kafka AdminClient starting, clusterPhyId:{} props:{}", clusterPhyId, props); + LOGGER.debug("create kafka AdminClient starting, clusterPhyId:{} props:{}", clusterPhyId, props); if (props == null) { props = new Properties(); } props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - KAFKA_ADMIN_CLIENT_MAP.put(clusterPhyId, AdminClient.create(props)); - log.info("create kafka AdminClient success, clusterPhyId:{}", clusterPhyId); + adminClientList = new ArrayList<>(); + for (int i = 0; i < clientCnt; ++i) { + adminClientList.add(AdminClient.create(props)); + } + + KAFKA_ADMIN_CLIENT_MAP.put(clusterPhyId, adminClientList); + + LOGGER.info("create kafka AdminClient success, clusterPhyId:{}", clusterPhyId); } catch (Exception e) { - log.error("create kafka AdminClient failed, clusterPhyId:{} props:{}", clusterPhyId, props, e); + LOGGER.error("create kafka AdminClient failed, clusterPhyId:{} props:{}", clusterPhyId, props, e); + + this.closeAdminClientList(adminClientList); } finally { modifyClientMapLock.unlock(); } - return KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId); + return KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId).get((int)(System.currentTimeMillis() % clientCnt)); + } + + private boolean closeAdminClientList(List adminClientList) { + if (adminClientList == null) { + return true; + } + + boolean allSuccess = true; + for (AdminClient adminClient: adminClientList) { + try { + adminClient.close(); + } catch (Exception e) { + // ignore + allSuccess = false; + } + } + + return allSuccess; } } diff --git a/km-rest/src/main/resources/application.yml b/km-rest/src/main/resources/application.yml index 3b01022e..c9753b7e 100644 --- a/km-rest/src/main/resources/application.yml +++ b/km-rest/src/main/resources/application.yml @@ -78,7 +78,8 @@ client-pool: max-idle-client-num: 20 # 最大空闲客户端数 max-total-client-num: 20 # 最大客户端数 borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒 - + kafka-admin: + client-cnt: 1 # 每个Kafka集群创建的KafkaAdminClient数 # ES客户端配置 es: