diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index 138a44fe..1278e3d2 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -121,4 +121,19 @@ notify: # 通知的功能 cluster-id: 95 # Topic的集群ID topic-name: didi-kafka-notify # Topic名称 order: # 部署的KM的地址 - detail-url: http://127.0.0.1 \ No newline at end of file + detail-url: http://127.0.0.1 + +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java index 4d69f914..463e9b1a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java @@ -17,6 +17,10 @@ public class KafkaConstant { public static final String RETENTION_MS_KEY = "retention.ms"; + public static final String EXTERNAL_KEY = "EXTERNAL"; + + public static final String INTERNAL_KEY = "INTERNAL"; + private KafkaConstant() { } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java new file mode 100644 index 00000000..a16b32b4 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.kafka.manager.common.entity.ao.common; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IpPortData implements Serializable { + private static final long serialVersionUID = -428897032994630685L; + + private String ip; + + private String port; +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java index 3c179b4f..598784ca 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java @@ -1,6 +1,17 @@ package com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; +import com.xiaojukeji.kafka.manager.common.entity.ao.common.IpPortData; +import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author zengqiao @@ -10,7 +21,7 @@ import java.util.List; * 节点结构: * { * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"}, - * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093"], + * "endpoints":["SASL_PLAINTEXT://127.0.0.1:9093"], * "jmx_port":9999, * "host":null, * "timestamp":"1546632983233", @@ -18,22 +29,48 @@ import java.util.List; * "version":4, * "rack": "CY" * } + * + * { + * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"}, + * "endpoints":["SASL_PLAINTEXT://127.0.0.1:9093","PLAINTEXT://127.0.0.1:9092"], + * "jmx_port":8099, + * "host":"127.0.0.1", + * "timestamp":"1628833925822", + * "port":9092, + * "version":4 + * } + * + * { + * "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"}, + * "endpoints":["EXTERNAL://127.0.0.1:7092","INTERNAL://127.0.0.1:7093"], + * "jmx_port":8099, + * "host":null, + * "timestamp":"1627289710439", + * "port":-1, + * "version":4 + * } + * */ -public class BrokerMetadata implements Cloneable { +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class BrokerMetadata implements Serializable { + private static final long serialVersionUID = 3918113492423375809L; + private long clusterId; private int brokerId; private List endpoints; + // > + private Map endpointMap; + private String host; private int port; - /* - * ZK上对应的字段就是这个名字, 不要进行修改 - */ - private int jmx_port; + @JsonProperty("jmx_port") + private int jmxPort; private String version; @@ -41,91 +78,54 @@ public class BrokerMetadata implements Cloneable { private String rack; - public long getClusterId() { - return clusterId; + @JsonIgnore + public String getExternalHost() { + if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp(); } - public void setClusterId(long clusterId) { - this.clusterId = clusterId; + @JsonIgnore + public String getInternalHost() { + if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp(); } - public int getBrokerId() { - return brokerId; - } + public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) { + brokerMetadata.setEndpointMap(new HashMap<>()); - public void setBrokerId(int brokerId) { - this.brokerId = brokerId; - } + if (brokerMetadata.getEndpoints().isEmpty()) { + return; + } - public List getEndpoints() { - return endpoints; - } + // example EXTERNAL://10.179.162.202:7092 + for (String endpoint: brokerMetadata.getEndpoints()) { + int idx1 = endpoint.indexOf("://"); + int idx2 = endpoint.lastIndexOf(":"); + if (idx1 == -1 || idx2 == -1 || idx1 == idx2) { + continue; + } - public void setEndpoints(List endpoints) { - this.endpoints = endpoints; - } + String brokerHost = endpoint.substring(idx1 + "://".length(), idx2); + String brokerPort = endpoint.substring(idx2 + 1); - public String getHost() { - return host; - } + brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort)); - public void setHost(String host) { - this.host = host; - } + if (KafkaConstant.EXTERNAL_KEY.equals(endpoint.substring(0, idx1))) { + // 优先使用external的地址进行展示 + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public int getJmxPort() { - return jmx_port; - } - - public void setJmxPort(int jmxPort) { - this.jmx_port = jmxPort; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public String getRack() { - return rack; - } - - public void setRack(String rack) { - this.rack = rack; - } - - @Override - public String toString() { - return "BrokerMetadata{" + - "clusterId=" + clusterId + - ", brokerId=" + brokerId + - ", endpoints=" + endpoints + - ", host='" + host + '\'' + - ", port=" + port + - ", jmxPort=" + jmx_port + - ", version='" + version + '\'' + - ", timestamp=" + timestamp + - ", rack='" + rack + '\'' + - '}'; + if (null == brokerMetadata.getHost()) { + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } + } } } + diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java index 56e17ae5..2e1e9e71 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java @@ -14,6 +14,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; import java.util.Map; import java.util.Properties; @@ -25,9 +27,22 @@ import java.util.concurrent.locks.ReentrantLock; * @author zengqiao * @date 19/12/24 */ +@Service public class KafkaClientPool { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class); + @Value(value = "${client-pool.kafka-consumer.min-idle-client-num:24}") + private Integer kafkaConsumerMinIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-idle-client-num:24}") + private Integer kafkaConsumerMaxIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-total-client-num:24}") + private Integer kafkaConsumerMaxTotalClientNum; + + @Value(value = "${client-pool.kafka-consumer.borrow-timeout-unit-ms:3000}") + private Integer kafkaConsumerBorrowTimeoutUnitMs; + /** * AdminClient */ @@ -84,7 +99,7 @@ public class KafkaClientPool { return true; } - private static void initKafkaConsumerPool(ClusterDO clusterDO) { + private void initKafkaConsumerPool(ClusterDO clusterDO) { lock.lock(); try { GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); @@ -92,9 +107,9 @@ public class KafkaClientPool { return; } GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>(); - config.setMaxIdle(24); - config.setMinIdle(24); - config.setMaxTotal(24); + config.setMaxIdle(kafkaConsumerMaxIdleClientNum); + config.setMinIdle(kafkaConsumerMinIdleClientNum); + config.setMaxTotal(kafkaConsumerMaxTotalClientNum); KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config)); } catch (Exception e) { LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e); @@ -118,7 +133,7 @@ public class KafkaClientPool { } } - public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { + public KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { if (ValidateUtils.isNull(clusterDO)) { return null; } @@ -132,7 +147,7 @@ public class KafkaClientPool { } try { - return objectPool.borrowObject(3000); + return objectPool.borrowObject(kafkaConsumerBorrowTimeoutUnitMs); } catch (Exception e) { LOGGER.error("borrow kafka consumer client failed, clusterDO:{}.", clusterDO, e); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index a04a4d87..47ab8b64 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -50,6 +50,9 @@ public class PhysicalClusterMetadataManager { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + private static final Map CLUSTER_MAP = new ConcurrentHashMap<>(); private static final Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); @@ -125,7 +128,7 @@ public class PhysicalClusterMetadataManager { zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); //增加Topic监控 - TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig); + TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, threadPool); topicListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java index f1b685cb..ba870465 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java @@ -1,37 +1,63 @@ package com.xiaojukeji.kafka.manager.service.cache; import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; -import java.util.concurrent.*; +import javax.annotation.PostConstruct; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author zengqiao * @date 20/8/24 */ +@Service public class ThreadPool { - private static final ExecutorService COLLECT_METRICS_THREAD_POOL = new ThreadPoolExecutor( - 256, - 256, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Collect-Metrics-Thread") - ); - private static final ExecutorService API_CALL_THREAD_POOL = new ThreadPoolExecutor( - 16, - 16, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Api-Call-Thread") - ); + @Value(value = "${thread-pool.collect-metrics.thread-num:256}") + private Integer collectMetricsThreadNum; - public static void submitCollectMetricsTask(Runnable collectMetricsTask) { - COLLECT_METRICS_THREAD_POOL.submit(collectMetricsTask); + @Value(value = "${thread-pool.collect-metrics.queue-size:10000}") + private Integer collectMetricsQueueSize; + + @Value(value = "${thread-pool.api-call.thread-num:16}") + private Integer apiCallThreadNum; + + @Value(value = "${thread-pool.api-call.queue-size:10000}") + private Integer apiCallQueueSize; + + private ThreadPoolExecutor collectMetricsThreadPool; + + private ThreadPoolExecutor apiCallThreadPool; + + @PostConstruct + public void init() { + collectMetricsThreadPool = new ThreadPoolExecutor( + collectMetricsThreadNum, + collectMetricsThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(collectMetricsQueueSize), + new DefaultThreadFactory("TaskThreadPool") + ); + + apiCallThreadPool = new ThreadPoolExecutor( + apiCallThreadNum, + apiCallThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(apiCallQueueSize), + new DefaultThreadFactory("ApiThreadPool") + ); } - public static void submitApiCallTask(Runnable apiCallTask) { - API_CALL_THREAD_POOL.submit(apiCallTask); + public void submitCollectMetricsTask(Long clusterId, Runnable collectMetricsTask) { + collectMetricsThreadPool.submit(collectMetricsTask); + } + + public void submitApiCallTask(Long clusterId, Runnable apiCallTask) { + apiCallThreadPool.submit(apiCallTask); } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java index 24eea55f..ac3e0593 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java @@ -61,6 +61,9 @@ public class BrokerServiceImpl implements BrokerService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public ClusterBrokerStatus getClusterBrokerStatus(Long clusterId) { // 副本同步状态 @@ -201,7 +204,7 @@ public class BrokerServiceImpl implements BrokerService { return getBrokerMetricsFromJmx(clusterId, brokerId, metricsCode); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(brokerIdSet.size()); for (int i = 0; i < brokerIdList.size(); i++) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java index 611dc203..d0f0c514 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java @@ -39,6 +39,9 @@ public class JmxServiceImpl implements JmxService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public BrokerMetrics getBrokerMetrics(Long clusterId, Integer brokerId, Integer metricsCode) { if (clusterId == null || brokerId == null || metricsCode == null) { @@ -98,7 +101,7 @@ public class JmxServiceImpl implements JmxService { ); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(); @@ -303,7 +306,7 @@ public class JmxServiceImpl implements JmxService { return metricsList; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map metricsMap = new HashMap<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 154faf77..aa4fe3fb 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -87,6 +87,9 @@ public class TopicServiceImpl implements TopicService { @Autowired private AbstractHealthScoreStrategy healthScoreStrategy; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { try { @@ -340,7 +343,7 @@ public class TopicServiceImpl implements TopicService { Map topicPartitionLongMap = new HashMap<>(); KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if ((offsetPosEnum.getCode() & OffsetPosEnum.END.getCode()) > 0) { topicPartitionLongMap = kafkaConsumer.endOffsets(topicPartitionList); } else if ((offsetPosEnum.getCode() & OffsetPosEnum.BEGINNING.getCode()) > 0) { @@ -541,7 +544,7 @@ public class TopicServiceImpl implements TopicService { List partitionOffsetDTOList = new ArrayList<>(); try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); Map offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); if (offsetAndTimestampMap == null) { return new ArrayList<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java index d75dec5a..51295644 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java @@ -45,6 +45,9 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { @Autowired private JmxService jmxService; + @Autowired + private ThreadPool threadPool; + @Override public Integer calBrokerHealthScore(Long clusterId, Integer brokerId) { BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); @@ -125,7 +128,7 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { return calBrokerHealthScore(clusterId, brokerId); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } Integer topicHealthScore = HEALTH_SCORE_HEALTHY; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java index a94ec9de..f5cdefe8 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java @@ -74,15 +74,10 @@ public class BrokerStateListener implements StateChangeListener { BrokerMetadata brokerMetadata = null; try { brokerMetadata = zkConfig.get(ZkPathUtil.getBrokerIdNodePath(brokerId), BrokerMetadata.class); - if (!brokerMetadata.getEndpoints().isEmpty()) { - String endpoint = brokerMetadata.getEndpoints().get(0); - int idx = endpoint.indexOf("://"); - endpoint = endpoint.substring(idx + "://".length()); - idx = endpoint.indexOf(":"); - brokerMetadata.setHost(endpoint.substring(0, idx)); - brokerMetadata.setPort(Integer.parseInt(endpoint.substring(idx + 1))); - } + // 解析并更新本次存储的broker元信息 + BrokerMetadata.parseAndUpdateBrokerMetadata(brokerMetadata); + brokerMetadata.setClusterId(clusterId); brokerMetadata.setBrokerId(brokerId); PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java index 4314a101..6f3d33b3 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.service.cache.ThreadPool; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import java.util.HashSet; import java.util.List; @@ -28,9 +29,12 @@ public class TopicStateListener implements StateChangeListener { private ZkConfigImpl zkConfig; - public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig) { + private ThreadPool threadPool; + + public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig, ThreadPool threadPool) { this.clusterId = clusterId; this.zkConfig = zkConfig; + this.threadPool = threadPool; } @Override @@ -47,7 +51,7 @@ public class TopicStateListener implements StateChangeListener { return null; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } } catch (Exception e) { LOGGER.error("init topics metadata failed, clusterId:{}.", clusterId, e); diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java index 5df7815e..07b0a3e3 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java @@ -42,6 +42,9 @@ public class ThirdPartServiceImpl implements ThirdPartService { @Autowired private ConsumerService consumerService; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public Result checkConsumeHealth(Long clusterId, String topicName, @@ -109,7 +112,7 @@ public class ThirdPartServiceImpl implements ThirdPartService { Long timestamp) { KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if (ValidateUtils.isNull(kafkaConsumer)) { return null; } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java index cc67428f..28bb1612 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java @@ -44,6 +44,9 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { @Autowired private ConsumerService consumerService; + @Autowired + private ThreadPool threadPool; + @Override protected List listAllTasks() { return clusterService.list(); @@ -82,7 +85,7 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { return getTopicConsumerMetrics(clusterDO, topicName, startTimeUnitMs); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterDO.getId(), taskList[i]); } List consumerMetricsList = new ArrayList<>(); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java index a7d196af..54321240 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java @@ -32,6 +32,9 @@ public class FlushZKConsumerGroupMetadata { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + @Scheduled(cron="35 0/1 * * * ?") public void schedule() { List doList = clusterService.list(); @@ -95,7 +98,7 @@ public class FlushZKConsumerGroupMetadata { return new ArrayList<>(); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map> topicNameConsumerGroupMap = new HashMap<>(); diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 1d816604..19ba8593 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -102,3 +102,18 @@ notify: topic-name: didi-kafka-notify order: detail-url: http://127.0.0.1 + +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒