From c0f3259cf66b04b39324cc9197f4d2fd7a2e2c3a Mon Sep 17 00:00:00 2001 From: xuguang Date: Wed, 12 Jan 2022 19:56:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BA=BF=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E3=80=81=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=B1=A0=E5=8F=AF=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/cache/KafkaClientPool.java | 27 ++++++-- .../cache/PhysicalClusterMetadataManager.java | 5 +- .../manager/service/cache/ThreadPool.java | 68 +++++++++++++------ .../service/impl/BrokerServiceImpl.java | 5 +- .../service/service/impl/JmxServiceImpl.java | 7 +- .../service/impl/TopicServiceImpl.java | 7 +- .../healthscore/DidiHealthScoreStrategy.java | 5 +- .../service/zookeeper/TopicStateListener.java | 8 ++- .../openapi/impl/ThirdPartServiceImpl.java | 5 +- .../collect/CollectAndPublishCGData.java | 5 +- .../FlushZKConsumerGroupMetadata.java | 5 +- .../src/main/resources/application.yml | 15 ++++ 12 files changed, 123 insertions(+), 39 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java index 56e17ae5..2e1e9e71 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java @@ -14,6 +14,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; import java.util.Map; import java.util.Properties; @@ -25,9 +27,22 @@ import java.util.concurrent.locks.ReentrantLock; * @author zengqiao * @date 19/12/24 */ +@Service public class KafkaClientPool { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class); + @Value(value = "${client-pool.kafka-consumer.min-idle-client-num:24}") + private Integer kafkaConsumerMinIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-idle-client-num:24}") + private Integer kafkaConsumerMaxIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-total-client-num:24}") + private Integer kafkaConsumerMaxTotalClientNum; + + @Value(value = "${client-pool.kafka-consumer.borrow-timeout-unit-ms:3000}") + private Integer kafkaConsumerBorrowTimeoutUnitMs; + /** * AdminClient */ @@ -84,7 +99,7 @@ public class KafkaClientPool { return true; } - private static void initKafkaConsumerPool(ClusterDO clusterDO) { + private void initKafkaConsumerPool(ClusterDO clusterDO) { lock.lock(); try { GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); @@ -92,9 +107,9 @@ public class KafkaClientPool { return; } GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>(); - config.setMaxIdle(24); - config.setMinIdle(24); - config.setMaxTotal(24); + config.setMaxIdle(kafkaConsumerMaxIdleClientNum); + config.setMinIdle(kafkaConsumerMinIdleClientNum); + config.setMaxTotal(kafkaConsumerMaxTotalClientNum); KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config)); } catch (Exception e) { LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e); @@ -118,7 +133,7 @@ public class KafkaClientPool { } } - public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { + public KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { if (ValidateUtils.isNull(clusterDO)) { return null; } @@ -132,7 +147,7 @@ public class KafkaClientPool { } try { - return objectPool.borrowObject(3000); + return objectPool.borrowObject(kafkaConsumerBorrowTimeoutUnitMs); } catch (Exception e) { LOGGER.error("borrow kafka consumer client failed, clusterDO:{}.", clusterDO, e); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index c5f09820..79ecada1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -50,6 +50,9 @@ public class PhysicalClusterMetadataManager { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + private static final Map CLUSTER_MAP = new ConcurrentHashMap<>(); private static final Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); @@ -125,7 +128,7 @@ public class PhysicalClusterMetadataManager { zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); //增加Topic监控 - TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig); + TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, threadPool); topicListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java index f1b685cb..34f94871 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java @@ -1,37 +1,63 @@ package com.xiaojukeji.kafka.manager.service.cache; import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; -import java.util.concurrent.*; +import javax.annotation.PostConstruct; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author zengqiao * @date 20/8/24 */ +@Service public class ThreadPool { - private static final ExecutorService COLLECT_METRICS_THREAD_POOL = new ThreadPoolExecutor( - 256, - 256, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Collect-Metrics-Thread") - ); - private static final ExecutorService API_CALL_THREAD_POOL = new ThreadPoolExecutor( - 16, - 16, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Api-Call-Thread") - ); + @Value(value = "${thread-pool.collect-metrics.thread-num:256}") + private Integer collectMetricsThreadNum; - public static void submitCollectMetricsTask(Runnable collectMetricsTask) { - COLLECT_METRICS_THREAD_POOL.submit(collectMetricsTask); + @Value(value = "${thread-pool.collect-metrics.queue-size:10000}") + private Integer collectMetricsQueueSize; + + @Value(value = "${thread-pool.api-call.thread-num:16}") + private Integer apiCallThreadNum; + + @Value(value = "${thread-pool.api-call.queue-size:10000}") + private Integer apiCallQueueSize; + + private ThreadPoolExecutor collectMetricsThreadPool; + + private ThreadPoolExecutor apiCallThreadPool; + + @PostConstruct + public void init() { + collectMetricsThreadPool = new ThreadPoolExecutor( + collectMetricsThreadNum, + collectMetricsThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(collectMetricsQueueSize), + new DefaultThreadFactory("Collect-Metrics-Thread") + ); + + apiCallThreadPool = new ThreadPoolExecutor( + apiCallThreadNum, + apiCallThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(apiCallQueueSize), + new DefaultThreadFactory("Api-Call-Thread") + ); } - public static void submitApiCallTask(Runnable apiCallTask) { - API_CALL_THREAD_POOL.submit(apiCallTask); + public void submitCollectMetricsTask(Long clusterId, Runnable collectMetricsTask) { + collectMetricsThreadPool.submit(collectMetricsTask); + } + + public void submitApiCallTask(Long clusterId, Runnable apiCallTask) { + apiCallThreadPool.submit(apiCallTask); } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java index 24eea55f..ac3e0593 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java @@ -61,6 +61,9 @@ public class BrokerServiceImpl implements BrokerService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public ClusterBrokerStatus getClusterBrokerStatus(Long clusterId) { // 副本同步状态 @@ -201,7 +204,7 @@ public class BrokerServiceImpl implements BrokerService { return getBrokerMetricsFromJmx(clusterId, brokerId, metricsCode); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(brokerIdSet.size()); for (int i = 0; i < brokerIdList.size(); i++) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java index 611dc203..d0f0c514 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java @@ -39,6 +39,9 @@ public class JmxServiceImpl implements JmxService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public BrokerMetrics getBrokerMetrics(Long clusterId, Integer brokerId, Integer metricsCode) { if (clusterId == null || brokerId == null || metricsCode == null) { @@ -98,7 +101,7 @@ public class JmxServiceImpl implements JmxService { ); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(); @@ -303,7 +306,7 @@ public class JmxServiceImpl implements JmxService { return metricsList; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map metricsMap = new HashMap<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 154faf77..aa4fe3fb 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -87,6 +87,9 @@ public class TopicServiceImpl implements TopicService { @Autowired private AbstractHealthScoreStrategy healthScoreStrategy; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { try { @@ -340,7 +343,7 @@ public class TopicServiceImpl implements TopicService { Map topicPartitionLongMap = new HashMap<>(); KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if ((offsetPosEnum.getCode() & OffsetPosEnum.END.getCode()) > 0) { topicPartitionLongMap = kafkaConsumer.endOffsets(topicPartitionList); } else if ((offsetPosEnum.getCode() & OffsetPosEnum.BEGINNING.getCode()) > 0) { @@ -541,7 +544,7 @@ public class TopicServiceImpl implements TopicService { List partitionOffsetDTOList = new ArrayList<>(); try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); Map offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); if (offsetAndTimestampMap == null) { return new ArrayList<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java index d75dec5a..51295644 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java @@ -45,6 +45,9 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { @Autowired private JmxService jmxService; + @Autowired + private ThreadPool threadPool; + @Override public Integer calBrokerHealthScore(Long clusterId, Integer brokerId) { BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); @@ -125,7 +128,7 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { return calBrokerHealthScore(clusterId, brokerId); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } Integer topicHealthScore = HEALTH_SCORE_HEALTHY; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java index 4314a101..6f3d33b3 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.service.cache.ThreadPool; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import java.util.HashSet; import java.util.List; @@ -28,9 +29,12 @@ public class TopicStateListener implements StateChangeListener { private ZkConfigImpl zkConfig; - public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig) { + private ThreadPool threadPool; + + public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig, ThreadPool threadPool) { this.clusterId = clusterId; this.zkConfig = zkConfig; + this.threadPool = threadPool; } @Override @@ -47,7 +51,7 @@ public class TopicStateListener implements StateChangeListener { return null; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } } catch (Exception e) { LOGGER.error("init topics metadata failed, clusterId:{}.", clusterId, e); diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java index 5df7815e..07b0a3e3 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java @@ -42,6 +42,9 @@ public class ThirdPartServiceImpl implements ThirdPartService { @Autowired private ConsumerService consumerService; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public Result checkConsumeHealth(Long clusterId, String topicName, @@ -109,7 +112,7 @@ public class ThirdPartServiceImpl implements ThirdPartService { Long timestamp) { KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if (ValidateUtils.isNull(kafkaConsumer)) { return null; } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java index cc67428f..28bb1612 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java @@ -44,6 +44,9 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { @Autowired private ConsumerService consumerService; + @Autowired + private ThreadPool threadPool; + @Override protected List listAllTasks() { return clusterService.list(); @@ -82,7 +85,7 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { return getTopicConsumerMetrics(clusterDO, topicName, startTimeUnitMs); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterDO.getId(), taskList[i]); } List consumerMetricsList = new ArrayList<>(); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java index a7d196af..54321240 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java @@ -32,6 +32,9 @@ public class FlushZKConsumerGroupMetadata { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + @Scheduled(cron="35 0/1 * * * ?") public void schedule() { List doList = clusterService.list(); @@ -95,7 +98,7 @@ public class FlushZKConsumerGroupMetadata { return new ArrayList<>(); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map> topicNameConsumerGroupMap = new HashMap<>(); diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 4463d746..6b776773 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -96,3 +96,18 @@ notify: topic-name: didi-kafka-notify order: detail-url: http://127.0.0.1 + +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒