From 4df2dc09fea79e2238541f408202fef01d13ae56 Mon Sep 17 00:00:00 2001 From: xuguang Date: Wed, 12 Jan 2022 16:15:46 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9BrokerMetadata?= =?UTF-8?q?=E4=B8=ADendpoints=E4=B8=BAinternal|External=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E7=9A=84=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-common/pom.xml | 5 + .../common/constant/KafkaConstant.java | 4 + .../common/entity/ao/common/IpPortData.java | 18 ++ .../znode/brokers/BrokerMetadata.java | 160 +++++++++--------- .../zookeeper/BrokerStateListener.java | 11 +- 5 files changed, 110 insertions(+), 88 deletions(-) create mode 100644 kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml index 6a8ff0cb..c914ffeb 100644 --- a/kafka-manager-common/pom.xml +++ b/kafka-manager-common/pom.xml @@ -109,5 +109,10 @@ junit junit + + org.projectlombok + lombok + compile + \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java index 4d69f914..463e9b1a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/KafkaConstant.java @@ -17,6 +17,10 @@ public class KafkaConstant { public static final String RETENTION_MS_KEY = "retention.ms"; + public static final String EXTERNAL_KEY = "EXTERNAL"; + + public static final String INTERNAL_KEY = "INTERNAL"; + private KafkaConstant() { } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java new file mode 100644 index 00000000..a16b32b4 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/common/IpPortData.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.kafka.manager.common.entity.ao.common; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IpPortData implements Serializable { + private static final long serialVersionUID = -428897032994630685L; + + private String ip; + + private String port; +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java index 3c179b4f..e4e5063d 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java @@ -1,6 +1,17 @@ package com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; +import com.xiaojukeji.kafka.manager.common.entity.ao.common.IpPortData; +import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author zengqiao @@ -18,22 +29,48 @@ import java.util.List; * "version":4, * "rack": "CY" * } + * + * { + * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"}, + * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"], + * "jmx_port":8099, + * "host":"10.179.162.202", + * "timestamp":"1628833925822", + * "port":9092, + * "version":4 + * } + * + * { + * "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"}, + * "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"], + * "jmx_port":8099, + * "host":null, + * "timestamp":"1627289710439", + * "port":-1, + * "version":4 + * } + * */ -public class BrokerMetadata implements Cloneable { +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class BrokerMetadata implements Serializable { + private static final long serialVersionUID = 3918113492423375809L; + private long clusterId; private int brokerId; private List endpoints; + // > + private Map endpointMap; + private String host; private int port; - /* - * ZK上对应的字段就是这个名字, 不要进行修改 - */ - private int jmx_port; + @JsonProperty("jmx_port") + private int jmxPort; private String version; @@ -41,91 +78,54 @@ public class BrokerMetadata implements Cloneable { private String rack; - public long getClusterId() { - return clusterId; + @JsonIgnore + public String getExternalHost() { + if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp(); } - public void setClusterId(long clusterId) { - this.clusterId = clusterId; + @JsonIgnore + public String getInternalHost() { + if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) { + return null; + } + return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp(); } - public int getBrokerId() { - return brokerId; - } + public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) { + brokerMetadata.setEndpointMap(new HashMap<>()); - public void setBrokerId(int brokerId) { - this.brokerId = brokerId; - } + if (brokerMetadata.getEndpoints().isEmpty()) { + return; + } - public List getEndpoints() { - return endpoints; - } + // example EXTERNAL://10.179.162.202:7092 + for (String endpoint: brokerMetadata.getEndpoints()) { + int idx1 = endpoint.indexOf("://"); + int idx2 = endpoint.lastIndexOf(":"); + if (idx1 == -1 || idx2 == -1 || idx1 == idx2) { + continue; + } - public void setEndpoints(List endpoints) { - this.endpoints = endpoints; - } + String brokerHost = endpoint.substring(idx1 + "://".length(), idx2); + String brokerPort = endpoint.substring(idx2 + 1); - public String getHost() { - return host; - } + brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort)); - public void setHost(String host) { - this.host = host; - } + if (KafkaConstant.EXTERNAL_KEY.equals(endpoint.substring(0, idx1))) { + // 优先使用external的地址进行展示 + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public int getJmxPort() { - return jmx_port; - } - - public void setJmxPort(int jmxPort) { - this.jmx_port = jmxPort; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public String getRack() { - return rack; - } - - public void setRack(String rack) { - this.rack = rack; - } - - @Override - public String toString() { - return "BrokerMetadata{" + - "clusterId=" + clusterId + - ", brokerId=" + brokerId + - ", endpoints=" + endpoints + - ", host='" + host + '\'' + - ", port=" + port + - ", jmxPort=" + jmx_port + - ", version='" + version + '\'' + - ", timestamp=" + timestamp + - ", rack='" + rack + '\'' + - '}'; + if (null == brokerMetadata.getHost()) { + brokerMetadata.setHost(brokerHost); + brokerMetadata.setPort(NumberUtils.string2Integer(brokerPort)); + } + } } } + diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java index a94ec9de..f5cdefe8 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java @@ -74,15 +74,10 @@ public class BrokerStateListener implements StateChangeListener { BrokerMetadata brokerMetadata = null; try { brokerMetadata = zkConfig.get(ZkPathUtil.getBrokerIdNodePath(brokerId), BrokerMetadata.class); - if (!brokerMetadata.getEndpoints().isEmpty()) { - String endpoint = brokerMetadata.getEndpoints().get(0); - int idx = endpoint.indexOf("://"); - endpoint = endpoint.substring(idx + "://".length()); - idx = endpoint.indexOf(":"); - brokerMetadata.setHost(endpoint.substring(0, idx)); - brokerMetadata.setPort(Integer.parseInt(endpoint.substring(idx + 1))); - } + // 解析并更新本次存储的broker元信息 + BrokerMetadata.parseAndUpdateBrokerMetadata(brokerMetadata); + brokerMetadata.setClusterId(clusterId); brokerMetadata.setBrokerId(brokerId); PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig); From c0f3259cf66b04b39324cc9197f4d2fd7a2e2c3a Mon Sep 17 00:00:00 2001 From: xuguang Date: Wed, 12 Jan 2022 19:56:37 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E3=80=81=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=B1=A0=E5=8F=AF?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/cache/KafkaClientPool.java | 27 ++++++-- .../cache/PhysicalClusterMetadataManager.java | 5 +- .../manager/service/cache/ThreadPool.java | 68 +++++++++++++------ .../service/impl/BrokerServiceImpl.java | 5 +- .../service/service/impl/JmxServiceImpl.java | 7 +- .../service/impl/TopicServiceImpl.java | 7 +- .../healthscore/DidiHealthScoreStrategy.java | 5 +- .../service/zookeeper/TopicStateListener.java | 8 ++- .../openapi/impl/ThirdPartServiceImpl.java | 5 +- .../collect/CollectAndPublishCGData.java | 5 +- .../FlushZKConsumerGroupMetadata.java | 5 +- .../src/main/resources/application.yml | 15 ++++ 12 files changed, 123 insertions(+), 39 deletions(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java index 56e17ae5..2e1e9e71 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java @@ -14,6 +14,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; import java.util.Map; import java.util.Properties; @@ -25,9 +27,22 @@ import java.util.concurrent.locks.ReentrantLock; * @author zengqiao * @date 19/12/24 */ +@Service public class KafkaClientPool { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class); + @Value(value = "${client-pool.kafka-consumer.min-idle-client-num:24}") + private Integer kafkaConsumerMinIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-idle-client-num:24}") + private Integer kafkaConsumerMaxIdleClientNum; + + @Value(value = "${client-pool.kafka-consumer.max-total-client-num:24}") + private Integer kafkaConsumerMaxTotalClientNum; + + @Value(value = "${client-pool.kafka-consumer.borrow-timeout-unit-ms:3000}") + private Integer kafkaConsumerBorrowTimeoutUnitMs; + /** * AdminClient */ @@ -84,7 +99,7 @@ public class KafkaClientPool { return true; } - private static void initKafkaConsumerPool(ClusterDO clusterDO) { + private void initKafkaConsumerPool(ClusterDO clusterDO) { lock.lock(); try { GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId()); @@ -92,9 +107,9 @@ public class KafkaClientPool { return; } GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>(); - config.setMaxIdle(24); - config.setMinIdle(24); - config.setMaxTotal(24); + config.setMaxIdle(kafkaConsumerMaxIdleClientNum); + config.setMinIdle(kafkaConsumerMinIdleClientNum); + config.setMaxTotal(kafkaConsumerMaxTotalClientNum); KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config)); } catch (Exception e) { LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e); @@ -118,7 +133,7 @@ public class KafkaClientPool { } } - public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { + public KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) { if (ValidateUtils.isNull(clusterDO)) { return null; } @@ -132,7 +147,7 @@ public class KafkaClientPool { } try { - return objectPool.borrowObject(3000); + return objectPool.borrowObject(kafkaConsumerBorrowTimeoutUnitMs); } catch (Exception e) { LOGGER.error("borrow kafka consumer client failed, clusterDO:{}.", clusterDO, e); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index c5f09820..79ecada1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -50,6 +50,9 @@ public class PhysicalClusterMetadataManager { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + private static final Map CLUSTER_MAP = new ConcurrentHashMap<>(); private static final Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); @@ -125,7 +128,7 @@ public class PhysicalClusterMetadataManager { zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); //增加Topic监控 - TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig); + TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, threadPool); topicListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java index f1b685cb..34f94871 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java @@ -1,37 +1,63 @@ package com.xiaojukeji.kafka.manager.service.cache; import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; -import java.util.concurrent.*; +import javax.annotation.PostConstruct; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author zengqiao * @date 20/8/24 */ +@Service public class ThreadPool { - private static final ExecutorService COLLECT_METRICS_THREAD_POOL = new ThreadPoolExecutor( - 256, - 256, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Collect-Metrics-Thread") - ); - private static final ExecutorService API_CALL_THREAD_POOL = new ThreadPoolExecutor( - 16, - 16, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new DefaultThreadFactory("Api-Call-Thread") - ); + @Value(value = "${thread-pool.collect-metrics.thread-num:256}") + private Integer collectMetricsThreadNum; - public static void submitCollectMetricsTask(Runnable collectMetricsTask) { - COLLECT_METRICS_THREAD_POOL.submit(collectMetricsTask); + @Value(value = "${thread-pool.collect-metrics.queue-size:10000}") + private Integer collectMetricsQueueSize; + + @Value(value = "${thread-pool.api-call.thread-num:16}") + private Integer apiCallThreadNum; + + @Value(value = "${thread-pool.api-call.queue-size:10000}") + private Integer apiCallQueueSize; + + private ThreadPoolExecutor collectMetricsThreadPool; + + private ThreadPoolExecutor apiCallThreadPool; + + @PostConstruct + public void init() { + collectMetricsThreadPool = new ThreadPoolExecutor( + collectMetricsThreadNum, + collectMetricsThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(collectMetricsQueueSize), + new DefaultThreadFactory("Collect-Metrics-Thread") + ); + + apiCallThreadPool = new ThreadPoolExecutor( + apiCallThreadNum, + apiCallThreadNum, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(apiCallQueueSize), + new DefaultThreadFactory("Api-Call-Thread") + ); } - public static void submitApiCallTask(Runnable apiCallTask) { - API_CALL_THREAD_POOL.submit(apiCallTask); + public void submitCollectMetricsTask(Long clusterId, Runnable collectMetricsTask) { + collectMetricsThreadPool.submit(collectMetricsTask); + } + + public void submitApiCallTask(Long clusterId, Runnable apiCallTask) { + apiCallThreadPool.submit(apiCallTask); } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java index 24eea55f..ac3e0593 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java @@ -61,6 +61,9 @@ public class BrokerServiceImpl implements BrokerService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public ClusterBrokerStatus getClusterBrokerStatus(Long clusterId) { // 副本同步状态 @@ -201,7 +204,7 @@ public class BrokerServiceImpl implements BrokerService { return getBrokerMetricsFromJmx(clusterId, brokerId, metricsCode); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(brokerIdSet.size()); for (int i = 0; i < brokerIdList.size(); i++) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java index 611dc203..d0f0c514 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java @@ -39,6 +39,9 @@ public class JmxServiceImpl implements JmxService { @Autowired private PhysicalClusterMetadataManager physicalClusterMetadataManager; + @Autowired + private ThreadPool threadPool; + @Override public BrokerMetrics getBrokerMetrics(Long clusterId, Integer brokerId, Integer metricsCode) { if (clusterId == null || brokerId == null || metricsCode == null) { @@ -98,7 +101,7 @@ public class JmxServiceImpl implements JmxService { ); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } List metricsList = new ArrayList<>(); @@ -303,7 +306,7 @@ public class JmxServiceImpl implements JmxService { return metricsList; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map metricsMap = new HashMap<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 154faf77..aa4fe3fb 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -87,6 +87,9 @@ public class TopicServiceImpl implements TopicService { @Autowired private AbstractHealthScoreStrategy healthScoreStrategy; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) { try { @@ -340,7 +343,7 @@ public class TopicServiceImpl implements TopicService { Map topicPartitionLongMap = new HashMap<>(); KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if ((offsetPosEnum.getCode() & OffsetPosEnum.END.getCode()) > 0) { topicPartitionLongMap = kafkaConsumer.endOffsets(topicPartitionList); } else if ((offsetPosEnum.getCode() & OffsetPosEnum.BEGINNING.getCode()) > 0) { @@ -541,7 +544,7 @@ public class TopicServiceImpl implements TopicService { List partitionOffsetDTOList = new ArrayList<>(); try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); Map offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); if (offsetAndTimestampMap == null) { return new ArrayList<>(); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java index d75dec5a..51295644 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java @@ -45,6 +45,9 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { @Autowired private JmxService jmxService; + @Autowired + private ThreadPool threadPool; + @Override public Integer calBrokerHealthScore(Long clusterId, Integer brokerId) { BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); @@ -125,7 +128,7 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy { return calBrokerHealthScore(clusterId, brokerId); } }); - ThreadPool.submitApiCallTask(taskList[i]); + threadPool.submitApiCallTask(clusterId, taskList[i]); } Integer topicHealthScore = HEALTH_SCORE_HEALTHY; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java index 4314a101..6f3d33b3 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java @@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.service.cache.ThreadPool; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import java.util.HashSet; import java.util.List; @@ -28,9 +29,12 @@ public class TopicStateListener implements StateChangeListener { private ZkConfigImpl zkConfig; - public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig) { + private ThreadPool threadPool; + + public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig, ThreadPool threadPool) { this.clusterId = clusterId; this.zkConfig = zkConfig; + this.threadPool = threadPool; } @Override @@ -47,7 +51,7 @@ public class TopicStateListener implements StateChangeListener { return null; } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } } catch (Exception e) { LOGGER.error("init topics metadata failed, clusterId:{}.", clusterId, e); diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java index 5df7815e..07b0a3e3 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java @@ -42,6 +42,9 @@ public class ThirdPartServiceImpl implements ThirdPartService { @Autowired private ConsumerService consumerService; + @Autowired + private KafkaClientPool kafkaClientPool; + @Override public Result checkConsumeHealth(Long clusterId, String topicName, @@ -109,7 +112,7 @@ public class ThirdPartServiceImpl implements ThirdPartService { Long timestamp) { KafkaConsumer kafkaConsumer = null; try { - kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO); if (ValidateUtils.isNull(kafkaConsumer)) { return null; } diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java index cc67428f..28bb1612 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java @@ -44,6 +44,9 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { @Autowired private ConsumerService consumerService; + @Autowired + private ThreadPool threadPool; + @Override protected List listAllTasks() { return clusterService.list(); @@ -82,7 +85,7 @@ public class CollectAndPublishCGData extends AbstractScheduledTask { return getTopicConsumerMetrics(clusterDO, topicName, startTimeUnitMs); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterDO.getId(), taskList[i]); } List consumerMetricsList = new ArrayList<>(); diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java index a7d196af..54321240 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java @@ -32,6 +32,9 @@ public class FlushZKConsumerGroupMetadata { @Autowired private ClusterService clusterService; + @Autowired + private ThreadPool threadPool; + @Scheduled(cron="35 0/1 * * * ?") public void schedule() { List doList = clusterService.list(); @@ -95,7 +98,7 @@ public class FlushZKConsumerGroupMetadata { return new ArrayList<>(); } }); - ThreadPool.submitCollectMetricsTask(taskList[i]); + threadPool.submitCollectMetricsTask(clusterId, taskList[i]); } Map> topicNameConsumerGroupMap = new HashMap<>(); diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 4463d746..6b776773 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -96,3 +96,18 @@ notify: topic-name: didi-kafka-notify order: detail-url: http://127.0.0.1 + +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 From 52ccaeffd59ec8a27fcf28a67a35e7cc22c4fa51 Mon Sep 17 00:00:00 2001 From: xuguang Date: Thu, 13 Jan 2022 11:48:43 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-common/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml index 6a8ff0cb..c914ffeb 100644 --- a/kafka-manager-common/pom.xml +++ b/kafka-manager-common/pom.xml @@ -109,5 +109,10 @@ junit junit + + org.projectlombok + lombok + compile + \ No newline at end of file From 9e3bc80495d360b8c10f0605f84126f827e1fc54 Mon Sep 17 00:00:00 2001 From: xuguang Date: Thu, 13 Jan 2022 15:35:11 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=20&=20BrokerMet?= =?UTF-8?q?adata=20=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- distribution/conf/application.yml.example | 17 ++++++++++++++++- .../zookeeper/znode/brokers/BrokerMetadata.java | 8 ++++---- .../kafka/manager/service/cache/ThreadPool.java | 4 ++-- .../src/main/resources/application.yml | 6 +++--- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index 138a44fe..d4d57777 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -121,4 +121,19 @@ notify: # 通知的功能 cluster-id: 95 # Topic的集群ID topic-name: didi-kafka-notify # Topic名称 order: # 部署的KM的地址 - detail-url: http://127.0.0.1 \ No newline at end of file + detail-url: http://127.0.0.1 + +thread-pool: + collect-metrics: + thread-num: 256 # 收集指标线程池大小 + queue-size: 5000 # 收集指标线程池的queue大小 + api-call: + thread-num: 16 # api服务线程池大小 + queue-size: 5000 # api服务线程池的queue大小 + +client-pool: + kafka-consumer: + min-idle-client-num: 24 # 最小空闲客户端数 + max-idle-client-num: 24 # 最大空闲客户端数 + max-total-client-num: 24 # 最大客户端数 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java index e4e5063d..598784ca 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/zookeeper/znode/brokers/BrokerMetadata.java @@ -21,7 +21,7 @@ import java.util.Map; * 节点结构: * { * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"}, - * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093"], + * "endpoints":["SASL_PLAINTEXT://127.0.0.1:9093"], * "jmx_port":9999, * "host":null, * "timestamp":"1546632983233", @@ -32,9 +32,9 @@ import java.util.Map; * * { * "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"}, - * "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"], + * "endpoints":["SASL_PLAINTEXT://127.0.0.1:9093","PLAINTEXT://127.0.0.1:9092"], * "jmx_port":8099, - * "host":"10.179.162.202", + * "host":"127.0.0.1", * "timestamp":"1628833925822", * "port":9092, * "version":4 @@ -42,7 +42,7 @@ import java.util.Map; * * { * "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"}, - * "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"], + * "endpoints":["EXTERNAL://127.0.0.1:7092","INTERNAL://127.0.0.1:7093"], * "jmx_port":8099, * "host":null, * "timestamp":"1627289710439", diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java index 34f94871..ba870465 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java @@ -40,7 +40,7 @@ public class ThreadPool { 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(collectMetricsQueueSize), - new DefaultThreadFactory("Collect-Metrics-Thread") + new DefaultThreadFactory("TaskThreadPool") ); apiCallThreadPool = new ThreadPoolExecutor( @@ -49,7 +49,7 @@ public class ThreadPool { 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(apiCallQueueSize), - new DefaultThreadFactory("Api-Call-Thread") + new DefaultThreadFactory("ApiThreadPool") ); } diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 6f297554..0bfa8972 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -105,15 +105,15 @@ notify: thread-pool: collect-metrics: - thread-num: 256 # 收集指标线程池大小 + thread-num: 256 # 收集指标线程池大小 queue-size: 5000 # 收集指标线程池的queue大小 api-call: - thread-num: 16 # api服务线程池大小 + thread-num: 16 # api服务线程池大小 queue-size: 5000 # api服务线程池的queue大小 client-pool: kafka-consumer: - min-idle-client-num: 24 # 最小空闲客户端数 + min-idle-client-num: 24 # 最小空闲客户端数 max-idle-client-num: 24 # 最大空闲客户端数 max-total-client-num: 24 # 最大客户端数 borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 From 373680d85466e56de6aabf75b6f32641f46209c9 Mon Sep 17 00:00:00 2001 From: xuguang Date: Thu, 13 Jan 2022 15:39:39 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=20&=20BrokerMet?= =?UTF-8?q?adata=20=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka-manager-common/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka-manager-common/pom.xml b/kafka-manager-common/pom.xml index c914ffeb..f6c33def 100644 --- a/kafka-manager-common/pom.xml +++ b/kafka-manager-common/pom.xml @@ -109,6 +109,7 @@ junit junit + org.projectlombok lombok From fe0f6fcd0b3f15aff72fa0524f93e26fe45a401b Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 13 Jan 2022 16:02:33 +0800 Subject: [PATCH 6/6] fix config incorrectly comment --- distribution/conf/application.yml.example | 2 +- kafka-manager-web/src/main/resources/application.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distribution/conf/application.yml.example b/distribution/conf/application.yml.example index d4d57777..1278e3d2 100644 --- a/distribution/conf/application.yml.example +++ b/distribution/conf/application.yml.example @@ -136,4 +136,4 @@ client-pool: min-idle-client-num: 24 # 最小空闲客户端数 max-idle-client-num: 24 # 最大空闲客户端数 max-total-client-num: 24 # 最大客户端数 - borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 \ No newline at end of file + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒 \ No newline at end of file diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 0bfa8972..19ba8593 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -116,4 +116,4 @@ client-pool: min-idle-client-num: 24 # 最小空闲客户端数 max-idle-client-num: 24 # 最大空闲客户端数 max-total-client-num: 24 # 最大客户端数 - borrow-timeout-unit-ms: 3000 # 租借超时时间,单位秒 + borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒