diff --git a/kafka-manager-console/src/container/user-center/order-list.tsx b/kafka-manager-console/src/container/user-center/order-list.tsx
index 6c81b0ec..5ed5b961 100644
--- a/kafka-manager-console/src/container/user-center/order-list.tsx
+++ b/kafka-manager-console/src/container/user-center/order-list.tsx
@@ -115,11 +115,19 @@ export class OrderList extends SearchAndFilterContainer {
status,
{
title: '申请时间',
- dataIndex: 'gmtTime',
- key: 'gmtTime',
- sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtTime - a.gmtTime,
- render: (t: number) => moment(t).format(timeFormat),
- }, {
+ dataIndex: 'gmtCreate',
+ key: 'gmtCreate',
+ sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtCreate - a.gmtCreate,
+ render: (t: number) => t ? moment(t).format(timeFormat) : '-',
+ },
+ {
+ title: '审批时间',
+ dataIndex: 'gmtHandle',
+ key: 'gmtHandle',
+ sorter: (a: IBaseOrder, b: IBaseOrder) => b.gmtHandle - a.gmtHandle,
+ render: (t: number) => t ? moment(t).format(timeFormat) : '-',
+ },
+ {
title: '操作',
key: 'operation',
dataIndex: 'operation',
diff --git a/kafka-manager-console/src/routers/index.htm b/kafka-manager-console/src/routers/index.htm
index b8d8454f..7cb7a0fd 100644
--- a/kafka-manager-console/src/routers/index.htm
+++ b/kafka-manager-console/src/routers/index.htm
@@ -1,12 +1,15 @@
+
-
KafkaManager
+
LogiKM
+
+
\ No newline at end of file
diff --git a/kafka-manager-console/src/store/curve-info.ts b/kafka-manager-console/src/store/curve-info.ts
index 9531c849..fc4c57a9 100644
--- a/kafka-manager-console/src/store/curve-info.ts
+++ b/kafka-manager-console/src/store/curve-info.ts
@@ -1,6 +1,6 @@
import { observable, action } from 'mobx';
import moment = require('moment');
-import { EChartOption } from 'echarts/lib/echarts';
+import { EChartsOption } from 'echarts';
import { ICurve } from 'container/common-curve/config';
import { curveKeys, PERIOD_RADIO_MAP } from 'container/admin/data-curve/config';
import { timeFormat } from 'constants/strategy';
@@ -13,7 +13,7 @@ class CurveInfo {
public timeRange: [moment.Moment, moment.Moment] = PERIOD_RADIO_MAP.get(this.periodKey).dateRange;
@observable
- public curveData: { [key: string]: EChartOption } = {};
+ public curveData: { [key: string]: EChartsOption } = {};
@observable
public curveLoading: { [key: string]: boolean } = {};
@@ -25,7 +25,7 @@ class CurveInfo {
public currentOperator: string;
@action.bound
- public setCurveData(key: curveKeys | string, data: EChartOption) {
+ public setCurveData(key: curveKeys | string, data: EChartsOption) {
this.curveData[key] = data;
}
@@ -59,7 +59,7 @@ class CurveInfo {
public getCommonCurveData = (
options: ICurve,
- parser: (option: ICurve, data: any[]) => EChartOption,
+ parser: (option: ICurve, data: any[]) => EChartsOption,
reload?: boolean) => {
const { path } = options;
this.setCurveData(path, null);
diff --git a/kafka-manager-console/webpack.config.js b/kafka-manager-console/webpack.config.js
index a07d9990..d6d12fa8 100644
--- a/kafka-manager-console/webpack.config.js
+++ b/kafka-manager-console/webpack.config.js
@@ -122,11 +122,11 @@ module.exports = {
},
},
devServer: {
- contentBase: outPath,
+ // contentBase: outPath,
host: '127.0.0.1',
port: 1025,
hot: true,
- disableHostCheck: true,
+ // disableHostCheck: true,
historyApiFallback: true,
proxy: {
'/api/v1/': {
diff --git a/kafka-manager-core/pom.xml b/kafka-manager-core/pom.xml
index 81675a43..e00663ed 100644
--- a/kafka-manager-core/pom.xml
+++ b/kafka-manager-core/pom.xml
@@ -24,7 +24,6 @@
1.8
UTF-8
UTF-8
-
5.1.3.RELEASE
@@ -38,12 +37,10 @@
org.springframework
spring-web
- ${spring-version}
org.springframework
spring-test
- ${spring-version}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java
index 41fd0092..3fd6aaac 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ConsumerMetadataCache.java
@@ -17,6 +17,9 @@ public class ConsumerMetadataCache {
private static final Map CG_METADATA_IN_BK_MAP = new ConcurrentHashMap<>();
+ private ConsumerMetadataCache() {
+ }
+
public static void putConsumerMetadataInZK(Long clusterId, ConsumerMetadata consumerMetadata) {
if (clusterId == null || consumerMetadata == null) {
return;
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
index 921b13ba..2e1e9e71 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaClientPool.java
@@ -1,7 +1,7 @@
package com.xiaojukeji.kafka.manager.service.cache;
-import com.alibaba.fastjson.JSONObject;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.factory.KafkaConsumerFactory;
import kafka.admin.AdminClient;
@@ -14,6 +14,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.Properties;
@@ -25,20 +27,36 @@ import java.util.concurrent.locks.ReentrantLock;
* @author zengqiao
* @date 19/12/24
*/
+@Service
public class KafkaClientPool {
- private final static Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClientPool.class);
+
+ @Value(value = "${client-pool.kafka-consumer.min-idle-client-num:24}")
+ private Integer kafkaConsumerMinIdleClientNum;
+
+ @Value(value = "${client-pool.kafka-consumer.max-idle-client-num:24}")
+ private Integer kafkaConsumerMaxIdleClientNum;
+
+ @Value(value = "${client-pool.kafka-consumer.max-total-client-num:24}")
+ private Integer kafkaConsumerMaxTotalClientNum;
+
+ @Value(value = "${client-pool.kafka-consumer.borrow-timeout-unit-ms:3000}")
+ private Integer kafkaConsumerBorrowTimeoutUnitMs;
/**
* AdminClient
*/
- private static Map AdminClientMap = new ConcurrentHashMap<>();
+ private static final Map ADMIN_CLIENT_MAP = new ConcurrentHashMap<>();
- private static Map> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
+ private static final Map> KAFKA_PRODUCER_MAP = new ConcurrentHashMap<>();
- private static Map> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
+ private static final Map>> KAFKA_CONSUMER_POOL = new ConcurrentHashMap<>();
private static ReentrantLock lock = new ReentrantLock();
+ private KafkaClientPool() {
+ }
+
private static void initKafkaProducerMap(Long clusterId) {
ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterId);
if (clusterDO == null) {
@@ -55,7 +73,7 @@ public class KafkaClientPool {
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
- KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer(properties));
+ KAFKA_PRODUCER_MAP.put(clusterId, new KafkaProducer<>(properties));
} catch (Exception e) {
LOGGER.error("create kafka producer failed, clusterDO:{}.", clusterDO, e);
} finally {
@@ -77,25 +95,22 @@ public class KafkaClientPool {
if (ValidateUtils.isNull(kafkaProducer)) {
return false;
}
- kafkaProducer.send(new ProducerRecord(topicName, data));
+ kafkaProducer.send(new ProducerRecord<>(topicName, data));
return true;
}
- private static void initKafkaConsumerPool(ClusterDO clusterDO) {
+ private void initKafkaConsumerPool(ClusterDO clusterDO) {
lock.lock();
try {
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
if (objectPool != null) {
return;
}
- GenericObjectPoolConfig config = new GenericObjectPoolConfig();
- config.setMaxIdle(24);
- config.setMinIdle(24);
- config.setMaxTotal(24);
- KAFKA_CONSUMER_POOL.put(
- clusterDO.getId(),
- new GenericObjectPool(new KafkaConsumerFactory(clusterDO), config)
- );
+ GenericObjectPoolConfig> config = new GenericObjectPoolConfig<>();
+ config.setMaxIdle(kafkaConsumerMaxIdleClientNum);
+ config.setMinIdle(kafkaConsumerMinIdleClientNum);
+ config.setMaxTotal(kafkaConsumerMaxTotalClientNum);
+ KAFKA_CONSUMER_POOL.put(clusterDO.getId(), new GenericObjectPool<>(new KafkaConsumerFactory(clusterDO), config));
} catch (Exception e) {
LOGGER.error("create kafka consumer pool failed, clusterDO:{}.", clusterDO, e);
} finally {
@@ -106,7 +121,7 @@ public class KafkaClientPool {
public static void closeKafkaConsumerPool(Long clusterId) {
lock.lock();
try {
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.remove(clusterId);
if (objectPool == null) {
return;
}
@@ -118,11 +133,11 @@ public class KafkaClientPool {
}
}
- public static KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
+ public KafkaConsumer borrowKafkaConsumerClient(ClusterDO clusterDO) {
if (ValidateUtils.isNull(clusterDO)) {
return null;
}
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
if (ValidateUtils.isNull(objectPool)) {
initKafkaConsumerPool(clusterDO);
objectPool = KAFKA_CONSUMER_POOL.get(clusterDO.getId());
@@ -132,18 +147,18 @@ public class KafkaClientPool {
}
try {
- return objectPool.borrowObject(3000);
+ return objectPool.borrowObject(kafkaConsumerBorrowTimeoutUnitMs);
} catch (Exception e) {
LOGGER.error("borrow kafka consumer client failed, clusterDO:{}.", clusterDO, e);
}
return null;
}
- public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
+ public static void returnKafkaConsumerClient(Long physicalClusterId, KafkaConsumer kafkaConsumer) {
if (ValidateUtils.isNull(physicalClusterId) || ValidateUtils.isNull(kafkaConsumer)) {
return;
}
- GenericObjectPool objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
+ GenericObjectPool> objectPool = KAFKA_CONSUMER_POOL.get(physicalClusterId);
if (ValidateUtils.isNull(objectPool)) {
return;
}
@@ -155,7 +170,7 @@ public class KafkaClientPool {
}
public static AdminClient getAdminClient(Long clusterId) {
- AdminClient adminClient = AdminClientMap.get(clusterId);
+ AdminClient adminClient = ADMIN_CLIENT_MAP.get(clusterId);
if (adminClient != null) {
return adminClient;
}
@@ -166,26 +181,26 @@ public class KafkaClientPool {
Properties properties = createProperties(clusterDO, false);
lock.lock();
try {
- adminClient = AdminClientMap.get(clusterId);
+ adminClient = ADMIN_CLIENT_MAP.get(clusterId);
if (adminClient != null) {
return adminClient;
}
- AdminClientMap.put(clusterId, AdminClient.create(properties));
+ ADMIN_CLIENT_MAP.put(clusterId, AdminClient.create(properties));
} catch (Exception e) {
LOGGER.error("create kafka admin client failed, clusterId:{}.", clusterId, e);
} finally {
lock.unlock();
}
- return AdminClientMap.get(clusterId);
+ return ADMIN_CLIENT_MAP.get(clusterId);
}
public static void closeAdminClient(ClusterDO cluster) {
- if (AdminClientMap.containsKey(cluster.getId())) {
- AdminClientMap.get(cluster.getId()).close();
+ if (ADMIN_CLIENT_MAP.containsKey(cluster.getId())) {
+ ADMIN_CLIENT_MAP.get(cluster.getId()).close();
}
}
- public static Properties createProperties(ClusterDO clusterDO, Boolean serialize) {
+ public static Properties createProperties(ClusterDO clusterDO, boolean serialize) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterDO.getBootstrapServers());
if (serialize) {
@@ -198,8 +213,7 @@ public class KafkaClientPool {
if (ValidateUtils.isBlank(clusterDO.getSecurityProperties())) {
return properties;
}
- Properties securityProperties = JSONObject.parseObject(clusterDO.getSecurityProperties(), Properties.class);
- properties.putAll(securityProperties);
+ properties.putAll(JsonUtils.stringToObj(clusterDO.getSecurityProperties(), Properties.class));
return properties;
}
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java
index 011bc1e6..7ba1e304 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/KafkaMetricsCache.java
@@ -14,7 +14,10 @@ public class KafkaMetricsCache {
/**
*
*/
- private static Map> TopicMetricsMap = new ConcurrentHashMap<>();
+ private static final Map> TOPIC_METRICS_MAP = new ConcurrentHashMap<>();
+
+ private KafkaMetricsCache() {
+ }
public static void putTopicMetricsToCache(Long clusterId, List dataList) {
if (clusterId == null || dataList == null) {
@@ -24,22 +27,22 @@ public class KafkaMetricsCache {
for (TopicMetrics topicMetrics : dataList) {
subMetricsMap.put(topicMetrics.getTopicName(), topicMetrics);
}
- TopicMetricsMap.put(clusterId, subMetricsMap);
+ TOPIC_METRICS_MAP.put(clusterId, subMetricsMap);
}
public static Map getTopicMetricsFromCache(Long clusterId) {
- return TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+ return TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
}
public static Map> getAllTopicMetricsFromCache() {
- return TopicMetricsMap;
+ return TOPIC_METRICS_MAP;
}
public static TopicMetrics getTopicMetricsFromCache(Long clusterId, String topicName) {
if (clusterId == null || topicName == null) {
return null;
}
- Map subMap = TopicMetricsMap.getOrDefault(clusterId, Collections.emptyMap());
+ Map subMap = TOPIC_METRICS_MAP.getOrDefault(clusterId, Collections.emptyMap());
return subMap.get(topicName);
}
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
index 5cd81581..d58efc9a 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/LogicalClusterMetadataManager.java
@@ -160,7 +160,7 @@ public class LogicalClusterMetadataManager {
public void flush() {
List logicalClusterDOList = logicalClusterService.listAll();
if (ValidateUtils.isNull(logicalClusterDOList)) {
- logicalClusterDOList = Collections.EMPTY_LIST;
+ logicalClusterDOList = Collections.emptyList();
}
Set inDbLogicalClusterIds = logicalClusterDOList.stream()
.map(LogicalClusterDO::getId)
@@ -208,7 +208,8 @@ public class LogicalClusterMetadataManager {
// 计算逻辑集群到Topic名称的映射
Set topicNameSet = PhysicalClusterMetadataManager.getBrokerTopicNum(
logicalClusterDO.getClusterId(),
- brokerIdSet);
+ brokerIdSet
+ );
LOGICAL_CLUSTER_ID_TOPIC_NAME_MAP.put(logicalClusterDO.getId(), topicNameSet);
// 计算Topic名称到逻辑集群的映射
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
index a7142fa9..47ab8b64 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Service
public class PhysicalClusterMetadataManager {
- private final static Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalClusterMetadataManager.class);
@Autowired
private ControllerDao controllerDao;
@@ -50,22 +50,25 @@ public class PhysicalClusterMetadataManager {
@Autowired
private ClusterService clusterService;
- private final static Map CLUSTER_MAP = new ConcurrentHashMap<>();
+ @Autowired
+ private ThreadPool threadPool;
- private final static Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
+ private static final Map CLUSTER_MAP = new ConcurrentHashMap<>();
- private final static Map ZK_CONFIG_MAP = new ConcurrentHashMap<>();
+ private static final Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>();
- private final static Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
+ private static final Map ZK_CONFIG_MAP = new ConcurrentHashMap<>();
- private final static Map> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
+ private static final Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>();
- private final static Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
+ private static final Map> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>();
+
+ private static final Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>();
/**
* JXM连接, 延迟连接
*/
- private final static Map> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
+ private static final Map> JMX_CONNECTOR_MAP = new ConcurrentHashMap<>();
/**
* KafkaBroker版本, 延迟获取
@@ -125,7 +128,7 @@ public class PhysicalClusterMetadataManager {
zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener);
//增加Topic监控
- TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig);
+ TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig, threadPool);
topicListener.init();
zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener);
@@ -314,7 +317,7 @@ public class PhysicalClusterMetadataManager {
metadataMap.put(brokerId, brokerMetadata);
Map jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
- jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig));
+ jmxMap.put(brokerId, new JmxConnectorWrap(clusterId, brokerId, brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxConfig));
JMX_CONNECTOR_MAP.put(clusterId, jmxMap);
Map versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>());
@@ -398,7 +401,7 @@ public class PhysicalClusterMetadataManager {
KafkaBrokerRoleEnum roleEnum) {
BrokerMetadata brokerMetadata =
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
- if (ValidateUtils.isNull(brokerMetadata)) {
+ if (brokerMetadata == null) {
return;
}
String hostname = brokerMetadata.getHost().replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "");
@@ -438,7 +441,7 @@ public class PhysicalClusterMetadataManager {
KafkaBrokerRoleEnum roleEnum) {
BrokerMetadata brokerMetadata =
PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
- if (ValidateUtils.isNull(brokerMetadata)) {
+ if (brokerMetadata == null) {
return;
}
@@ -539,9 +542,12 @@ public class PhysicalClusterMetadataManager {
}
public static Set getBrokerTopicNum(Long clusterId, Set brokerIdSet) {
- Set topicNameSet = new HashSet<>();
-
Map metadataMap = TOPIC_METADATA_MAP.get(clusterId);
+ if (metadataMap == null) {
+ return new HashSet<>();
+ }
+
+ Set topicNameSet = new HashSet<>();
for (String topicName: metadataMap.keySet()) {
try {
TopicMetadata tm = metadataMap.get(topicName);
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java
index f1b685cb..ba870465 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/ThreadPool.java
@@ -1,37 +1,63 @@
package com.xiaojukeji.kafka.manager.service.cache;
import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
-import java.util.concurrent.*;
+import javax.annotation.PostConstruct;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* @author zengqiao
* @date 20/8/24
*/
+@Service
public class ThreadPool {
- private static final ExecutorService COLLECT_METRICS_THREAD_POOL = new ThreadPoolExecutor(
- 256,
- 256,
- 120L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue(),
- new DefaultThreadFactory("Collect-Metrics-Thread")
- );
- private static final ExecutorService API_CALL_THREAD_POOL = new ThreadPoolExecutor(
- 16,
- 16,
- 120L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue(),
- new DefaultThreadFactory("Api-Call-Thread")
- );
+ @Value(value = "${thread-pool.collect-metrics.thread-num:256}")
+ private Integer collectMetricsThreadNum;
- public static void submitCollectMetricsTask(Runnable collectMetricsTask) {
- COLLECT_METRICS_THREAD_POOL.submit(collectMetricsTask);
+ @Value(value = "${thread-pool.collect-metrics.queue-size:10000}")
+ private Integer collectMetricsQueueSize;
+
+ @Value(value = "${thread-pool.api-call.thread-num:16}")
+ private Integer apiCallThreadNum;
+
+ @Value(value = "${thread-pool.api-call.queue-size:10000}")
+ private Integer apiCallQueueSize;
+
+ private ThreadPoolExecutor collectMetricsThreadPool;
+
+ private ThreadPoolExecutor apiCallThreadPool;
+
+ @PostConstruct
+ public void init() {
+ collectMetricsThreadPool = new ThreadPoolExecutor(
+ collectMetricsThreadNum,
+ collectMetricsThreadNum,
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(collectMetricsQueueSize),
+ new DefaultThreadFactory("TaskThreadPool")
+ );
+
+ apiCallThreadPool = new ThreadPoolExecutor(
+ apiCallThreadNum,
+ apiCallThreadNum,
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(apiCallQueueSize),
+ new DefaultThreadFactory("ApiThreadPool")
+ );
}
- public static void submitApiCallTask(Runnable apiCallTask) {
- API_CALL_THREAD_POOL.submit(apiCallTask);
+ public void submitCollectMetricsTask(Long clusterId, Runnable collectMetricsTask) {
+ collectMetricsThreadPool.submit(collectMetricsTask);
+ }
+
+ public void submitApiCallTask(Long clusterId, Runnable apiCallTask) {
+ apiCallThreadPool.submit(apiCallTask);
}
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java
index 273b62c6..9ab963aa 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java
@@ -13,4 +13,12 @@ public interface TopicExpiredService {
List getExpiredTopicDataList(String username);
ResultStatus retainExpiredTopic(Long physicalClusterId, String topicName, Integer retainDays);
+
+ /**
+ * 通过topic名称删除
+ * @param clusterId 集群id
+ * @param topicName topic名称
+ * @return int 删除影响的行数
+ */
+ int deleteByTopicName(Long clusterId, String topicName);
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java
index 0ceb3b30..754a81a7 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java
@@ -185,7 +185,8 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
List gatewayConfigDOList = gatewayConfigDao.getByConfigType(gatewayConfigDO.getType());
Long version = 1L;
for (GatewayConfigDO elem: gatewayConfigDOList) {
- if (elem.getVersion() > version) {
+ if (elem.getVersion() >= version) {
+ // 大于等于的情况下,都需要+1
version = elem.getVersion() + 1L;
}
}
@@ -204,6 +205,7 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
@Override
public Result deleteById(Long id) {
try {
+ // TODO 删除的时候,不能直接删,也需要变更一下version
if (gatewayConfigDao.deleteById(id) > 0) {
return Result.buildSuc();
}
@@ -232,7 +234,8 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
List gatewayConfigDOList = gatewayConfigDao.getByConfigType(newGatewayConfigDO.getType());
Long version = 1L;
for (GatewayConfigDO elem: gatewayConfigDOList) {
- if (elem.getVersion() > version) {
+ if (elem.getVersion() >= version) {
+ // 大于等于的情况下,都需要+1
version = elem.getVersion() + 1L;
}
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
index 8a0028c7..594f1aa1 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java
@@ -43,6 +43,9 @@ public class AdminServiceImpl implements AdminService {
@Autowired
private TopicManagerService topicManagerService;
+ @Autowired
+ private TopicExpiredService topicExpiredService;
+
@Autowired
private TopicService topicService;
@@ -143,6 +146,7 @@ public class AdminServiceImpl implements AdminService {
// 3. 数据库中删除topic
topicManagerService.deleteByTopicName(clusterDO.getId(), topicName);
+ topicExpiredService.deleteByTopicName(clusterDO.getId(), topicName);
// 4. 数据库中删除authority
authorityService.deleteAuthorityByTopic(clusterDO.getId(), topicName);
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java
index 24eea55f..ac3e0593 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/BrokerServiceImpl.java
@@ -61,6 +61,9 @@ public class BrokerServiceImpl implements BrokerService {
@Autowired
private PhysicalClusterMetadataManager physicalClusterMetadataManager;
+ @Autowired
+ private ThreadPool threadPool;
+
@Override
public ClusterBrokerStatus getClusterBrokerStatus(Long clusterId) {
// 副本同步状态
@@ -201,7 +204,7 @@ public class BrokerServiceImpl implements BrokerService {
return getBrokerMetricsFromJmx(clusterId, brokerId, metricsCode);
}
});
- ThreadPool.submitApiCallTask(taskList[i]);
+ threadPool.submitApiCallTask(clusterId, taskList[i]);
}
List metricsList = new ArrayList<>(brokerIdSet.size());
for (int i = 0; i < brokerIdList.size(); i++) {
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
index ea9d22da..153576c4 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java
@@ -19,6 +19,8 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,7 +212,7 @@ public class ClusterServiceImpl implements ClusterService {
ZooKeeper zk = null;
try {
- zk = new ZooKeeper(zookeeper, 1000, null);
+ zk = new ZooKeeper(zookeeper, 1000, watchedEvent -> LOGGER.info(" receive event : " + watchedEvent.getType().name()));
for (int i = 0; i < 15; ++i) {
if (zk.getState().isConnected()) {
// 只有状态是connected的时候,才表示地址是合法的
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java
index d0b34e3d..94f00d2c 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java
@@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
+import java.util.regex.Pattern;
/**
* @author zengqiao
@@ -240,9 +241,11 @@ public class ExpertServiceImpl implements ExpertService {
return new ArrayList<>();
}
+ //获取满足条件的过期Topic
List filteredExpiredTopicList = new ArrayList<>();
for (TopicExpiredDO elem: expiredTopicList) {
- if (config.getIgnoreClusterIdList().contains(elem.getClusterId())) {
+ //判定是否为忽略Cluster或者判定是否为忽略Topic名,使用正则来过滤理论上不属于过期的Topic
+ if (config.getIgnoreClusterIdList().contains(elem.getClusterId()) || Pattern.matches(config.getFilterRegex(), elem.getTopicName())) {
continue;
}
filteredExpiredTopicList.add(elem);
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java
index 611dc203..d0f0c514 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/JmxServiceImpl.java
@@ -39,6 +39,9 @@ public class JmxServiceImpl implements JmxService {
@Autowired
private PhysicalClusterMetadataManager physicalClusterMetadataManager;
+ @Autowired
+ private ThreadPool threadPool;
+
@Override
public BrokerMetrics getBrokerMetrics(Long clusterId, Integer brokerId, Integer metricsCode) {
if (clusterId == null || brokerId == null || metricsCode == null) {
@@ -98,7 +101,7 @@ public class JmxServiceImpl implements JmxService {
);
}
});
- ThreadPool.submitCollectMetricsTask(taskList[i]);
+ threadPool.submitCollectMetricsTask(clusterId, taskList[i]);
}
List metricsList = new ArrayList<>();
@@ -303,7 +306,7 @@ public class JmxServiceImpl implements JmxService {
return metricsList;
}
});
- ThreadPool.submitCollectMetricsTask(taskList[i]);
+ threadPool.submitCollectMetricsTask(clusterId, taskList[i]);
}
Map metricsMap = new HashMap<>();
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java
index 8f957b02..3b2df843 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/RegionServiceImpl.java
@@ -2,6 +2,8 @@ package com.xiaojukeji.kafka.manager.service.service.impl;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
+import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent;
+import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
import com.xiaojukeji.kafka.manager.dao.RegionDao;
@@ -59,6 +61,8 @@ public class RegionServiceImpl implements RegionService {
return ResultStatus.BROKER_NOT_EXIST;
}
if (regionDao.insert(regionDO) > 0) {
+ // 发布region创建事件
+ SpringTool.publish(new RegionCreatedEvent(this, regionDO));
return ResultStatus.SUCCESS;
}
} catch (DuplicateKeyException e) {
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java
index c51e1dcb..d310af1a 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java
@@ -75,4 +75,14 @@ public class TopicExpiredServiceImpl implements TopicExpiredService {
}
return ResultStatus.MYSQL_ERROR;
}
+
+ @Override
+ public int deleteByTopicName(Long clusterId, String topicName) {
+ try {
+ return topicExpiredDao.deleteByName(clusterId, topicName);
+ } catch (Exception e) {
+ LOGGER.error("delete topic failed, clusterId:{} topicName:{}", clusterId, topicName, e);
+ }
+ return 0;
+ }
}
\ No newline at end of file
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
index 4a8f501f..a25115ef 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java
@@ -210,7 +210,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
}
}
- // 增加流量信息
+ // 增加流量和描述信息
Map> metricMap = KafkaMetricsCache.getAllTopicMetricsFromCache();
for (MineTopicSummary mineTopicSummary : summaryList) {
TopicMetrics topicMetrics = getTopicMetricsFromCacheOrJmx(
@@ -219,6 +219,10 @@ public class TopicManagerServiceImpl implements TopicManagerService {
metricMap);
mineTopicSummary.setBytesIn(topicMetrics.getSpecifiedMetrics("BytesInPerSecOneMinuteRate"));
mineTopicSummary.setBytesOut(topicMetrics.getSpecifiedMetrics("BytesOutPerSecOneMinuteRate"));
+
+ // 增加topic描述信息
+ TopicDO topicDO = topicDao.getByTopicName(mineTopicSummary.getPhysicalClusterId(), mineTopicSummary.getTopicName());
+ mineTopicSummary.setDescription(topicDO.getDescription());
}
return summaryList;
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
index 154faf77..aa4fe3fb 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java
@@ -87,6 +87,9 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private AbstractHealthScoreStrategy healthScoreStrategy;
+ @Autowired
+ private KafkaClientPool kafkaClientPool;
+
@Override
public List getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) {
try {
@@ -340,7 +343,7 @@ public class TopicServiceImpl implements TopicService {
Map topicPartitionLongMap = new HashMap<>();
KafkaConsumer kafkaConsumer = null;
try {
- kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO);
+ kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO);
if ((offsetPosEnum.getCode() & OffsetPosEnum.END.getCode()) > 0) {
topicPartitionLongMap = kafkaConsumer.endOffsets(topicPartitionList);
} else if ((offsetPosEnum.getCode() & OffsetPosEnum.BEGINNING.getCode()) > 0) {
@@ -541,7 +544,7 @@ public class TopicServiceImpl implements TopicService {
List partitionOffsetDTOList = new ArrayList<>();
try {
- kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO);
+ kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO);
Map offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
if (offsetAndTimestampMap == null) {
return new ArrayList<>();
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java
index d75dec5a..51295644 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/strategy/healthscore/DidiHealthScoreStrategy.java
@@ -45,6 +45,9 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy {
@Autowired
private JmxService jmxService;
+ @Autowired
+ private ThreadPool threadPool;
+
@Override
public Integer calBrokerHealthScore(Long clusterId, Integer brokerId) {
BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
@@ -125,7 +128,7 @@ public class DidiHealthScoreStrategy extends AbstractHealthScoreStrategy {
return calBrokerHealthScore(clusterId, brokerId);
}
});
- ThreadPool.submitApiCallTask(taskList[i]);
+ threadPool.submitApiCallTask(clusterId, taskList[i]);
}
Integer topicHealthScore = HEALTH_SCORE_HEALTHY;
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java
index 5df85b5e..40b73868 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java
@@ -1,5 +1,6 @@
package com.xiaojukeji.kafka.manager.service.utils;
+import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -8,38 +9,18 @@ import org.springframework.stereotype.Service;
* @author zengqiao
* @date 20/4/26
*/
+@Data
@Service("configUtils")
public class ConfigUtils {
- @Value(value = "${custom.idc}")
+ private ConfigUtils() {
+ }
+
+ @Value(value = "${custom.idc:cn}")
private String idc;
- @Value(value = "${spring.profiles.active}")
+ @Value(value = "${spring.profiles.active:dev}")
private String kafkaManagerEnv;
- @Value(value = "${custom.store-metrics-task.save-days}")
- private Long maxMetricsSaveDays;
-
- public String getIdc() {
- return idc;
- }
-
- public void setIdc(String idc) {
- this.idc = idc;
- }
-
- public String getKafkaManagerEnv() {
- return kafkaManagerEnv;
- }
-
- public void setKafkaManagerEnv(String kafkaManagerEnv) {
- this.kafkaManagerEnv = kafkaManagerEnv;
- }
-
- public Long getMaxMetricsSaveDays() {
- return maxMetricsSaveDays;
- }
-
- public void setMaxMetricsSaveDays(Long maxMetricsSaveDays) {
- this.maxMetricsSaveDays = maxMetricsSaveDays;
- }
+ @Value(value = "${spring.application.version:unknown}")
+ private String applicationVersion;
}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java
index a94ec9de..f5cdefe8 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java
@@ -74,15 +74,10 @@ public class BrokerStateListener implements StateChangeListener {
BrokerMetadata brokerMetadata = null;
try {
brokerMetadata = zkConfig.get(ZkPathUtil.getBrokerIdNodePath(brokerId), BrokerMetadata.class);
- if (!brokerMetadata.getEndpoints().isEmpty()) {
- String endpoint = brokerMetadata.getEndpoints().get(0);
- int idx = endpoint.indexOf("://");
- endpoint = endpoint.substring(idx + "://".length());
- idx = endpoint.indexOf(":");
- brokerMetadata.setHost(endpoint.substring(0, idx));
- brokerMetadata.setPort(Integer.parseInt(endpoint.substring(idx + 1)));
- }
+ // 解析并更新本次存储的broker元信息
+ BrokerMetadata.parseAndUpdateBrokerMetadata(brokerMetadata);
+
brokerMetadata.setClusterId(clusterId);
brokerMetadata.setBrokerId(brokerId);
PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxConfig);
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java
index 3f43f57b..c417df66 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java
@@ -19,13 +19,13 @@ import org.springframework.dao.DuplicateKeyException;
* @date 20/5/14
*/
public class ControllerStateListener implements StateChangeListener {
- private final static Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class);
- private Long clusterId;
+ private final Long clusterId;
- private ZkConfigImpl zkConfig;
+ private final ZkConfigImpl zkConfig;
- private ControllerDao controllerDao;
+ private final ControllerDao controllerDao;
public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) {
this.clusterId = clusterId;
@@ -35,8 +35,11 @@ public class ControllerStateListener implements StateChangeListener {
@Override
public void init() {
+ if (!checkNodeExist()) {
+ LOGGER.warn("kafka-controller data not exist, clusterId:{}.", clusterId);
+ return;
+ }
processControllerChange();
- return;
}
@Override
@@ -49,12 +52,21 @@ public class ControllerStateListener implements StateChangeListener {
break;
}
} catch (Exception e) {
- LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.",
- clusterId, state, path, e);
+ LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", clusterId, state, path, e);
}
}
- private void processControllerChange(){
+ private boolean checkNodeExist() {
+ try {
+ return zkConfig.checkPathExists(ZkPathUtil.CONTROLLER_ROOT_NODE);
+ } catch (Exception e) {
+ LOGGER.error("init kafka-controller data failed, clusterId:{}.", clusterId, e);
+ }
+
+ return false;
+ }
+
+ private void processControllerChange() {
LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId);
ControllerData controllerData = null;
try {
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java
index 4314a101..6f3d33b3 100644
--- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/TopicStateListener.java
@@ -10,6 +10,7 @@ import com.xiaojukeji.kafka.manager.service.cache.ThreadPool;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashSet;
import java.util.List;
@@ -28,9 +29,12 @@ public class TopicStateListener implements StateChangeListener {
private ZkConfigImpl zkConfig;
- public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig) {
+ private ThreadPool threadPool;
+
+ public TopicStateListener(Long clusterId, ZkConfigImpl zkConfig, ThreadPool threadPool) {
this.clusterId = clusterId;
this.zkConfig = zkConfig;
+ this.threadPool = threadPool;
}
@Override
@@ -47,7 +51,7 @@ public class TopicStateListener implements StateChangeListener {
return null;
}
});
- ThreadPool.submitCollectMetricsTask(taskList[i]);
+ threadPool.submitCollectMetricsTask(clusterId, taskList[i]);
}
} catch (Exception e) {
LOGGER.error("init topics metadata failed, clusterId:{}.", clusterId, e);
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java
index 75399538..9f1d36eb 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/BrokerMetricsDao.java
@@ -20,5 +20,5 @@ public interface BrokerMetricsDao {
*/
List getBrokerMetrics(Long clusterId, Integer brokerId, Date startTime, Date endTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java
index d0731508..0e2e68a7 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/ClusterMetricsDao.java
@@ -10,5 +10,5 @@ public interface ClusterMetricsDao {
List getClusterMetrics(long clusterId, Date startTime, Date endTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java
index 9d02c5d5..e0c3f84e 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicAppMetricsDao.java
@@ -30,5 +30,5 @@ public interface TopicAppMetricsDao {
* @param endTime
* @return
*/
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java
index 18698941..ea189eb4 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java
@@ -17,4 +17,6 @@ public interface TopicExpiredDao {
int replace(TopicExpiredDO expiredDO);
TopicExpiredDO getByTopic(Long clusterId, String topicName);
+
+ int deleteByName(Long clusterId, String topicName);
}
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java
index 58029f36..5d7af6e0 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicMetricsDao.java
@@ -22,5 +22,5 @@ public interface TopicMetricsDao {
List getLatestTopicMetrics(Long clusterId, Date afterTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java
index e7fd5169..5e6b237d 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicRequestMetricsDao.java
@@ -33,9 +33,7 @@ public interface TopicRequestMetricsDao {
* @param endTime
* @return
*/
- int deleteBeforeTime(Date endTime);
-
- int deleteBeforeId(Long id);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
List getById(Long startId, Long endId);
}
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java
index 1010cc17..cc975c52 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicThrottledMetricsDao.java
@@ -32,5 +32,5 @@ public interface TopicThrottledMetricsDao {
List getLatestTopicThrottledMetrics(Long clusterId, Date afterTime);
- int deleteBeforeTime(Date endTime);
+ int deleteBeforeTime(Date endTime, Integer limitSize);
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java
index 5a06e5ce..bba58185 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/BrokerMetricsImpl.java
@@ -37,7 +37,10 @@ public class BrokerMetricsImpl implements BrokerMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("BrokerMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("BrokerMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java
index b05d3c0f..08948871 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/ClusterMetricsDaoImpl.java
@@ -27,7 +27,7 @@ public class ClusterMetricsDaoImpl implements ClusterMetricsDao {
@Override
public List getClusterMetrics(long clusterId, Date startTime, Date endTime) {
- Map map = new HashMap(3);
+ Map map = new HashMap<>(3);
map.put("clusterId", clusterId);
map.put("startTime", startTime);
map.put("endTime", endTime);
@@ -35,7 +35,10 @@ public class ClusterMetricsDaoImpl implements ClusterMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("ClusterMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("ClusterMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java
index fe55a1ab..90ce7e3e 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicAppMetricsDaoImpl.java
@@ -46,7 +46,10 @@ public class TopicAppMetricsDaoImpl implements TopicAppMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicAppMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicAppMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java
index 51853db7..936d4931 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java
@@ -50,4 +50,12 @@ public class TopicExpiredDaoImpl implements TopicExpiredDao {
params.put("topicName", topicName);
return sqlSession.selectOne("TopicExpiredDao.getByTopic", params);
}
+
+ @Override
+ public int deleteByName(Long clusterId, String topicName) {
+ Map params = new HashMap<>(2);
+ params.put("clusterId", clusterId);
+ params.put("topicName", topicName);
+ return sqlSession.delete("TopicExpiredDao.deleteByName", params);
+ }
}
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java
index 7397a28c..a7eae32c 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicMetricsDaoImpl.java
@@ -60,7 +60,10 @@ public class TopicMetricsDaoImpl implements TopicMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java
index bfaa552c..e59324f5 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicRequestMetricsDaoImpl.java
@@ -45,13 +45,11 @@ public class TopicRequestMetricsDaoImpl implements TopicRequestMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeTime", endTime);
- }
-
- @Override
- public int deleteBeforeId(Long id) {
- return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeId", id);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>();
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicRequestMetricsDao.deleteBeforeTime", params);
}
@Override
diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java
index 784bc242..b1f64d43 100644
--- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java
+++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicThrottledMetricsDaoImpl.java
@@ -75,7 +75,10 @@ public class TopicThrottledMetricsDaoImpl implements TopicThrottledMetricsDao {
}
@Override
- public int deleteBeforeTime(Date endTime) {
- return sqlSession.delete("TopicThrottledMetricsDao.deleteBeforeTime", endTime);
+ public int deleteBeforeTime(Date endTime, Integer limitSize) {
+ Map params = new HashMap<>(2);
+ params.put("endTime", endTime);
+ params.put("limitSize", limitSize);
+ return sqlSession.delete("TopicThrottledMetricsDao.deleteBeforeTime", params);
}
}
diff --git a/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml
index 49746df7..b5115e10 100644
--- a/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/BrokerMetricsDao.xml
@@ -29,9 +29,9 @@
]]>
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml
index 11614d2d..8aca62ee 100644
--- a/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/ClusterMetricsDao.xml
@@ -27,9 +27,9 @@
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml
index 1c64c0ce..fff5037a 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicAppMetricsDao.xml
@@ -30,9 +30,9 @@
]]>
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml
index 39ebf8ca..1da6753a 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml
@@ -36,4 +36,8 @@
+
+
+ DELETE FROM topic_expired WHERE cluster_id=#{clusterId} AND topic_name=#{topicName}
+
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
index baa6f4b0..249863f4 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicMetricsDao.xml
@@ -25,6 +25,7 @@
WHERE cluster_id = #{clusterId}
AND topic_name = #{topicName}
AND gmt_create BETWEEN #{startTime} AND #{endTime}
+ ORDER BY gmt_create
]]>
@@ -32,12 +33,13 @@
-
+
\ No newline at end of file
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml
index b9aaa35b..7ad5e679 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicRequestMetricsDao.xml
@@ -34,15 +34,9 @@
ORDER BY gmt_create ASC
-
+
-
-
-
-
diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml
index c5b6474d..e163d30f 100644
--- a/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml
+++ b/kafka-manager-dao/src/main/resources/mapper/TopicThrottledMetricsDao.xml
@@ -54,9 +54,9 @@
AND gmt_create > #{afterTime}
-
+
\ No newline at end of file
diff --git a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java
index 3ccbd17c..3683b193 100644
--- a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java
+++ b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/sso/BaseSessionSignOn.java
@@ -32,16 +32,16 @@ public class BaseSessionSignOn extends AbstractSingleSignOn {
private LdapAuthentication ldapAuthentication;
//是否开启ldap验证
- @Value(value = "${account.ldap.enabled:}")
+ @Value(value = "${account.ldap.enabled:false}")
private Boolean accountLdapEnabled;
//ldap自动注册的默认角色。请注意:它通常来说都是低权限角色
- @Value(value = "${account.ldap.auth-user-registration-role:}")
+ @Value(value = "${account.ldap.auth-user-registration-role:normal}")
private String authUserRegistrationRole;
//ldap自动注册是否开启
- @Value(value = "${account.ldap.auth-user-registration:}")
- private boolean authUserRegistration;
+ @Value(value = "${account.ldap.auth-user-registration:false}")
+ private Boolean authUserRegistration;
@Override
public Result loginAndGetLdap(HttpServletRequest request, HttpServletResponse response, LoginDTO dto) {
diff --git a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java
index 8f079fde..f0299d87 100644
--- a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java
+++ b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/impl/LoginServiceImpl.java
@@ -14,6 +14,7 @@ import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.servlet.http.Cookie;
@@ -27,7 +28,13 @@ import javax.servlet.http.HttpSession;
*/
@Service("loginService")
public class LoginServiceImpl implements LoginService {
- private final static Logger LOGGER = LoggerFactory.getLogger(LoginServiceImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoginServiceImpl.class);
+
+ @Value(value = "${account.jump-login.gateway-api:false}")
+ private Boolean jumpLoginGatewayApi;
+
+ @Value(value = "${account.jump-login.third-part-api:false}")
+ private Boolean jumpLoginThirdPartApi;
@Autowired
private AccountService accountService;
@@ -75,12 +82,10 @@ public class LoginServiceImpl implements LoginService {
return false;
}
- if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_NORMAL_PREFIX)
- || classRequestMappingValue.equals(ApiPrefix.GATEWAY_API_V1_PREFIX)) {
- // 白名单接口直接true
+ if (classRequestMappingValue.equals(ApiPrefix.API_V1_SSO_PREFIX) ||
+ (jumpLoginGatewayApi != null && jumpLoginGatewayApi && classRequestMappingValue.equals(ApiPrefix.GATEWAY_API_V1_PREFIX)) ||
+ (jumpLoginThirdPartApi != null && jumpLoginThirdPartApi && classRequestMappingValue.equals(ApiPrefix.API_V1_THIRD_PART_PREFIX))) {
+ // 登录接口 or 允许跳过且是跳过类型的接口,则直接跳过登录
return true;
}
diff --git a/kafka-manager-extends/kafka-manager-kcm/pom.xml b/kafka-manager-extends/kafka-manager-kcm/pom.xml
index 7ffd00e3..12a942d5 100644
--- a/kafka-manager-extends/kafka-manager-kcm/pom.xml
+++ b/kafka-manager-extends/kafka-manager-kcm/pom.xml
@@ -28,7 +28,6 @@
1.8
UTF-8
UTF-8
- 5.1.3.RELEASE
@@ -56,17 +55,14 @@
org.springframework
spring-beans
- ${spring-version}
org.springframework
spring-context
- ${spring-version}
org.springframework
spring-test
- ${spring-version}
diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java
index 6e3fa677..d0a2503b 100644
--- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java
+++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java
@@ -37,21 +37,24 @@ import java.util.Map;
public class N9e extends AbstractAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(N9e.class);
- @Value("${kcm.n9e.base-url}")
+ @Value("${kcm.n9e.base-url:}")
private String baseUrl;
- @Value("${kcm.n9e.user-token}")
+ @Value("${kcm.n9e.user-token:12345678}")
private String userToken;
- @Value("${kcm.n9e.account}")
+ @Value("${kcm.n9e.account:root}")
private String account;
- @Value("${kcm.n9e.timeout}")
+ @Value("${kcm.n9e.timeout:300}")
private Integer timeout;
- @Value("${kcm.n9e.script-file}")
+ @Value("${kcm.n9e.script-file:kcm_script.sh}")
private String scriptFile;
+ @Value("${kcm.n9e.logikm-url:}")
+ private String logiKMUrl;
+
private String script;
private static final String CREATE_TASK_URI = "/api/job-ce/tasks";
@@ -219,7 +222,8 @@ public class N9e extends AbstractAgent {
sb.append(creationTaskData.getKafkaPackageUrl()).append(",,");
sb.append(creationTaskData.getServerPropertiesName().replace(KafkaFileEnum.SERVER_CONFIG.getSuffix(), "")).append(",,");
sb.append(creationTaskData.getServerPropertiesMd5()).append(",,");
- sb.append(creationTaskData.getServerPropertiesUrl());
+ sb.append(creationTaskData.getServerPropertiesUrl()).append(",,");
+ sb.append(this.logiKMUrl);
N9eCreationTask n9eCreationTask = new N9eCreationTask();
n9eCreationTask.setTitle(Constant.TASK_TITLE_PREFIX + "-集群ID:" + creationTaskData.getClusterId());
diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh
index ffd54a20..16ffb80c 100644
--- a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh
+++ b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh
@@ -18,12 +18,13 @@ p_kafka_server_properties_name=${7} #server配置名
p_kafka_server_properties_md5=${8} #server配置MD5
p_kafka_server_properties_url=${9} #server配置文件下载地址
+p_kafka_manager_url=${10} #LogiKM地址
+
#----------------------------------------配置信息------------------------------------------------------#
g_base_dir='/home'
g_cluster_task_dir=${g_base_dir}"/kafka_cluster_task/task_${p_task_id}" #部署升级路径
g_rollback_version=${g_cluster_task_dir}"/rollback_version" #回滚版本
g_new_kafka_package_name='' #最终的包名
-g_kafka_manager_addr='' #kafka-manager地址
g_local_ip=`ifconfig -a|grep inet|grep -v 127.0.0.1|grep -v inet6|awk '{print $2}'|tr -d "addr:"`
g_hostname=${g_local_ip}
@@ -47,7 +48,7 @@ function dchat_alarm() {
# 检查并初始化环境
function check_and_init_env() {
- if [ -z "${p_task_id}" -o -z "${p_cluster_task_type}" -o -z "${p_kafka_package_url}" -o -z "${p_cluster_id}" -o -z "${p_kafka_package_name}" -o -z "${p_kafka_package_md5}" -o -z "${p_kafka_server_properties_name}" -o -z "${p_kafka_server_properties_md5}" ]; then
+ if [ -z "${p_task_id}" -o -z "${p_cluster_task_type}" -o -z "${p_kafka_package_url}" -o -z "${p_cluster_id}" -o -z "${p_kafka_package_name}" -o -z "${p_kafka_package_md5}" -o -z "${p_kafka_server_properties_name}" -o -z "${p_kafka_server_properties_md5}" -o -z "${p_kafka_manager_url}" ]; then
ECHO_LOG "存在为空的参数不合法, 退出集群任务"
dchat_alarm "存在为空的参数不合法, 退出集群任务"
exit 1
@@ -72,11 +73,11 @@ function check_and_init_env() {
# 检查并等待集群所有的副本处于同步的状态
function check_and_wait_broker_stabled() {
- under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l`
+ under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${p_kafka_manager_url}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l`
while [ "$under_replication_count" -ne 1 ]; do
ECHO_LOG "存在${under_replication_count}个副本未同步, sleep 10s"
sleep 10
- under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l`
+ under_replication_count=`curl -s -G -d "hostname="${g_hostname} ${p_kafka_manager_url}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l`
done
ECHO_LOG "集群副本都已经处于同步的状态, 可以进行集群升级"
}
@@ -324,6 +325,7 @@ ECHO_LOG " p_kafka_package_name=${p_kafka_package_name}"
ECHO_LOG " p_kafka_package_md5=${p_kafka_package_md5}"
ECHO_LOG " p_kafka_server_properties_name=${p_kafka_server_properties_name}"
ECHO_LOG " p_kafka_server_properties_md5=${p_kafka_server_properties_md5}"
+ECHO_LOG " p_kafka_manager_url=${p_kafka_manager_url}"
@@ -342,7 +344,7 @@ fi
ECHO_LOG "停kafka服务"
stop_kafka_server
-ECHO_LOG "停5秒, 确保"
+ECHO_LOG "再停5秒, 确保端口已释放"
sleep 5
if [ "${p_cluster_task_type}" == "0" ];then
diff --git a/kafka-manager-extends/kafka-manager-monitor/pom.xml b/kafka-manager-extends/kafka-manager-monitor/pom.xml
index 0948a190..9f04b7c9 100644
--- a/kafka-manager-extends/kafka-manager-monitor/pom.xml
+++ b/kafka-manager-extends/kafka-manager-monitor/pom.xml
@@ -25,7 +25,6 @@
1.8
UTF-8
UTF-8
- 5.1.3.RELEASE
@@ -63,12 +62,10 @@
org.springframework
spring-beans
- ${spring-version}
org.springframework
spring-context
- ${spring-version}
\ No newline at end of file
diff --git a/kafka-manager-extends/kafka-manager-notify/pom.xml b/kafka-manager-extends/kafka-manager-notify/pom.xml
index a2fd2c4b..348164eb 100644
--- a/kafka-manager-extends/kafka-manager-notify/pom.xml
+++ b/kafka-manager-extends/kafka-manager-notify/pom.xml
@@ -25,7 +25,6 @@
1.8
UTF-8
UTF-8
- 5.1.3.RELEASE
@@ -48,7 +47,6 @@
org.springframework
spring-context
- ${spring-version}
\ No newline at end of file
diff --git a/kafka-manager-extends/kafka-manager-openapi/pom.xml b/kafka-manager-extends/kafka-manager-openapi/pom.xml
index caaa1242..cc6f3316 100644
--- a/kafka-manager-extends/kafka-manager-openapi/pom.xml
+++ b/kafka-manager-extends/kafka-manager-openapi/pom.xml
@@ -24,7 +24,6 @@
1.8
UTF-8
UTF-8
- 5.1.3.RELEASE
@@ -46,7 +45,6 @@
org.springframework
spring-context
- ${spring-version}
\ No newline at end of file
diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java
index 5df7815e..07b0a3e3 100644
--- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java
+++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java
@@ -42,6 +42,9 @@ public class ThirdPartServiceImpl implements ThirdPartService {
@Autowired
private ConsumerService consumerService;
+ @Autowired
+ private KafkaClientPool kafkaClientPool;
+
@Override
public Result checkConsumeHealth(Long clusterId,
String topicName,
@@ -109,7 +112,7 @@ public class ThirdPartServiceImpl implements ThirdPartService {
Long timestamp) {
KafkaConsumer kafkaConsumer = null;
try {
- kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO);
+ kafkaConsumer = kafkaClientPool.borrowKafkaConsumerClient(clusterDO);
if (ValidateUtils.isNull(kafkaConsumer)) {
return null;
}
diff --git a/kafka-manager-task/pom.xml b/kafka-manager-task/pom.xml
index 8927ef8e..dce8d3c8 100644
--- a/kafka-manager-task/pom.xml
+++ b/kafka-manager-task/pom.xml
@@ -24,7 +24,6 @@
1.8
UTF-8
UTF-8
- 5.1.3.RELEASE
@@ -52,7 +51,6 @@
org.springframework
spring-context
- ${spring-version}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java
index 7eddb926..564094d5 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/AbstractScheduledTask.java
@@ -1,7 +1,6 @@
package com.xiaojukeji.kafka.manager.task.component;
import com.google.common.collect.Lists;
-import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.NetUtils;
@@ -29,7 +28,7 @@ import java.util.concurrent.*;
* @date 20/8/10
*/
public abstract class AbstractScheduledTask implements SchedulingConfigurer {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduledTask.class);
@Autowired
private HeartbeatDao heartbeatDao;
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java
index 37a36238..b4cfdd47 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/component/BaseBizTask.java
@@ -1,6 +1,5 @@
package com.xiaojukeji.kafka.manager.task.component;
-import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -9,11 +8,11 @@ import org.slf4j.LoggerFactory;
* @date 20/8/10
*/
public class BaseBizTask implements Runnable {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseBizTask.class);
- private E task;
+ private final E task;
- private AbstractScheduledTask scheduledTask;
+ private final AbstractScheduledTask scheduledTask;
public BaseBizTask(E task, AbstractScheduledTask scheduledTask) {
this.task = task;
@@ -30,6 +29,7 @@ public class BaseBizTask implements Runnable {
} catch (Throwable t) {
LOGGER.error("scheduled task scheduleName:{} execute failed, task:{}", scheduledTask.getScheduledName(), task, t);
}
+
LOGGER.info("scheduled task scheduleName:{} finished, cost-time:{}ms.", scheduledTask.getScheduledName(), System.currentTimeMillis() - startTime);
}
}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishBrokerMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishBrokerMetrics.java
new file mode 100644
index 00000000..47aa60d4
--- /dev/null
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishBrokerMetrics.java
@@ -0,0 +1,93 @@
+package com.xiaojukeji.kafka.manager.task.dispatch.metrics.collect;
+
+import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
+import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
+import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
+import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
+import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+import com.xiaojukeji.kafka.manager.service.service.JmxService;
+import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
+import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
+import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Broker指标信息收集
+ * @author zengqiao
+ * @date 20/5/7
+ */
+@CustomScheduled(name = "collectAndPublishBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2)
+@ConditionalOnProperty(prefix = "task.metrics.collect", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true)
+public class CollectAndPublishBrokerMetrics extends AbstractScheduledTask<ClusterDO> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectAndPublishBrokerMetrics.class);
+
+ @Autowired
+ private JmxService jmxService;
+
+ @Autowired
+ private ClusterService clusterService;
+
+ @Autowired
+ private AbstractHealthScoreStrategy healthScoreStrategy;
+
+ @Override
+ protected List<ClusterDO> listAllTasks() {
+ return clusterService.list();
+ }
+
+ @Override
+ public void processTask(ClusterDO clusterDO) {
+ long startTime = System.currentTimeMillis();
+
+ try {
+ SpringTool.publish(new BatchBrokerMetricsCollectedEvent(
+ this,
+ clusterDO.getId(),
+ startTime,
+ this.getBrokerMetrics(clusterDO.getId()))
+ );
+ } catch (Exception e) {
+ LOGGER.error("collect broker-metrics failed, physicalClusterId:{}.", clusterDO.getId(), e);
+ }
+
+ LOGGER.info("collect broker-metrics finished, physicalClusterId:{} costTime:{}", clusterDO.getId(), System.currentTimeMillis() - startTime);
+ }
+
+ private List<BrokerMetrics> getBrokerMetrics(Long clusterId) {
+ List<BrokerMetrics> metricsList = new ArrayList<>();
+ for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
+ BrokerMetrics metrics = jmxService.getBrokerMetrics(
+ clusterId,
+ brokerId,
+ KafkaMetricsCollections.BROKER_TO_DB_METRICS
+ );
+
+ if (ValidateUtils.isNull(metrics)) {
+ continue;
+ }
+
+ metrics.getMetricsMap().put(
+ JmxConstant.HEALTH_SCORE,
+ healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics)
+ );
+
+ metricsList.add(metrics);
+ }
+
+ if (ValidateUtils.isEmptyList(metricsList)) {
+ return new ArrayList<>();
+ }
+
+ return metricsList;
+ }
+}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java
index cc67428f..28bb1612 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCGData.java
@@ -44,6 +44,9 @@ public class CollectAndPublishCGData extends AbstractScheduledTask {
@Autowired
private ConsumerService consumerService;
+ @Autowired
+ private ThreadPool threadPool;
+
@Override
protected List listAllTasks() {
return clusterService.list();
@@ -82,7 +85,7 @@ public class CollectAndPublishCGData extends AbstractScheduledTask {
return getTopicConsumerMetrics(clusterDO, topicName, startTimeUnitMs);
}
});
- ThreadPool.submitCollectMetricsTask(taskList[i]);
+ threadPool.submitCollectMetricsTask(clusterDO.getId(), taskList[i]);
}
List consumerMetricsList = new ArrayList<>();
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
index 07a137e6..a6757310 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/collect/CollectAndPublishCommunityTopicMetrics.java
@@ -30,16 +30,23 @@ public class CollectAndPublishCommunityTopicMetrics extends AbstractScheduledTas
@Override
protected List listAllTasks() {
+ // 获取需要进行指标采集的集群列表,这些集群将会被拆分到多台KM中进行执行。
return clusterService.list();
}
@Override
public void processTask(ClusterDO clusterDO) {
+ // 这里需要实现对clusterDO这个集群进行Topic指标采集的代码逻辑
+
+ // 进行Topic指标获取
List metricsList = getTopicMetrics(clusterDO.getId());
+
+ // 获取到Topic流量指标之后,发布一个事件,
SpringTool.publish(new TopicMetricsCollectedEvent(this, clusterDO.getId(), metricsList));
}
private List getTopicMetrics(Long clusterId) {
+ // 具体获取Topic流量指标的入口代码
List metricsList =
jmxService.getTopicMetrics(clusterId, KafkaMetricsCollections.TOPIC_METRICS_TO_DB, true);
if (ValidateUtils.isEmptyList(metricsList)) {
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
index b8632971..89d7e516 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java
@@ -1,15 +1,15 @@
package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete;
-import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
+import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
import com.xiaojukeji.kafka.manager.dao.*;
-import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import java.util.Arrays;
import java.util.Date;
@@ -22,10 +22,7 @@ import java.util.List;
*/
@CustomScheduled(name = "deleteMetrics", cron = "0 0/2 * * * ?", threadNum = 1)
public class DeleteMetrics extends AbstractScheduledTask {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
-
- @Autowired
- private ConfigUtils configUtils;
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicMetricsDao topicMetricsDao;
@@ -45,6 +42,27 @@ public class DeleteMetrics extends AbstractScheduledTask {
@Autowired
private TopicThrottledMetricsDao topicThrottledMetricsDao;
+ @Value(value = "${task.metrics.delete.delete-limit-size:1000}")
+ private Integer deleteLimitSize;
+
+ @Value(value = "${task.metrics.delete.cluster-metrics-save-days:14}")
+ private Integer clusterMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete.broker-metrics-save-days:14}")
+ private Integer brokerMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete.topic-metrics-save-days:7}")
+ private Integer topicMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete.topic-request-time-metrics-save-days:7}")
+ private Integer topicRequestTimeMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete.topic-throttled-metrics-save-days:7}")
+ private Integer topicThrottledMetricsSaveDays;
+
+ @Value(value = "${task.metrics.delete.app-topic-metrics-save-days:7}")
+ private Integer appTopicMetricsSaveDays;
+
@Override
public List listAllTasks() {
EmptyEntry emptyEntry = new EmptyEntry();
@@ -54,78 +72,73 @@ public class DeleteMetrics extends AbstractScheduledTask {
@Override
public void processTask(EmptyEntry entryEntry) {
- if (Constant.INVALID_CODE.equals(configUtils.getMaxMetricsSaveDays())) {
- // 无需数据删除
- return;
- }
-
long startTime = System.currentTimeMillis();
LOGGER.info("start delete metrics");
- try {
- deleteTopicMetrics();
- } catch (Exception e) {
- LOGGER.error("delete topic metrics failed.", e);
+
+ // 数据量可能比较大,一次触发多删除几次
+ for (int i = 0; i < 10; ++i) {
+ try {
+ boolean needReDelete = this.deleteCommunityTopicMetrics();
+ if (!needReDelete) {
+ break;
+ }
+
+ // 暂停1000毫秒,避免删除太快导致DB出现问题
+ BackoffUtils.backoff(1000);
+ } catch (Exception e) {
+ LOGGER.error("delete community topic metrics failed.", e);
+ }
+ }
+
+ // 数据量可能比较大,一次触发多删除几次
+ for (int i = 0; i < 10; ++i) {
+ try {
+ boolean needReDelete = this.deleteDiDiTopicMetrics();
+ if (!needReDelete) {
+ break;
+ }
+
+ // 暂停1000毫秒,避免删除太快导致DB出现问题
+ BackoffUtils.backoff(1000);
+ } catch (Exception e) {
+ LOGGER.error("delete didi topic metrics failed.", e);
+ }
}
try {
- deleteTopicAppMetrics();
+ this.deleteClusterBrokerMetrics();
} catch (Exception e) {
- LOGGER.error("delete topic app metrics failed.", e);
+ LOGGER.error("delete cluster and broker metrics failed.", e);
}
- try {
- deleteTopicRequestMetrics();
- } catch (Exception e) {
- LOGGER.error("delete topic request metrics failed.", e);
- }
-
- try {
- deleteThrottledMetrics();
- } catch (Exception e) {
- LOGGER.error("delete topic throttled metrics failed.", e);
- }
-
- try {
- deleteBrokerMetrics();
- } catch (Exception e) {
- LOGGER.error("delete broker metrics failed.", e);
- }
-
- try {
- deleteClusterMetrics();
- } catch (Exception e) {
- LOGGER.error("delete cluster metrics failed.", e);
- }
LOGGER.info("finish delete metrics, costTime:{}ms.", System.currentTimeMillis() - startTime);
}
- private void deleteTopicMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicMetricsDao.deleteBeforeTime(endTime);
+ private boolean deleteCommunityTopicMetrics() {
+ return topicMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicMetricsSaveDays * 24L * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize;
}
- private void deleteTopicAppMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicAppMetricsDao.deleteBeforeTime(endTime);
+ private boolean deleteDiDiTopicMetrics() {
+ boolean needReDelete = false;
+
+ if (topicAppMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.appTopicMetricsSaveDays * 24L * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
+ needReDelete = true;
+ }
+
+ if (topicRequestMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicRequestTimeMetricsSaveDays * 24L * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
+ needReDelete = true;
+ }
+
+ if (topicThrottledMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.topicThrottledMetricsSaveDays * 24L * 60 * 60 * 1000), this.deleteLimitSize) >= this.deleteLimitSize) {
+ needReDelete = true;
+ }
+
+ return needReDelete;
}
- private void deleteTopicRequestMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicRequestMetricsDao.deleteBeforeTime(endTime);
- }
+ private void deleteClusterBrokerMetrics() {
+ brokerMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.brokerMetricsSaveDays * 24L * 60 * 60 * 1000), this.deleteLimitSize);
- private void deleteThrottledMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- topicThrottledMetricsDao.deleteBeforeTime(endTime);
- }
-
- private void deleteBrokerMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- brokerMetricsDao.deleteBeforeTime(endTime);
- }
-
- private void deleteClusterMetrics() {
- Date endTime = new Date(System.currentTimeMillis() - configUtils.getMaxMetricsSaveDays() * 24 * 60 * 60 * 1000);
- clusterMetricsDao.deleteBeforeTime(endTime);
+ clusterMetricsDao.deleteBeforeTime(new Date(System.currentTimeMillis() - this.clusterMetricsSaveDays * 24L * 60 * 60 * 1000), this.deleteLimitSize);
}
}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java
deleted file mode 100644
index 50f5f633..00000000
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreBrokerMetrics.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package com.xiaojukeji.kafka.manager.task.dispatch.metrics.store;
-
-import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
-import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
-import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
-import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
-import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics;
-import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
-import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
-import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
-import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao;
-import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
-import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
-import com.xiaojukeji.kafka.manager.service.service.ClusterService;
-import com.xiaojukeji.kafka.manager.service.service.JmxService;
-import com.xiaojukeji.kafka.manager.service.strategy.AbstractHealthScoreStrategy;
-import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
-import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
-import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Broker指标信息存DB, Broker流量, 集群流量
- * @author zengqiao
- * @date 20/5/7
- */
-@CustomScheduled(name = "storeBrokerMetrics", cron = "21 0/1 * * * ?", threadNum = 2)
-@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "broker-metrics-enabled", havingValue = "true", matchIfMissing = true)
-public class StoreBrokerMetrics extends AbstractScheduledTask {
- private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
-
- @Autowired
- private JmxService jmxService;
-
- @Autowired
- private ClusterService clusterService;
-
- @Autowired
- private BrokerMetricsDao brokerMetricsDao;
-
- @Autowired
- private ClusterMetricsDao clusterMetricsDao;
-
- @Autowired
- private AbstractHealthScoreStrategy healthScoreStrategy;
-
- private static final Integer INSERT_BATCH_SIZE = 100;
-
- @Override
- protected List listAllTasks() {
- return clusterService.list();
- }
-
- @Override
- public void processTask(ClusterDO clusterDO) {
- long startTime = System.currentTimeMillis();
- List clusterMetricsList = new ArrayList<>();
-
- try {
- List brokerMetricsList = getAndBatchAddMetrics(startTime, clusterDO.getId());
- clusterMetricsList.add(supplyAndConvert2ClusterMetrics(
- clusterDO.getId(),
- MetricsConvertUtils.merge2BaseMetricsByAdd(brokerMetricsList))
- );
- } catch (Throwable t) {
- LOGGER.error("collect failed, clusterId:{}.", clusterDO.getId(), t);
- }
- long endTime = System.currentTimeMillis();
- LOGGER.info("collect finish, clusterId:{} costTime:{}", clusterDO.getId(), endTime - startTime);
-
- List doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList(
- startTime,
- clusterMetricsList
- );
- clusterMetricsDao.batchAdd(doList);
- }
-
- private List getAndBatchAddMetrics(Long startTime, Long clusterId) {
- List metricsList = new ArrayList<>();
- for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
- BrokerMetrics metrics = jmxService.getBrokerMetrics(
- clusterId,
- brokerId,
- KafkaMetricsCollections.BROKER_TO_DB_METRICS
- );
- if (ValidateUtils.isNull(metrics)) {
- continue;
- }
- metrics.getMetricsMap().put(
- JmxConstant.HEALTH_SCORE,
- healthScoreStrategy.calBrokerHealthScore(clusterId, brokerId, metrics)
- );
- metricsList.add(metrics);
- }
- if (ValidateUtils.isEmptyList(metricsList)) {
- return new ArrayList<>();
- }
-
- List doList =
- MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(startTime, metricsList);
- int i = 0;
- do {
- brokerMetricsDao.batchAdd(doList.subList(i, Math.min(i + INSERT_BATCH_SIZE, doList.size())));
- i += INSERT_BATCH_SIZE;
- } while (i < doList.size());
- return metricsList;
- }
-
- private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) {
- ClusterMetrics metrics = new ClusterMetrics(clusterId);
- Map metricsMap = metrics.getMetricsMap();
- metricsMap.putAll(baseMetrics.getMetricsMap());
- metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size());
- metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size());
- Integer partitionNum = 0;
- for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
- TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
- if (ValidateUtils.isNull(topicMetaData)) {
- continue;
- }
- partitionNum += topicMetaData.getPartitionNum();
- }
- metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum);
- return metrics;
- }
-}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java
index ede6525d..6543f6fa 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiAppTopicMetrics.java
@@ -17,7 +17,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
import java.util.*;
@@ -28,7 +27,7 @@ import java.util.*;
@CustomScheduled(name = "storeDiDiAppTopicMetrics", cron = "41 0/1 * * * ?", threadNum = 5)
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "app-topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@@ -50,7 +49,7 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask {
try {
getAndBatchAddTopicAppMetrics(startTime, clusterDO.getId());
- } catch (Throwable t) {
+ } catch (Exception t) {
LOGGER.error("save topic metrics failed, clusterId:{}.", clusterDO.getId(), t);
}
}
@@ -65,7 +64,12 @@ public class StoreDiDiAppTopicMetrics extends AbstractScheduledTask {
MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
int i = 0;
do {
- topicAppMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
+ List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
+ if (ValidateUtils.isEmptyList(subDOList)) {
+ return;
+ }
+
+ topicAppMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java
index c4caa229..040612f2 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/store/StoreDiDiTopicRequestTimeMetrics.java
@@ -27,7 +27,7 @@ import java.util.*;
@CustomScheduled(name = "storeDiDiTopicRequestTimeMetrics", cron = "51 0/1 * * * ?", threadNum = 5)
@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-request-time-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private JmxService jmxService;
@@ -51,7 +51,7 @@ public class StoreDiDiTopicRequestTimeMetrics extends AbstractScheduledTask subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
+ if (ValidateUtils.isEmptyList(subDOList)) {
+ return;
+ }
+
+ topicRequestMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/biz/RegionCreatedListener.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/biz/RegionCreatedListener.java
new file mode 100644
index 00000000..5daa0e9e
--- /dev/null
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/biz/RegionCreatedListener.java
@@ -0,0 +1,38 @@
+package com.xiaojukeji.kafka.manager.task.listener.biz;
+
+import com.xiaojukeji.kafka.manager.common.events.RegionCreatedEvent;
+import com.xiaojukeji.kafka.manager.task.dispatch.biz.CalRegionCapacity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+/**
+ * Region创建监听器,
+ * TODO 后续需要将其移动到core模块
+ * @author zengqiao
+ * @date 22/01/11
+ */
+@Component
+public class RegionCreatedListener implements ApplicationListener {
+ private static final Logger logger = LoggerFactory.getLogger(RegionCreatedListener.class);
+
+ @Autowired
+ private CalRegionCapacity calRegionCapacity;
+
+ @Async
+ @Override
+ public void onApplicationEvent(RegionCreatedEvent event) {
+ try {
+ logger.info("cal region capacity started when region created, regionDO:{}.", event.getRegionDO());
+
+ calRegionCapacity.processTask(event.getRegionDO());
+
+ logger.info("cal region capacity finished when region created, regionDO:{}.", event.getRegionDO());
+ } catch (Exception e) {
+ logger.error("cal region capacity failed when region created, regionDO:{}.", event.getRegionDO(), e);
+ }
+ }
+}
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkBrokerMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkBrokerMetrics2DB.java
new file mode 100644
index 00000000..923d26b6
--- /dev/null
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkBrokerMetrics2DB.java
@@ -0,0 +1,55 @@
+package com.xiaojukeji.kafka.manager.task.listener.sink.db;
+
+import com.xiaojukeji.kafka.manager.common.constant.Constant;
+import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.BrokerMetricsDO;
+import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.dao.BrokerMetricsDao;
+import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @author zengqiao
+ * @date 22/01/17
+ */
+@Component
+@ConditionalOnProperty(prefix = "task.metrics.sink.broker-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true)
+public class SinkBrokerMetrics2DB implements ApplicationListener {
+ private static final Logger logger = LoggerFactory.getLogger(SinkBrokerMetrics2DB.class);
+
+ @Autowired
+ private BrokerMetricsDao metricsDao;
+
+ @Override
+ public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) {
+ logger.debug("sink broker-metrics to db start, event:{}.", event);
+
+ List metricsList = event.getMetricsList();
+ if (ValidateUtils.isEmptyList(metricsList)) {
+ logger.warn("sink broker-metrics to db finished, without need sink, event:{}.", event);
+ return;
+ }
+
+ List doList = MetricsConvertUtils.convertAndUpdateCreateTime2BrokerMetricsDOList(event.getCollectTime(), metricsList);
+ int i = 0;
+ while (i < doList.size()) {
+ List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
+ if (ValidateUtils.isEmptyList(subDOList)) {
+ break;
+ }
+
+ metricsDao.batchAdd(subDOList);
+ i += Constant.BATCH_INSERT_SIZE;
+ }
+
+ logger.debug("sink broker-metrics to db finished, event:{}.", event);
+ }
+}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkClusterMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkClusterMetrics2DB.java
new file mode 100644
index 00000000..a1aab09c
--- /dev/null
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/SinkClusterMetrics2DB.java
@@ -0,0 +1,80 @@
+package com.xiaojukeji.kafka.manager.task.listener.sink.db;
+
+import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
+import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
+import com.xiaojukeji.kafka.manager.common.entity.metrics.ClusterMetrics;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterMetricsDO;
+import com.xiaojukeji.kafka.manager.common.events.metrics.BatchBrokerMetricsCollectedEvent;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
+import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
+import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao;
+import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
+import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author zengqiao
+ * @date 22/01/17
+ */
+@Component
+@ConditionalOnProperty(prefix = "task.metrics.sink.cluster-metrics", name = "sink-db-enabled", havingValue = "true", matchIfMissing = true)
+public class SinkClusterMetrics2DB implements ApplicationListener {
+ private static final Logger logger = LoggerFactory.getLogger(SinkClusterMetrics2DB.class);
+
+ @Autowired
+ private ClusterMetricsDao clusterMetricsDao;
+
+ @Override
+ public void onApplicationEvent(BatchBrokerMetricsCollectedEvent event) {
+ logger.debug("sink cluster-metrics to db start, event:{}.", event);
+
+ List metricsList = event.getMetricsList();
+ if (ValidateUtils.isEmptyList(metricsList)) {
+ logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event);
+ return;
+ }
+
+ List doList = MetricsConvertUtils.convertAndUpdateCreateTime2ClusterMetricsDOList(
+ event.getCollectTime(),
+ // 合并broker-metrics为cluster-metrics
+ Arrays.asList(supplyAndConvert2ClusterMetrics(event.getPhysicalClusterId(), MetricsConvertUtils.merge2BaseMetricsByAdd(event.getMetricsList())))
+ );
+
+ if (ValidateUtils.isEmptyList(doList)) {
+ logger.warn("sink cluster-metrics to db finished, without need sink, event:{}.", event);
+ return;
+ }
+
+ clusterMetricsDao.batchAdd(doList);
+
+ logger.debug("sink cluster-metrics to db finished, event:{}.", event);
+ }
+
+ private ClusterMetrics supplyAndConvert2ClusterMetrics(Long clusterId, BaseMetrics baseMetrics) {
+ ClusterMetrics metrics = new ClusterMetrics(clusterId);
+ Map metricsMap = metrics.getMetricsMap();
+ metricsMap.putAll(baseMetrics.getMetricsMap());
+ metricsMap.put(JmxConstant.TOPIC_NUM, PhysicalClusterMetadataManager.getTopicNameList(clusterId).size());
+ metricsMap.put(JmxConstant.BROKER_NUM, PhysicalClusterMetadataManager.getBrokerIdList(clusterId).size());
+ Integer partitionNum = 0;
+ for (String topicName : PhysicalClusterMetadataManager.getTopicNameList(clusterId)) {
+ TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
+ if (ValidateUtils.isNull(topicMetaData)) {
+ continue;
+ }
+ partitionNum += topicMetaData.getPartitionNum();
+ }
+ metricsMap.put(JmxConstant.PARTITION_NUM, partitionNum);
+ return metrics;
+ }
+}
\ No newline at end of file
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java
similarity index 78%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java
index 0c0714f7..267e32b7 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreCommunityTopicMetrics2DB.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreCommunityTopicMetrics2DB.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
@@ -25,7 +25,7 @@ import java.util.List;
@Component("storeCommunityTopicMetrics2DB")
@ConditionalOnProperty(prefix = "custom.store-metrics-task.community", name = "topic-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreCommunityTopicMetrics2DB implements ApplicationListener {
- private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
@Autowired
private TopicMetricsDao topicMetricsDao;
@@ -40,17 +40,21 @@ public class StoreCommunityTopicMetrics2DB implements ApplicationListener metricsList) throws Exception {
- List doList =
- MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
+ private void store2DB(Long startTime, List metricsList) {
+ List doList = MetricsConvertUtils.convertAndUpdateCreateTime2TopicMetricsDOList(startTime, metricsList);
int i = 0;
do {
- topicMetricsDao.batchAdd(doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size())));
+ List subDOList = doList.subList(i, Math.min(i + Constant.BATCH_INSERT_SIZE, doList.size()));
+ if (ValidateUtils.isEmptyList(subDOList)) {
+ return;
+ }
+
+ topicMetricsDao.batchAdd(subDOList);
i += Constant.BATCH_INSERT_SIZE;
} while (i < doList.size());
}
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java
similarity index 96%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java
index 4e34e732..c2d74df3 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/StoreTopicThrottledMetrics2DB.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/db/StoreTopicThrottledMetrics2DB.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.db;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
@@ -22,7 +22,7 @@ import java.util.*;
* @date 20/9/24
*/
@Component("storeTopicThrottledMetrics2DB")
-@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics", havingValue = "true", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "custom.store-metrics-task.didi", name = "topic-throttled-metrics-enabled", havingValue = "true", matchIfMissing = true)
public class StoreTopicThrottledMetrics2DB implements ApplicationListener {
private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER);
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Kafka.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java
similarity index 98%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Kafka.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java
index ad80ceb2..5f3a0e5c 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Kafka.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkCommunityTopicMetrics2Kafka.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.kafka;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Kafka.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java
similarity index 98%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Kafka.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java
index 7070dae1..eb6c2d37 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Kafka.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/kafka/SinkConsumerMetrics2Kafka.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.kafka;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java
similarity index 98%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java
index e2ac74a9..80b3eccd 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkCommunityTopicMetrics2Monitor.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkCommunityTopicMetrics2Monitor.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java
similarity index 99%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java
index 4ca276f9..a5c2e008 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkConsumerMetrics2Monitor.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkConsumerMetrics2Monitor.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
import com.xiaojukeji.kafka.manager.monitor.common.entry.bizenum.MonitorMetricNameEnum;
import com.xiaojukeji.kafka.manager.common.constant.LogConstant;
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkTopicThrottledMetrics2Monitor.java
similarity index 98%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkTopicThrottledMetrics2Monitor.java
index fb95947c..ff1cb823 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/SinkTopicThrottledMetrics2Monitor.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/listener/sink/monitor/SinkTopicThrottledMetrics2Monitor.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.listener;
+package com.xiaojukeji.kafka.manager.task.listener.sink.monitor;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.monitor.common.MonitorSinkConstant;
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java
index a7d196af..54321240 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushZKConsumerGroupMetadata.java
@@ -32,6 +32,9 @@ public class FlushZKConsumerGroupMetadata {
@Autowired
private ClusterService clusterService;
+ @Autowired
+ private ThreadPool threadPool;
+
@Scheduled(cron="35 0/1 * * * ?")
public void schedule() {
List doList = clusterService.list();
@@ -95,7 +98,7 @@ public class FlushZKConsumerGroupMetadata {
return new ArrayList<>();
}
});
- ThreadPool.submitCollectMetricsTask(taskList[i]);
+ threadPool.submitCollectMetricsTask(clusterId, taskList[i]);
}
Map> topicNameConsumerGroupMap = new HashMap<>();
diff --git a/kafka-manager-web/pom.xml b/kafka-manager-web/pom.xml
index 17504ca7..80ccdfe1 100644
--- a/kafka-manager-web/pom.xml
+++ b/kafka-manager-web/pom.xml
@@ -16,10 +16,9 @@
1.8
1.8
- 2.1.1.RELEASE
- 5.1.3.RELEASE
false
- 8.5.66
+ 8.5.72
+ 2.16.0
@@ -72,22 +71,22 @@
org.springframework.boot
spring-boot-starter-web
- ${springframework.boot.version}
+ ${spring.boot.version}
org.springframework.boot
spring-boot-starter-aop
- ${springframework.boot.version}
+ ${spring.boot.version}
org.springframework.boot
spring-boot-starter-logging
- ${springframework.boot.version}
+ ${spring.boot.version}
org.springframework.boot
spring-boot-starter-thymeleaf
- ${springframework.boot.version}
+ ${spring.boot.version}
junit
@@ -104,16 +103,17 @@
org.springframework
spring-context-support
- ${spring-version}
+ kafka-manager
+
org.springframework.boot
spring-boot-maven-plugin
- ${springframework.boot.version}
+ ${spring.boot.version}
@@ -121,6 +121,7 @@
+
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java
index 790b85be..8469afec 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java
@@ -32,7 +32,7 @@ import java.util.stream.Collectors;
*/
@Api(tags = "开放接口-Broker相关接口(REST)")
@RestController
-@RequestMapping(ApiPrefix.API_V1_THIRD_PART_OP_PREFIX)
+@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX)
public class ThirdPartBrokerController {
@Autowired
private BrokerService brokerService;
@@ -44,7 +44,7 @@ public class ThirdPartBrokerController {
private ClusterService clusterService;
@ApiOperation(value = "Broker信息概览", notes = "")
- @RequestMapping(value = "{clusterId}/brokers/{brokerId}/overview", method = RequestMethod.GET)
+ @GetMapping(value = "{clusterId}/brokers/{brokerId}/overview")
@ResponseBody
public Result getBrokerOverview(@PathVariable Long clusterId,
@PathVariable Integer brokerId) {
@@ -70,7 +70,7 @@ public class ThirdPartBrokerController {
}
@ApiOperation(value = "BrokerRegion信息", notes = "所有集群的")
- @RequestMapping(value = "broker-regions", method = RequestMethod.GET)
+ @GetMapping(value = "broker-regions")
@ResponseBody
public Result> getBrokerRegions() {
List clusterDOList = clusterService.list();
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java
index 91d0080c..f8406cfe 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/SwaggerConfig.java
@@ -1,5 +1,7 @@
package com.xiaojukeji.kafka.manager.web.config;
+import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.*;
@@ -20,6 +22,9 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableWebMvc
@EnableSwagger2
public class SwaggerConfig implements WebMvcConfigurer {
+ @Autowired
+ private ConfigUtils configUtils;
+
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/");
@@ -39,10 +44,9 @@ public class SwaggerConfig implements WebMvcConfigurer {
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
- .title("Logi-KafkaManager 接口文档")
- .description("欢迎使用滴滴Logi-KafkaManager")
- .contact("huangyiminghappy@163.com")
- .version("2.2.0")
+ .title("LogiKM接口文档")
+ .description("欢迎使用滴滴LogiKM")
+ .version(configUtils.getApplicationVersion())
.build();
}
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java
index bbe8c656..ab6c0ba6 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/OrderConverter.java
@@ -1,15 +1,16 @@
package com.xiaojukeji.kafka.manager.web.converters;
-import com.xiaojukeji.kafka.manager.common.entity.ao.account.Account;
import com.xiaojukeji.kafka.manager.bpm.common.OrderResult;
+import com.xiaojukeji.kafka.manager.bpm.common.OrderStatusEnum;
import com.xiaojukeji.kafka.manager.bpm.common.entry.BaseOrderDetailData;
+import com.xiaojukeji.kafka.manager.common.entity.ao.account.Account;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.AccountVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.OrderResultVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.OrderVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.order.detail.OrderDetailBaseVO;
import com.xiaojukeji.kafka.manager.common.utils.CopyUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.OrderDO;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,7 +42,9 @@ public class OrderConverter {
}
OrderVO orderVO = new OrderVO();
CopyUtils.copyProperties(orderVO, orderDO);
- orderVO.setGmtTime(orderDO.getGmtCreate());
+ if (OrderStatusEnum.WAIT_DEAL.getCode().equals(orderDO.getStatus())) {
+ orderVO.setGmtHandle(null);
+ }
return orderVO;
}
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java
index 747fbb8b..06de3ad9 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/ReassignModelConverter.java
@@ -95,12 +95,21 @@ public class ReassignModelConverter {
vo.setBeginTime(0L);
vo.setEndTime(0L);
+ StringBuilder clusterAndTopicName = new StringBuilder();
+
Integer completedTopicNum = 0;
Set statusSet = new HashSet<>();
for (ReassignTaskDO elem: doList) {
vo.setGmtCreate(elem.getGmtCreate().getTime());
vo.setOperator(elem.getOperator());
vo.setDescription(elem.getDescription());
+
+ if (clusterAndTopicName.length() == 0) {
+ clusterAndTopicName.append("-").append(elem.getClusterId()).append("-").append(elem.getTopicName());
+ } else {
+ clusterAndTopicName.append("等");
+ }
+
if (TaskStatusReassignEnum.isFinished(elem.getStatus())) {
completedTopicNum += 1;
statusSet.add(elem.getStatus());
@@ -114,6 +123,9 @@ public class ReassignModelConverter {
vo.setBeginTime(elem.getBeginTime().getTime());
}
+ // 任务名称上,增加展示集群ID和Topic名称,多个时,仅展示第一个. PR from Hongten
+ vo.setTaskName(String.format("%s 数据迁移任务%s", DateUtils.getFormattedDate(taskId), clusterAndTopicName.toString()));
+
// 任务整体状态
if (statusSet.contains(TaskStatusReassignEnum.RUNNING.getCode())) {
vo.setStatus(TaskStatusReassignEnum.RUNNING.getCode());
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java
index 97b8f04a..e21c41da 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicMineConverter.java
@@ -29,6 +29,7 @@ public class TopicMineConverter {
vo.setClusterName(data.getLogicalClusterName());
vo.setBytesIn(data.getBytesIn());
vo.setBytesOut(data.getBytesOut());
+ vo.setDescription(data.getDescription());
voList.add(vo);
}
return voList;
diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml
index 4463d746..9cd51d46 100644
--- a/kafka-manager-web/src/main/resources/application.yml
+++ b/kafka-manager-web/src/main/resources/application.yml
@@ -9,6 +9,9 @@ server:
spring:
application:
name: kafkamanager
+ version: @project.version@
+ profiles:
+ active: dev
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://localhost:3306/logi_kafka_manager?characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
@@ -18,8 +21,6 @@ spring:
main:
allow-bean-definition-overriding: true
- profiles:
- active: dev
servlet:
multipart:
max-file-size: 100MB
@@ -30,27 +31,57 @@ logging:
custom:
idc: cn
- jmx:
- max-conn: 10 # 2.3版本配置不在这个地方生效
store-metrics-task:
community:
- broker-metrics-enabled: true
topic-metrics-enabled: true
- didi:
+ didi: # 滴滴Kafka特有的指标
app-topic-metrics-enabled: false
topic-request-time-metrics-enabled: false
- topic-throttled-metrics: false
- save-days: 7
+ topic-throttled-metrics-enabled: false
-# 任务相关的开关
+# 任务相关的配置
task:
op:
- sync-topic-enabled: false # 未落盘的Topic定期同步到DB中
- order-auto-exec: # 工单自动化审批线程的开关
- topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启
- app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启
+ sync-topic-enabled: false # 未落盘的Topic定期同步到DB中
+ order-auto-exec: # 工单自动化审批线程的开关
+ topic-enabled: false # Topic工单自动化审批开关, false:关闭自动化审批, true:开启
+ app-enabled: false # App工单自动化审批开关, false:关闭自动化审批, true:开启
+ metrics:
+ collect: # 收集指标
+ broker-metrics-enabled: true # 收集Broker指标
+ sink: # 上报指标
+ cluster-metrics: # 上报cluster指标
+ sink-db-enabled: true # 上报到db
+ broker-metrics: # 上报broker指标
+ sink-db-enabled: true # 上报到db
+ delete: # 删除指标
+ delete-limit-size: 1000 # 单次删除的批大小
+ cluster-metrics-save-days: 14 # 集群指标保存天数
+ broker-metrics-save-days: 14 # Broker指标保存天数
+ topic-metrics-save-days: 7 # Topic指标保存天数
+ topic-request-time-metrics-save-days: 7 # Topic请求耗时指标保存天数
+ topic-throttled-metrics-save-days: 7 # Topic限流指标保存天数
+ app-topic-metrics-save-days: 7 # App+Topic指标保存天数
+
+thread-pool:
+ collect-metrics:
+ thread-num: 256 # 收集指标线程池大小
+ queue-size: 5000 # 收集指标线程池的queue大小
+ api-call:
+ thread-num: 16 # api服务线程池大小
+ queue-size: 5000 # api服务线程池的queue大小
+
+client-pool:
+ kafka-consumer:
+ min-idle-client-num: 24 # 最小空闲客户端数
+ max-idle-client-num: 24 # 最大空闲客户端数
+ max-total-client-num: 24 # 最大客户端数
+ borrow-timeout-unit-ms: 3000 # 租借超时时间,单位毫秒
account:
+ jump-login:
+ gateway-api: false # 网关接口
+ third-part-api: false # 第三方接口
ldap:
enabled: false
url: ldap://127.0.0.1:389/
@@ -64,19 +95,20 @@ account:
auth-user-registration: true
auth-user-registration-role: normal
-kcm:
- enabled: false
- s3:
+kcm: # 集群安装部署,仅安装broker
+ enabled: false # 是否开启
+ s3: # s3 存储服务
endpoint: s3.didiyunapi.com
access-key: 1234567890
secret-key: 0987654321
bucket: logi-kafka
- n9e:
- base-url: http://127.0.0.1:8004
- user-token: 12345678
- timeout: 300
- account: root
- script-file: kcm_script.sh
+ n9e: # 夜莺
+ base-url: http://127.0.0.1:8004 # 夜莺job服务地址
+ user-token: 12345678 # 用户的token
+ timeout: 300 # 单台操作的超时时间
+ account: root # 操作时使用的账号
+ script-file: kcm_script.sh # 脚本,已内置好,在源码的kcm模块内,此处配置无需修改
+ logikm-url: http://127.0.0.1:8080 # logikm部署地址,部署时kcm_script.sh会调用logikm检查部署中的一些状态
monitor:
enabled: false
diff --git a/pom.xml b/pom.xml
index a7c70e54..8b8db3a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,14 +11,15 @@
org.springframework.boot
spring-boot-starter-parent
- 2.1.1.RELEASE
+ 2.1.18.RELEASE
- 2.4.2-SNAPSHOT
- 2.7.0
- 1.5.13
+ 2.6.0
+ 2.1.18.RELEASE
+ 2.9.2
+ 1.5.21
true
true
@@ -26,7 +27,9 @@
1.8
UTF-8
UTF-8
- 8.5.66
+ 8.5.72
+ 2.16.0
+ 3.0.0
@@ -42,6 +45,7 @@
kafka-manager-extends/kafka-manager-openapi
kafka-manager-task
kafka-manager-web
+ distribution
@@ -62,6 +66,11 @@
swagger-annotations
${swagger.version}
+
+ io.swagger
+ swagger-models
+ ${swagger.version}
+
@@ -229,6 +238,25 @@
minio
7.1.0
+
+
+ org.projectlombok
+ lombok
+ 1.18.2
+ provided
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ ${maven-assembly-plugin.version}
+
+
+
\ No newline at end of file