From 0f8be4fadc3e4737a2ca253943dd57308f844337 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 5 Dec 2022 14:04:19 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=BE=93=E5=87=BA=20&=20=E6=9C=AC=E5=9C=B0=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E7=BB=9F=E4=B8=80=E7=AE=A1=E7=90=86(#800)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/ClusterTopicsManagerImpl.java | 2 +- .../km/common/constant/MsgConstant.java | 4 + .../km/core/cache/DataBaseDataLocalCache.java | 14 + .../km/core/flusher/DatabaseDataFlusher.java | 38 +++ .../config/PlatformClusterConfigService.java | 1 + .../PlatformClusterConfigServiceImpl.java | 17 + .../service/topic/TopicMetricService.java | 2 +- .../topic/impl/TopicMetricServiceImpl.java | 46 +-- .../km/persistence/es/ESOpClient.java | 293 +++++++----------- .../persistence/es/dao/TopicMetricESDAO.java | 33 +- .../listener/TaskClusterAddedListener.java | 2 +- 11 files changed, 221 insertions(+), 231 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java index c68f9b9a..17a6c63c 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java @@ -44,7 +44,7 @@ public class ClusterTopicsManagerImpl implements ClusterTopicsManager { List topicList = topicService.listTopicsFromDB(clusterPhyId); // 获取集群所有Topic的指标 - Map metricsMap = topicMetricService.getLatestMetricsFromCacheFirst(clusterPhyId); + Map metricsMap = topicMetricService.getLatestMetricsFromCache(clusterPhyId); // 转换成vo List voList = TopicVOConverter.convert2ClusterPhyTopicsOverviewVOList(topicList, metricsMap); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java index 1be8dadf..9072810d 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java @@ -52,6 +52,10 @@ public class MsgConstant { /**************************************************** Partition ****************************************************/ + public static String getPartitionNoLeader(Long clusterPhyId) { + return String.format("集群ID:[%d] 所有分区NoLeader", clusterPhyId); + } + public static String getPartitionNoLeader(Long clusterPhyId, String topicName) { return String.format("集群ID:[%d] Topic名称:[%s] 所有分区NoLeader", clusterPhyId, topicName); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java index 84e2363b..0cb6f832 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/cache/DataBaseDataLocalCache.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import java.util.List; @@ -10,6 +11,11 @@ import java.util.Map; import java.util.concurrent.TimeUnit; public class DataBaseDataLocalCache { + private static final Cache> topicLatestMetricsCache = Caffeine.newBuilder() + .expireAfterWrite(360, TimeUnit.SECONDS) + .maximumSize(500) + .build(); + private static final Cache clusterLatestMetricsCache = Caffeine.newBuilder() .expireAfterWrite(180, TimeUnit.SECONDS) .maximumSize(500) @@ -20,6 +26,14 @@ public class DataBaseDataLocalCache { .maximumSize(500) .build(); + public static Map getTopicMetrics(Long clusterPhyId) { + return topicLatestMetricsCache.getIfPresent(clusterPhyId); + } + + public static void putTopicMetrics(Long clusterPhyId, Map metricsMap) { + topicLatestMetricsCache.put(clusterPhyId, metricsMap); + } + public static ClusterMetrics getClusterLatestMetrics(Long clusterPhyId) { return clusterLatestMetricsCache.getIfPresent(clusterPhyId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java index 95519768..1ac1e86a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java @@ -4,13 +4,18 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; +import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -18,11 +23,19 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; @Service public class DatabaseDataFlusher { private static final ILog LOGGER = LogFactory.getLog(DatabaseDataFlusher.class); + @Autowired + private TopicService topicService; + + @Autowired + private TopicMetricService topicMetricService; + @Autowired private ClusterPhyService clusterPhyService; @@ -37,6 +50,8 @@ public class DatabaseDataFlusher { this.flushPartitionsCache(); this.flushClusterLatestMetricsCache(); + + this.flushTopicLatestMetricsCache(); } @Scheduled(cron="0 0/1 * * * ?") @@ -81,4 +96,27 @@ public class DatabaseDataFlusher { }); } } + + @Scheduled(cron = "0 0/1 * * * ?") + private void flushTopicLatestMetricsCache() { + for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) { + FutureUtil.quickStartupFutureUtil.submitTask(() -> { + try { + + List topicNameList = topicService.listTopicsFromCacheFirst(clusterPhy.getId()).stream().map(Topic::getTopicName).collect(Collectors.toList()); + + List metricsList = topicMetricService.listTopicLatestMetricsFromES(clusterPhy.getId(), topicNameList, Collections.emptyList()); + + Map metricsMap = metricsList + .stream() + .collect(Collectors.toMap(TopicMetrics::getTopic, Function.identity())); + + DataBaseDataLocalCache.putTopicMetrics(clusterPhy.getId(), metricsMap); + + } catch (Exception e) { + LOGGER.error("method=flushTopicLatestMetricsCache||clusterPhyId={}||errMsg=exception!", clusterPhy.getId(), e); + } + }); + } + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/PlatformClusterConfigService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/PlatformClusterConfigService.java index 1d9e571d..148cec1e 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/PlatformClusterConfigService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/PlatformClusterConfigService.java @@ -17,4 +17,5 @@ public interface PlatformClusterConfigService { Map getByClusterAndGroupWithoutDefault(Long clusterPhyId, String group); + Map> listByGroup(String groupName); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/impl/PlatformClusterConfigServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/impl/PlatformClusterConfigServiceImpl.java index c6b3bd5d..2c65e9fa 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/impl/PlatformClusterConfigServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/config/impl/PlatformClusterConfigServiceImpl.java @@ -12,6 +12,7 @@ import com.xiaojukeji.know.streaming.km.persistence.mysql.config.PlatformCluster import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -68,4 +69,20 @@ public class PlatformClusterConfigServiceImpl implements PlatformClusterConfigSe return configPOMap; } + + @Override + public Map> listByGroup(String groupName) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(PlatformClusterConfigPO::getValueGroup, groupName); + + List poList = platformClusterConfigDAO.selectList(lambdaQueryWrapper); + + Map> poMap = new HashMap<>(); + poList.forEach(elem -> { + poMap.putIfAbsent(elem.getClusterId(), new HashMap<>()); + poMap.get(elem.getClusterId()).put(elem.getValueName(), elem); + }); + + return poMap; + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java index d2428aa3..014b460a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicMetricService.java @@ -23,7 +23,7 @@ public interface TopicMetricService { /** * 优先从本地缓存获取metrics信息 */ - Map getLatestMetricsFromCacheFirst(Long clusterPhyId); + Map getLatestMetricsFromCache(Long clusterPhyId); /** * 获取Topic在具体Broker上最新的一个指标 diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java index cf04f2e8..fe680901 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java @@ -2,13 +2,10 @@ package com.xiaojukeji.know.streaming.km.core.service.topic.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.Table; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricsTopicDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; -import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.PartitionMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; @@ -30,25 +27,22 @@ import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache; +import com.xiaojukeji.know.streaming.km.core.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService; -import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import com.xiaojukeji.know.streaming.km.persistence.es.dao.TopicMetricESDAO; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.management.InstanceNotFoundException; import javax.management.ObjectName; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*; @@ -58,8 +52,7 @@ import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafk */ @Service public class TopicMetricServiceImpl extends BaseMetricService implements TopicMetricService { - - private static final ILog LOGGER = LogFactory.getLog( TopicMetricServiceImpl.class); + private static final ILog LOGGER = LogFactory.getLog(TopicMetricServiceImpl.class); public static final String TOPIC_METHOD_DO_NOTHING = "doNothing"; public static final String TOPIC_METHOD_GET_HEALTH_SCORE = "getMetricHealthScore"; @@ -86,18 +79,6 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe @Autowired private TopicMetricESDAO topicMetricESDAO; - private final Cache> topicLatestMetricsCache = Caffeine.newBuilder() - .expireAfterWrite(5, TimeUnit.MINUTES) - .maximumSize(200) - .build(); - - @Scheduled(cron = "0 0/2 * * * ?") - private void flushClusterLatestMetricsCache() { - for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) { - this.updateCacheAndGetMetrics(clusterPhy.getId()); - } - } - @Override protected VersionItemTypeEnum getVersionItemType() { return VersionItemTypeEnum.METRIC_TOPIC; @@ -152,13 +133,13 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe } @Override - public Map getLatestMetricsFromCacheFirst(Long clusterPhyId) { - Map metricsMap = topicLatestMetricsCache.getIfPresent(clusterPhyId); - if (metricsMap != null) { - return metricsMap; + public Map getLatestMetricsFromCache(Long clusterPhyId) { + Map metricsMap = DataBaseDataLocalCache.getTopicMetrics(clusterPhyId); + if (metricsMap == null) { + return new HashMap<>(); } - return this.updateCacheAndGetMetrics(clusterPhyId); + return metricsMap; } @Override @@ -308,19 +289,8 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe return Result.buildSuc(count); } + /**************************************************** private method ****************************************************/ - private Map updateCacheAndGetMetrics(Long clusterPhyId) { - List topicNames = topicService.listTopicsFromDB(clusterPhyId) - .stream().map(Topic::getTopicName).collect(Collectors.toList()); - - List metrics = listTopicLatestMetricsFromES(clusterPhyId, topicNames, Arrays.asList()); - - Map metricsMap = metrics.stream() - .collect(Collectors.toMap(TopicMetrics::getTopic, Function.identity())); - - topicLatestMetricsCache.put(clusterPhyId, metricsMap); - return metricsMap; - } private List listTopNTopics(Long clusterId, int topN){ diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index 5e91f7b2..822c44e1 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.didiglobal.logi.elasticsearch.client.ESClient; import com.didiglobal.logi.elasticsearch.client.gateway.document.ESIndexRequest; import com.didiglobal.logi.elasticsearch.client.gateway.document.ESIndexResponse; +import com.didiglobal.logi.elasticsearch.client.model.exception.ESIndexNotFoundException; import com.didiglobal.logi.elasticsearch.client.model.type.ESVersion; import com.didiglobal.logi.elasticsearch.client.request.batch.BatchNode; import com.didiglobal.logi.elasticsearch.client.request.batch.BatchType; @@ -20,10 +21,11 @@ import com.didiglobal.logi.elasticsearch.client.response.indices.puttemplate.ESI import com.didiglobal.logi.elasticsearch.client.response.query.query.ESQueryResponse; import com.didiglobal.logi.elasticsearch.client.response.setting.template.TemplateConfig; import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; import com.google.common.collect.Lists; import com.xiaojukeji.know.streaming.km.common.bean.po.BaseESPO; -import com.xiaojukeji.know.streaming.km.common.utils.EnvUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.LoggerUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; @@ -36,6 +38,7 @@ import javax.annotation.Nullable; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -43,7 +46,7 @@ import java.util.stream.Collectors; @Component public class ESOpClient { - private static final ILog LOGGER = LogFactory.getLog("ES_LOGGER"); + private static final ILog LOGGER = LoggerUtil.getESLogger(); /** * es 地址 @@ -90,40 +93,27 @@ public class ESOpClient { ESClient esClient = this.buildEsClient(esAddress, esPass, "", ""); if (esClient != null) { this.esClientPool.add(esClient); - LOGGER.info("class=ESOpClient||method=init||msg=add new es client {}", esAddress); + LOGGER.info("method=init||esAddress={}||msg=add new es client", esAddress); } } } /** - * 从更新es http 客户端连接池找那个获取 - * - * @return + * 获取ES客户端 */ public ESClient getESClientFromPool() { + if (ValidateUtils.isEmptyList(esClientPool)) { + return null; + } + return esClientPool.get((int)(System.currentTimeMillis() % clientCnt)); } - /** - * 归还到es http 客户端连接池 - * @param esClient - */ - public void returnESClientToPool(ESClient esClient) { - // 已不需要进行归还,后续再删除该代码 - } - /** * 查询并获取第一个元素 - * - * @param indexName - * @param queryDsl - * @param clzz - * @param - * @return */ - public T performRequestAndTakeFirst(String indexName, String queryDsl, Class clzz) { - List hits = performRequest(indexName, queryDsl, clzz); - + public T performRequestAndTakeFirst(String indexName, String queryDsl, Class clazz) { + List hits = this.performRequest(indexName, queryDsl, clazz); if (CollectionUtils.isEmpty(hits)) { return null; } @@ -133,31 +123,20 @@ public class ESOpClient { /** * 查询并获取第一个元素 - * - * @param indexName - * @param queryDsl - * @param clazz - * @param - * @return */ - public T performRequestAndTakeFirst(String routingValue, String indexName, - String queryDsl, Class clazz) { - List hits = performRequestWithRouting(routingValue, indexName, queryDsl, clazz); - - if (CollectionUtils.isEmpty(hits)) {return null;} + public T performRequestAndTakeFirst(String routingValue, String indexName, String queryDsl, Class clazz) { + List hits = this.performRequestWithRouting(routingValue, indexName, queryDsl, clazz); + if (CollectionUtils.isEmpty(hits)) { + return null; + } return hits.get(0); } /** * 执行查询 - * - * @param indexName - * @param queryDsl - * @return - * @throws IOException */ - public ESQueryResponse performRequest(String indexName,String queryDsl) { + public ESQueryResponse performRequest(String indexName, String queryDsl) { return doQuery(new ESQueryRequest().indices(indexName).source(queryDsl)); } @@ -170,7 +149,7 @@ public class ESOpClient { return func.apply(esQueryResponse); } - public List performRequest(String indexName, String queryDsl, Class clzz) { + public List performRequest(String indexName, String queryDsl, Class clzz) { ESQueryResponse esQueryResponse = doQuery( new ESQueryRequest().indices(indexName).source(queryDsl).clazz(clzz)); if (esQueryResponse == null) { @@ -210,8 +189,7 @@ public class ESOpClient { return hits; } - public R performRequestWithRouting(String routingValue, String indexName, - String queryDsl, Function func, int tryTimes) { + public R performRequestWithRouting(String routingValue, String indexName, String queryDsl, Function func, int tryTimes) { ESQueryResponse esQueryResponse; do { esQueryResponse = doQuery(new ESQueryRequest().routing(routingValue).indices(indexName).source(queryDsl)); @@ -222,16 +200,12 @@ public class ESOpClient { /** * 写入单条 - * - * @param source - * @return */ public boolean index(String indexName, String id, String source) { - ESClient esClient = null; ESIndexResponse response = null; try { - esClient = getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); if (esClient == null) { return false; } @@ -250,20 +224,11 @@ public class ESOpClient { return response.getRestStatus().getStatus() == HttpStatus.SC_OK || response.getRestStatus().getStatus() == HttpStatus.SC_CREATED; } - } catch (Exception e) { - LOGGER.warn( - "class=ESOpClient||method=index||indexName={}||id={}||source={}||errMsg=index doc error. ", - indexName, id, source, e); - if (response != null) { - LOGGER.warn( - "class=ESOpClient||method=index||indexName={}||id={}||source={}||errMsg=response {}", - indexName, id, source, JSON.toJSONString(response)); - } - } finally { - if (esClient != null) { - returnESClientToPool(esClient); - } + LOGGER.error( + "method=index||indexName={}||id={}||source={}||response={}||errMsg=index failed", + indexName, id, source, ConvertUtil.obj2Json(response), e + ); } return false; @@ -271,19 +236,15 @@ public class ESOpClient { /** * 批量写入 - * - * @param indexName - * @return */ public boolean batchInsert(String indexName, List pos) { if (CollectionUtils.isEmpty(pos)) { return true; } - ESClient esClient = null; ESBatchResponse response = null; try { - esClient = getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); if (esClient == null) { return false; } @@ -312,16 +273,10 @@ public class ESOpClient { return response.getRestStatus().getStatus() == HttpStatus.SC_OK && !response.getErrors(); } } catch (Exception e) { - LOGGER.warn( - "method=batchInsert||indexName={}||errMsg=batch insert error. ", indexName, e); - if (response != null) { - LOGGER.warn("method=batchInsert||indexName={}||errMsg=response {}", indexName, JSON.toJSONString(response)); - } - - } finally { - if (esClient != null) { - returnESClientToPool(esClient); - } + LOGGER.error( + "method=batchInsert||indexName={}||response={}||errMsg=batch index failed", + indexName, ConvertUtil.obj2Json(response), e + ); } return false; @@ -331,9 +286,8 @@ public class ESOpClient { * 根据表达式判断索引是否已存在 */ public boolean indexExist(String indexName) { - ESClient esClient = null; try { - esClient = this.getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); if (esClient == null) { return false; } @@ -341,11 +295,7 @@ public class ESOpClient { // 检查索引是否存在 return esClient.admin().indices().prepareExists(indexName).execute().actionGet(30, TimeUnit.SECONDS).isExists(); } catch (Exception e){ - LOGGER.warn("class=ESOpClient||method=indexExist||indexName={}||msg=exception!", indexName, e); - } finally { - if (esClient != null) { - returnESClientToPool(esClient); - } + LOGGER.error("method=indexExist||indexName={}||msg=exception!", indexName, e); } return false; @@ -355,48 +305,45 @@ public class ESOpClient { * 创建索引 */ public boolean createIndex(String indexName) { - if (indexExist(indexName)) { + if (this.indexExist(indexName)) { return true; } - ESClient client = getESClientFromPool(); - if (client != null) { - try { - ESIndicesPutIndexResponse response = client.admin().indices().preparePutIndex(indexName).execute() - .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); - return response.getAcknowledged(); - } catch (Exception e){ - LOGGER.warn( "msg=create index fail||indexName={}", indexName, e); - } finally { - returnESClientToPool(client); - } + ESClient client = this.getESClientFromPool(); + try { + ESIndicesPutIndexResponse response = client + .admin() + .indices() + .preparePutIndex(indexName) + .execute() + .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); + + return response.getAcknowledged(); + } catch (Exception e){ + LOGGER.error( "method=createIndex||indexName={}||errMsg=exception!", indexName, e); } return false; } public boolean templateExist(String indexTemplateName){ - ESClient esClient = null; - try { - esClient = this.getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); // 获取es中原来index template的配置 - ESIndicesGetTemplateResponse getTemplateResponse = - esClient.admin().indices().prepareGetTemplate( indexTemplateName ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS ); + ESIndicesGetTemplateResponse getTemplateResponse = esClient + .admin() + .indices() + .prepareGetTemplate(indexTemplateName) + .execute() + .actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS ); TemplateConfig templateConfig = getTemplateResponse.getMultiTemplatesConfig().getSingleConfig(); - if (null != templateConfig) { return true; } } catch (Exception e) { - LOGGER.warn( "method=templateExist||indexTemplateName={}||msg=exception!", - indexTemplateName, e); - } finally { - if (esClient != null) { - this.returnESClientToPool(esClient); - } + LOGGER.error( "method=templateExist||indexTemplateName={}||msg=exception!", indexTemplateName, e); } return false; @@ -406,27 +353,29 @@ public class ESOpClient { * 创建索引模板 */ public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) { - ESClient esClient = null; - try { - esClient = this.getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); - //存在模板就返回,不存在就创建 - if(templateExist(indexTemplateName)){return true;} + // 存在模板就返回,不存在就创建 + if(this.templateExist(indexTemplateName)) { + return true; + } // 创建新的模板 - ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName ) - .setTemplateConfig( config ).execute().actionGet( ES_OPERATE_TIMEOUT, TimeUnit.SECONDS ); + ESIndicesPutTemplateResponse response = esClient + .admin() + .indices() + .preparePutTemplate( indexTemplateName ) + .setTemplateConfig(config) + .execute() + .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); return response.getAcknowledged(); } catch (Exception e) { - LOGGER.warn( "method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!", + LOGGER.error( + "method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!", indexTemplateName, config, e ); - } finally { - if (esClient != null) { - this.returnESClientToPool(esClient); - } } return false; @@ -434,54 +383,47 @@ public class ESOpClient { /** * 根据索引模板获取所有的索引 - * @param indexName - * @return */ - public List listIndexByName(String indexName){ - ESClient esClient = null; - + public List listIndexByName(String indexName) { try { - esClient = this.getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); - ESIndicesCatIndicesResponse response = esClient.admin().indices().prepareCatIndices(indexName + "*").execute() + ESIndicesCatIndicesResponse response = esClient + .admin() + .indices() + .prepareCatIndices(indexName + "*") + .execute() .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); - - if(null != response){ + if(null != response) { return response.getCatIndexResults().stream().map(CatIndexResult::getIndex).collect(Collectors.toList()); } } catch (Exception e) { - LOGGER.warn( "method=listIndexByTemplate||indexName={}||msg=exception!", - indexName, e); - } finally { - if (esClient != null) { - this.returnESClientToPool(esClient); - } + LOGGER.error( "method=listIndexByName||indexName={}||msg=exception!", indexName, e); } - return new ArrayList<>(); + return Collections.emptyList(); } /** * 删除索引 - * @param indexRealName - * @return */ public boolean delIndexByName(String indexRealName){ - ESClient esClient = null; - try { - esClient = this.getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); - ESIndicesDeleteIndexResponse response = esClient.admin().indices().prepareDeleteIndex(indexRealName).execute() + ESIndicesDeleteIndexResponse response = esClient + .admin() + .indices() + .prepareDeleteIndex(indexRealName) + .execute() .actionGet(ES_OPERATE_TIMEOUT, TimeUnit.SECONDS); + return response.getAcknowledged(); + } catch (ESIndexNotFoundException nfe) { + // 索引不存在时,debug环境时再进行打印 + LOGGER.debug( "method=delIndexByName||indexRealName={}||errMsg=exception!", indexRealName, nfe); } catch (Exception e) { - LOGGER.warn( "method=delIndexByName||indexRealName={}||msg=exception!", - indexRealName, e); - } finally { - if (esClient != null) { - this.returnESClientToPool(esClient); - } + LOGGER.error( "method=delIndexByName||indexRealName={}||errMsg=exception!", indexRealName, e); } return false; @@ -491,61 +433,55 @@ public class ESOpClient { /** * 执行查询 - * @param request - * @return */ @Nullable private ESQueryResponse doQuery(ESQueryRequest request) { - ESClient esClient = null; try { - esClient = getESClientFromPool(); + ESClient esClient = this.getESClientFromPool(); ESQueryResponse response = esClient.query(request).actionGet(120, TimeUnit.SECONDS); - if(!EnvUtil.isOnline()){ - LOGGER.info("method=doQuery||indexName={}||queryDsl={}||ret={}", - request.indices(), bytesReferenceConvertDsl(request.source()), JSON.toJSONString(response)); - } + LOGGER.debug( + "method=doQuery||indexName={}||queryDsl={}||ret={}", + request.indices(), bytesReferenceConvertDsl(request.source()), JSON.toJSONString(response) + ); return response; } catch (Exception e) { LOGGER.error( "method=doQuery||indexName={}||queryDsl={}||errMsg=query error. ", request.indices(), bytesReferenceConvertDsl(request.source()), e); return null; - }finally { - if (esClient != null) { - returnESClientToPool(esClient); - } } } private boolean handleErrorResponse(String indexName, List pos, ESBatchResponse response) { - if (response.getErrors().booleanValue()) { - int errorItemIndex = 0; - - if (CollectionUtils.isNotEmpty(response.getItems())) { - for (IndexResultItemNode item : response.getItems()) { - recordErrorResponseItem(indexName, pos, errorItemIndex++, item); - } - } - - return true; + if (response.getErrors()) { + return false; } - return false; + int errorItemIndex = 0; + + if (CollectionUtils.isNotEmpty(response.getItems())) { + for (IndexResultItemNode item : response.getItems()) { + recordErrorResponseItem(indexName, pos, errorItemIndex++, item); + } + } + + return true; } private void recordErrorResponseItem(String indexName, List pos, int errorItemIndex, IndexResultItemNode item) { - if (item.getIndex() != null && item.getIndex().getShards() != null + if (item.getIndex() != null + && item.getIndex().getShards() != null && CollectionUtils.isNotEmpty(item.getIndex().getShards().getFailures())) { LOGGER.warn( - "class=ESOpClient||method=batchInsert||indexName={}||errMsg=Failures: {}, content: {}", + "method=batchInsert||indexName={}||errMsg=Failures: {}, content: {}", indexName, item.getIndex().getShards().getFailures().toString(), JSON.toJSONString(pos.get(errorItemIndex))); } if (item.getIndex() != null && item.getIndex().getError() != null) { LOGGER.warn( - "class=ESOpClient||method=batchInsert||indexName={}||errMsg=Error: {}, content: {}", + "method=batchInsert||indexName={}||errMsg=Error: {}, content: {}", indexName, item.getIndex().getError().getReason(), JSON.toJSONString(pos.get(errorItemIndex))); } @@ -553,21 +489,18 @@ public class ESOpClient { /** * 转换dsl语句 - * - * @param bytes - * @return */ private String bytesReferenceConvertDsl(BytesReference bytes) { try { return XContentHelper.convertToJson(bytes, false); } catch (IOException e) { - LOGGER.warn("class=ESOpClient||method=bytesReferenceConvertDsl||errMsg=fail to covert", e); + LOGGER.warn("method=bytesReferenceConvertDsl||errMsg=fail to covert", e); } return ""; } - private ESClient buildEsClient(String address,String password,String clusterName, String version) { + private ESClient buildEsClient(String address, String password,String clusterName, String version) { if (StringUtils.isBlank(address)) { return null; } @@ -602,7 +535,7 @@ public class ESOpClient { // ignore } - LOGGER.error("class=ESESOpClient||method=buildEsClient||errMsg={}||address={}", e.getMessage(), address, e); + LOGGER.error("method=buildEsClient||address={}||errMsg=exception", address, e); return null; } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java index e70f2656..e997a778 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/TopicMetricESDAO.java @@ -207,17 +207,27 @@ public class TopicMetricESDAO extends BaseMetricESDAO { /** * 获取每个 metric 的 topN 个 topic 的指标,如果获取不到 topN 的topics, 则默认返回 defaultTopics 的指标 */ - public Table> listTopicMetricsByTopN(Long clusterPhyId, List defaultTopics, - List metrics, String aggType, int topN, - Long startTime, Long endTime){ + public Table> listTopicMetricsByTopN(Long clusterPhyId, + List defaultTopics, + List metrics, + String aggType, + int topN, + Long startTime, + Long endTime){ //1、获取topN要查询的topic,每一个指标的topN的topic可能不一样 - Map> metricTopics = getTopNTopics(clusterPhyId, metrics, aggType, topN, startTime, endTime); + Map> metricTopics = this.getTopNTopics(clusterPhyId, metrics, aggType, topN, startTime, endTime); Table> table = HashBasedTable.create(); - for(String metric : metrics){ - table.putAll(listTopicMetricsByTopics(clusterPhyId, Arrays.asList(metric), - aggType, metricTopics.getOrDefault(metric, defaultTopics), startTime, endTime)); + for(String metric : metrics) { + table.putAll(this.listTopicMetricsByTopics( + clusterPhyId, + Arrays.asList(metric), + aggType, + metricTopics.getOrDefault(metric, defaultTopics), + startTime, + endTime) + ); } return table; @@ -226,9 +236,12 @@ public class TopicMetricESDAO extends BaseMetricESDAO { /** * 获取每个 metric 指定个 topic 的指标 */ - public Table> listTopicMetricsByTopics(Long clusterPhyId, List metrics, - String aggType, List topics, - Long startTime, Long endTime){ + public Table> listTopicMetricsByTopics(Long clusterPhyId, + List metrics, + String aggType, + List topics, + Long startTime, + Long endTime){ //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java index d5512843..1c0492c1 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterAddedListener.java @@ -22,7 +22,7 @@ public class TaskClusterAddedListener implements ApplicationListener