From 1e256ae1fdd8ce0a6ef95278cd65c84bbf95d1ce Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 28 Sep 2022 19:44:33 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=88=9B=E5=BB=BAES=E7=B4=A2=E5=BC=95=E6=A8=A1=E7=89=88?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/persistence/es/ESOpClient.java | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java index 1200699a..c70a4df6 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/ESOpClient.java @@ -392,10 +392,7 @@ public class ESOpClient { return false; } - /** - * 创建索引模板 - */ - public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) { + public boolean templateExist(String indexTemplateName){ ESClient esClient = null; try { @@ -410,6 +407,29 @@ public class ESOpClient { if (null != templateConfig) { return true; } + } catch (Exception e) { + LOGGER.warn( "method=templateExist||indexTemplateName={}||msg=exception!", + indexTemplateName, e); + } finally { + if (esClient != null) { + this.returnESClientToPool(esClient); + } + } + + return false; + } + + /** + * 创建索引模板 + */ + public boolean createIndexTemplateIfNotExist(String indexTemplateName, String config) { + ESClient esClient = null; + + try { + esClient = this.getESClientFromPool(); + + //存在模板就返回,不存在就创建 + if(templateExist(indexTemplateName)){return true;} // 创建新的模板 ESIndicesPutTemplateResponse response = esClient.admin().indices().preparePutTemplate( indexTemplateName ) @@ -417,8 +437,7 @@ public class ESOpClient { return response.getAcknowledged(); } catch (Exception e) { - LOGGER.warn( - "class=ESOpClient||method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!", + LOGGER.warn( "method=createIndexTemplateIfNotExist||indexTemplateName={}||config={}||msg=exception!", indexTemplateName, config, e ); } finally { From a5fa9de54b04a8edd904ec37fd7d1cdf3dbd6bfb Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 28 Sep 2022 19:52:11 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8DGroup=E6=8C=87=E6=A0=87?= =?UTF-8?q?=E9=98=B2=E9=87=8D=E5=A4=8D=E4=B8=8D=E7=94=9F=E6=95=88=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../group/impl/GroupMetricServiceImpl.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java index ea324888..427edc2c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupMetricServiceImpl.java @@ -90,23 +90,31 @@ public class GroupMetricServiceImpl extends BaseMetricService implements GroupMe @Override public Result> collectGroupMetricsFromKafka(Long clusterId, String groupName, List metrics) { - List allGroupMetrics = new ArrayList<>(); - Map topicPartitionGroupMap = new HashMap<>(); + List allGroupMetrics = new ArrayList<>(); + Map topicPartitionGroupMap = new HashMap<>(); GroupMetrics groupMetrics = new GroupMetrics(clusterId, groupName, true); - for(String metric : metrics){ - if(null != groupMetrics.getMetrics().get(metric)){continue;} + Set existMetricSet = new HashSet<>(); + for (String metric : metrics) { + if (existMetricSet.contains(metric)) { + continue; + } Result> ret = collectGroupMetricsFromKafka(clusterId, groupName, metric); - if(null != ret && ret.successful()){ + if (null != ret && ret.successful()) { List groupMetricsList = ret.getData(); - for(GroupMetrics gm : groupMetricsList){ - if(gm.isBGroupMetric()){ + + for (GroupMetrics gm : groupMetricsList) { + + //记录已存在的指标 + existMetricSet.addAll(gm.getMetrics().keySet()); + + if (gm.isBGroupMetric()) { groupMetrics.getMetrics().putAll(gm.getMetrics()); - }else { + } else { GroupMetrics topicGroupMetric = topicPartitionGroupMap.getOrDefault( gm.getTopic() + gm.getPartitionId(), - new GroupMetrics(clusterId, groupName, false)); + new GroupMetrics(clusterId, gm.getPartitionId(), gm.getTopic(), groupName, false)); topicGroupMetric.getMetrics().putAll(gm.getMetrics()); topicPartitionGroupMap.put(gm.getTopic() + gm.getPartitionId(), topicGroupMetric); From 95c9582d8b8569fd340915fca75851bd6fb7ac7d Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 28 Sep 2022 20:03:23 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E7=BB=84=E8=AF=A6=E6=83=85=E6=8C=87=E6=A0=87=E4=B8=BA=E5=AE=9E?= =?UTF-8?q?=E6=97=B6=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../streaming/km/biz/group/impl/GroupManagerImpl.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index 1095d5ee..5ccc3e98 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -272,15 +272,11 @@ public class GroupManagerImpl implements GroupManager { // 获取Group指标信息 - Result> groupMetricsResult = groupMetricService.listPartitionLatestMetricsFromES( - clusterPhyId, - groupName, - topicName, - latestMetricNames == null? Arrays.asList(): latestMetricNames - ); + Result> groupMetricsResult = groupMetricService.collectGroupMetricsFromKafka(clusterPhyId, groupName, latestMetricNames == null ? Arrays.asList() : latestMetricNames); + // 转换Group指标 - List esGroupMetricsList = groupMetricsResult.hasData()? groupMetricsResult.getData(): new ArrayList<>(); + List esGroupMetricsList = groupMetricsResult.hasData() ? groupMetricsResult.getData().stream().filter(elem -> topicName.equals(elem.getTopic())).collect(Collectors.toList()) : new ArrayList<>(); Map esMetricsMap = new HashMap<>(); for (GroupMetrics groupMetrics: esGroupMetricsList) { esMetricsMap.put(groupMetrics.getPartitionId(), groupMetrics);