From 87cd058fd8b6510ea5680fcb1bae4bbceb64dc27 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 10 Oct 2022 19:54:47 +0800 Subject: [PATCH] =?UTF-8?q?Broker=E5=A2=9E=E5=8A=A0=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E6=98=AF=E5=90=A6=E5=AD=98=E6=B4=BB=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/core/service/broker/BrokerService.java | 4 ++++ .../broker/impl/BrokerServiceImpl.java | 22 +++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java index 62f03e65..c8c300a0 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java @@ -67,4 +67,8 @@ public interface BrokerService { * 获取总的Broker数 */ Integer countAllBrokers(); + + boolean allServerDown(Long clusterPhyId); + + boolean existServerDown(Long clusterPhyId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index 7fc4f4f2..3fd74ee5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -262,14 +262,32 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok return version; } - - @Override public Integer countAllBrokers() { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); return brokerDAO.selectCount(lambdaQueryWrapper); } + @Override + public boolean allServerDown(Long clusterPhyId) { + List poList = this.getAllBrokerPOsFromDB(clusterPhyId); + if (ValidateUtils.isEmptyList(poList)) { + return false; + } + + return poList.stream().filter(elem -> elem.getStatus().equals(Constant.DOWN)).count() == poList.size(); + } + + @Override + public boolean existServerDown(Long clusterPhyId) { + List poList = this.getAllBrokerPOsFromDB(clusterPhyId); + if (ValidateUtils.isEmptyList(poList)) { + return false; + } + + return poList.stream().filter(elem -> elem.getStatus().equals(Constant.DOWN)).count() > 0; + } + /**************************************************** private method ****************************************************/ private List listAllBrokersAndUpdateCache(Long clusterPhyId) {