diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java index 62f03e65..c8c300a0 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/BrokerService.java @@ -67,4 +67,8 @@ public interface BrokerService { * 获取总的Broker数 */ Integer countAllBrokers(); + + boolean allServerDown(Long clusterPhyId); + + boolean existServerDown(Long clusterPhyId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index 7fc4f4f2..3fd74ee5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -262,14 +262,32 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok return version; } - - @Override public Integer countAllBrokers() { LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); return brokerDAO.selectCount(lambdaQueryWrapper); } + @Override + public boolean allServerDown(Long clusterPhyId) { + List poList = this.getAllBrokerPOsFromDB(clusterPhyId); + if (ValidateUtils.isEmptyList(poList)) { + return false; + } + + return poList.stream().filter(elem -> elem.getStatus().equals(Constant.DOWN)).count() == poList.size(); + } + + @Override + public boolean existServerDown(Long clusterPhyId) { + List poList = this.getAllBrokerPOsFromDB(clusterPhyId); + if (ValidateUtils.isEmptyList(poList)) { + return false; + } + + return poList.stream().filter(elem -> elem.getStatus().equals(Constant.DOWN)).count() > 0; + } + /**************************************************** private method ****************************************************/ private List listAllBrokersAndUpdateCache(Long clusterPhyId) {