From 9e3c4dc06b914afa3498c0f00d3b3276449bc41e Mon Sep 17 00:00:00 2001 From: zengqiao Date: Tue, 30 Aug 2022 19:39:15 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3raft=E9=9B=86=E7=BE=A4control?= =?UTF-8?q?ler=E4=BF=A1=E6=81=AF=E4=B8=8D=E6=96=AD=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/common/constant/Constant.java | 1 + .../km/core/service/broker/BrokerService.java | 1 + .../broker/impl/BrokerMetricServiceImpl.java | 20 ++++-- .../broker/impl/BrokerServiceImpl.java | 16 +++++ .../impl/KafkaControllerServiceImpl.java | 70 ++++++++++++------- .../persistence/es/dao/BrokerMetricESDAO.java | 67 ++++++++++++++---- 6 files changed, 130 insertions(+), 45 deletions(-) diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index d7c1c960..fae5db21 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -63,4 +63,5 @@ public class Constant { public static final String COLLECT_METRICS_COST_TIME_METRICS_NAME = "CollectMetricsCostTimeUnitSec"; public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F; + public static final Integer DEFAULT_RETRY_TIME = 3; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java index d1f181b4..62f03e65 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java @@ -44,6 +44,7 @@ public interface BrokerService { * 获取具体Broker */ Broker getBroker(Long clusterPhyId, Integer brokerId); + Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId); /** * 获取BrokerLog-Dir信息 diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java index d47aa2ea..6c7dec0e 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerMetricServiceImpl.java @@ -178,11 +178,16 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker @Override public Result> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, MetricDTO dto) { - Map metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(clusterPhyId, brokerId, - dto.getMetricsNames(), dto.getAggType(), dto.getStartTime(), dto.getEndTime()); + Map metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint( + clusterPhyId, + brokerId, + dto.getMetricsNames(), + dto.getAggType(), + dto.getStartTime(), + dto.getEndTime() + ); - List metricPoints = new ArrayList<>(metricPointMap.values()); - return Result.buildSuc(metricPoints); + return Result.buildSuc(new ArrayList<>(metricPointMap.values())); } @Override @@ -199,8 +204,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker brokerMetrics.add(ConvertUtil.obj2Obj(brokerMetricPO, BrokerMetrics.class)); } catch (Exception e) { - LOGGER.error("method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception", - clusterPhyId, brokerId, e); + LOGGER.error( + "method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception", + clusterPhyId, brokerId, e + ); } } @@ -219,6 +226,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker } /**************************************************** private method ****************************************************/ + private List listTopNBrokerIds(Long clusterId, Integer topN){ List brokers = brokerService.listAliveBrokersFromDB(clusterId); if(CollectionUtils.isEmpty(brokers)){return new ArrayList<>();} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index 3ab9f3fa..e9f8c933 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -206,6 +206,22 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class); } + @Override + public Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId) { + List brokerList = this.listAliveBrokersFromCacheFirst(clusterPhyId); + if (brokerList == null) { + return null; + } + + for (Broker broker: brokerList) { + if (brokerId.equals(broker.getBrokerId())) { + return broker; + } + } + + return null; + } + @Override public Result> getBrokerLogDirDescFromKafka(Long clusterPhyId, Integer brokerId) { try { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java index 42311eef..1fb3f488 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java @@ -56,7 +56,7 @@ public class KafkaControllerServiceImpl implements KafkaControllerService { @Override public int insertAndIgnoreDuplicateException(KafkaController kafkaController) { try { - Broker broker = brokerService.getBroker(kafkaController.getClusterPhyId(), kafkaController.getBrokerId()); + Broker broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId()); KafkaControllerPO kafkaControllerPO = new KafkaControllerPO(); kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId()); @@ -136,34 +136,56 @@ public class KafkaControllerServiceImpl implements KafkaControllerService { /**************************************************** private method ****************************************************/ private Result getControllerFromAdminClient(ClusterPhy clusterPhy) { + AdminClient adminClient = null; try { - AdminClient adminClient = null; - try { - adminClient = kafkaAdminClient.getClient(clusterPhy.getId()); - } catch (Exception e) { - log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); - - // 集群已经加载进来,但是创建admin-client失败,则设置无controller - return Result.buildSuc(); - } - - DescribeClusterResult describeClusterResult = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)); - - Node controllerNode = describeClusterResult.controller().get(); - if (controllerNode == null) { - return Result.buildSuc(); - } - - return Result.buildSuc(new KafkaController( - clusterPhy.getId(), - controllerNode.id(), - System.currentTimeMillis() - )); + adminClient = kafkaAdminClient.getClient(clusterPhy.getId()); } catch (Exception e) { log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e); - return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); + // 集群已经加载进来,但是创建admin-client失败,则设置无controller + return Result.buildSuc(); } + + // 先从DB获取该集群controller + KafkaController dbKafkaController = null; + + for (int i = 1; i <= Constant.DEFAULT_RETRY_TIME; ++i) { + try { + if (i == 1) { + // 获取DB中的controller信息 + dbKafkaController = this.getKafkaControllerFromDB(clusterPhy.getId()); + } + + DescribeClusterResult describeClusterResult = adminClient.describeCluster( + new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + Node controllerNode = describeClusterResult.controller().get(); + if (controllerNode == null) { + return Result.buildSuc(); + } + + if (dbKafkaController != null && controllerNode.id() == dbKafkaController.getBrokerId()) { + // ID没有变化,直接返回原先的 + return Result.buildSuc(dbKafkaController); + } + + // 发生了变化 + return Result.buildSuc(new KafkaController( + clusterPhy.getId(), + controllerNode.id(), + System.currentTimeMillis() + )); + } catch (Exception e) { + log.error( + "class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||tryTime={}||errMsg=exception", + clusterPhy.getId(), i, e + ); + } + } + + // 三次出错,则直接返回无controller + return Result.buildSuc(); } private Result getControllerFromZKClient(ClusterPhy clusterPhy) { diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java index 1af1e357..b80c1ca0 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/es/dao/BrokerMetricESDAO.java @@ -41,7 +41,11 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { DslsConstant.GET_BROKER_LATEST_METRICS, clusterId, brokerId, startTime, endTime); BrokerMetricPO brokerMetricPO = esOpClient.performRequestAndTakeFirst( - brokerId.toString(), realIndex(startTime, endTime), dsl, BrokerMetricPO.class); + brokerId.toString(), + realIndex(startTime, endTime), + dsl, + BrokerMetricPO.class + ); return (null == brokerMetricPO) ? new BrokerMetricPO(clusterId, brokerId) : brokerMetricPO; } @@ -49,8 +53,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { /** * 获取集群 clusterPhyId 中每个 metric 的指定 broker 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值 */ - public Map getBrokerMetricsPoint(Long clusterPhyId, Integer brokerId, List metrics, - String aggType, Long startTime, Long endTime){ + public Map getBrokerMetricsPoint(Long clusterPhyId, + Integer brokerId, + List metrics, + String aggType, + Long startTime, + Long endTime) { //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); @@ -60,8 +68,13 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { String dsl = dslLoaderUtil.getFormatDslByFileName( DslsConstant.GET_BROKER_AGG_SINGLE_METRICS, clusterPhyId, brokerId, startTime, endTime, aggDsl); - return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl, - s -> handleSingleESQueryResponse(s, metrics, aggType), 3); + return esOpClient.performRequestWithRouting( + String.valueOf(brokerId), + realIndex, + dsl, + s -> handleSingleESQueryResponse(s, metrics, aggType), + 3 + ); } /** @@ -75,10 +88,19 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { Map> metricBrokerIds = getTopNBrokerIds(clusterPhyId, metrics, aggType, topN, startTime, endTime); Table> table = HashBasedTable.create(); + //2、查询指标 for(String metric : metricBrokerIds.keySet()){ - table.putAll(listBrokerMetricsByBrokerIds(clusterPhyId, Arrays.asList(metric), - aggType, metricBrokerIds.getOrDefault(metric, brokerIds), startTime, endTime)); + table.putAll( + this.listBrokerMetricsByBrokerIds( + clusterPhyId, + Arrays.asList(metric), + aggType, + metricBrokerIds.getOrDefault(metric, brokerIds), + startTime, + endTime + ) + ); } return table; @@ -87,9 +109,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { /** * 获取集群 clusterPhyId 中每个 metric 的指定 brokers 在指定时间[startTime、endTime]区间内所有的指标 */ - public Table> listBrokerMetricsByBrokerIds(Long clusterPhyId, List metrics, - String aggType, List brokerIds, - Long startTime, Long endTime){ + public Table> listBrokerMetricsByBrokerIds(Long clusterPhyId, + List metrics, + String aggType, + List brokerIds, + Long startTime, + Long endTime){ //1、获取需要查下的索引 String realIndex = realIndex(startTime, endTime); @@ -105,22 +130,34 @@ public class BrokerMetricESDAO extends BaseMetricESDAO { for(Long brokerId : brokerIds){ try { String dsl = dslLoaderUtil.getFormatDslByFileName( - DslsConstant.GET_BROKER_AGG_LIST_METRICS, clusterPhyId, brokerId, startTime, endTime, interval, aggDsl); + DslsConstant.GET_BROKER_AGG_LIST_METRICS, + clusterPhyId, + brokerId, + startTime, + endTime, + interval, + aggDsl + ); queryFuture.runnableTask( String.format("class=BrokerMetricESDAO||method=listBrokerMetricsByBrokerIds||ClusterPhyId=%d", clusterPhyId), 5000, () -> { - Map> metricMap = esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl, - s -> handleListESQueryResponse(s, metrics, aggType), 3); + Map> metricMap = esOpClient.performRequestWithRouting( + String.valueOf(brokerId), + realIndex, + dsl, + s -> handleListESQueryResponse(s, metrics, aggType), + 3 + ); - synchronized (table){ + synchronized (table) { for(String metric : metricMap.keySet()){ table.put(metric, brokerId, metricMap.get(metric)); } } }); - }catch (Exception e){ + } catch (Exception e){ LOGGER.error("method=listBrokerMetricsByBrokerIds||clusterPhyId={}||brokerId{}||errMsg=exception!", clusterPhyId, brokerId, e); } }