mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Bugfix]修复正常情况下,集群状态统计错误的问题(#865)
This commit is contained in:
@@ -4,8 +4,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysHe
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysState;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 多集群总体状态
|
||||
*/
|
||||
@@ -24,4 +28,6 @@ public interface MultiClusterPhyManager {
|
||||
* @return
|
||||
*/
|
||||
PaginationResult<ClusterPhyDashboardVO> getClusterPhysDashboard(MultiClusterDashboardDTO dto);
|
||||
|
||||
Result<List<ClusterPhyBaseVO>> getClusterPhysBasic();
|
||||
}
|
||||
|
||||
@@ -9,13 +9,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysHe
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhysState;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
|
||||
import com.xiaojukeji.know.streaming.km.common.converter.ClusterVOConverter;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
|
||||
@@ -24,7 +23,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -42,37 +40,26 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
|
||||
@Autowired
|
||||
private ClusterMetricService clusterMetricService;
|
||||
|
||||
@Autowired
|
||||
private KafkaControllerService kafkaControllerService;
|
||||
|
||||
@Override
|
||||
public ClusterPhysState getClusterPhysState() {
|
||||
List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
|
||||
ClusterPhysState physState = new ClusterPhysState(0, 0, 0, clusterPhyList.size());
|
||||
|
||||
Map<Long, KafkaController> controllerMap = kafkaControllerService.getKafkaControllersFromDB(
|
||||
clusterPhyList.stream().map(elem -> elem.getId()).collect(Collectors.toList()),
|
||||
false
|
||||
);
|
||||
|
||||
ClusterPhysState physState = new ClusterPhysState(0, 0, clusterPhyList.size());
|
||||
for (ClusterPhy clusterPhy: clusterPhyList) {
|
||||
KafkaController kafkaController = controllerMap.get(clusterPhy.getId());
|
||||
|
||||
if (kafkaController != null && !kafkaController.alive()) {
|
||||
// 存在明确的信息表示controller挂了
|
||||
physState.setDownCount(physState.getDownCount() + 1);
|
||||
} else if ((System.currentTimeMillis() - clusterPhy.getCreateTime().getTime() >= 5 * 60 * 1000) && kafkaController == null) {
|
||||
// 集群接入时间是在近5分钟内,同时kafkaController信息不存在,则设置为down
|
||||
for (ClusterPhy clusterPhy : clusterPhyList) {
|
||||
ClusterMetrics metrics = clusterMetricService.getLatestMetricsFromCache(clusterPhy.getId());
|
||||
Float state = metrics.getMetric(ClusterMetricVersionItems.CLUSTER_METRIC_HEALTH_STATE);
|
||||
if (state == null) {
|
||||
physState.setUnknownCount(physState.getUnknownCount() + 1);
|
||||
} else if (state.intValue() == HealthStateEnum.DEAD.getDimension()) {
|
||||
physState.setDownCount(physState.getDownCount() + 1);
|
||||
} else {
|
||||
// 其他情况都设置为alive
|
||||
physState.setLiveCount(physState.getLiveCount() + 1);
|
||||
}
|
||||
}
|
||||
|
||||
return physState;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ClusterPhysHealthState getClusterPhysHealthState() {
|
||||
List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
|
||||
@@ -107,23 +94,6 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
|
||||
// 转为vo格式,方便后续进行分页筛选等
|
||||
List<ClusterPhyDashboardVO> voList = ConvertUtil.list2List(clusterPhyList, ClusterPhyDashboardVO.class);
|
||||
|
||||
// 获取集群controller信息并补充到vo中,
|
||||
Map<Long, KafkaController> controllerMap = kafkaControllerService.getKafkaControllersFromDB(clusterPhyList.stream().map(elem -> elem.getId()).collect(Collectors.toList()), false);
|
||||
for (ClusterPhyDashboardVO vo: voList) {
|
||||
KafkaController kafkaController = controllerMap.get(vo.getId());
|
||||
|
||||
if (kafkaController != null && !kafkaController.alive()) {
|
||||
// 存在明确的信息表示controller挂了
|
||||
vo.setAlive(Constant.DOWN);
|
||||
} else if ((System.currentTimeMillis() - vo.getCreateTime().getTime() >= 5 * 60L * 1000L) && kafkaController == null) {
|
||||
// 集群接入时间是在近5分钟内,同时kafkaController信息不存在,则设置为down
|
||||
vo.setAlive(Constant.DOWN);
|
||||
} else {
|
||||
// 其他情况都设置为alive
|
||||
vo.setAlive(Constant.ALIVE);
|
||||
}
|
||||
}
|
||||
|
||||
// 本地分页过滤
|
||||
voList = this.getAndPagingDataInLocal(voList, dto);
|
||||
|
||||
@@ -148,6 +118,15 @@ public class MultiClusterPhyManagerImpl implements MultiClusterPhyManager {
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<List<ClusterPhyBaseVO>> getClusterPhysBasic() {
|
||||
// 获取集群
|
||||
List<ClusterPhy> clusterPhyList = clusterPhyService.listAllClusters();
|
||||
|
||||
// 转为vo格式,方便后续进行分页筛选等
|
||||
return Result.buildSuc(ConvertUtil.list2List(clusterPhyList, ClusterPhyBaseVO.class));
|
||||
}
|
||||
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.config.PlatformClusterConfigService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectClusterDAO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.connect.ConnectorDAO;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.health.HealthCheckResultDAO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
@@ -39,6 +40,9 @@ public class HealthCheckResultServiceImpl implements HealthCheckResultService {
|
||||
@Autowired
|
||||
private ConnectClusterDAO connectClusterDAO;
|
||||
|
||||
@Autowired
|
||||
private ConnectorDAO connectorDAO;
|
||||
|
||||
@Autowired
|
||||
private PlatformClusterConfigService platformClusterConfigService;
|
||||
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.health.state.impl;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.healthcheck.BaseClusterHealthConfig;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthCheckAggResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.health.HealthScoreResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.*;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.connect.ConnectorMetrics;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.mm2.MirrorMakerMetrics;
|
||||
@@ -19,7 +21,9 @@ import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClus
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checker.AbstractHealthCheckService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -29,6 +33,7 @@ import java.util.stream.Collectors;
|
||||
import java.util.List;
|
||||
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthCheckDimensionEnum.*;
|
||||
import static com.xiaojukeji.know.streaming.km.common.enums.health.HealthStateEnum.DEAD;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.ConnectorMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.connect.MirrorMakerMetricVersionItems.*;
|
||||
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems.*;
|
||||
@@ -52,6 +57,9 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
@Autowired
|
||||
private ConnectClusterService connectClusterService;
|
||||
|
||||
@Autowired
|
||||
private KafkaControllerService kafkaControllerService;
|
||||
|
||||
@Override
|
||||
public ClusterMetrics calClusterHealthMetrics(Long clusterPhyId) {
|
||||
ClusterMetrics metrics = new ClusterMetrics(clusterPhyId);
|
||||
@@ -105,6 +113,10 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_CONNECTOR));
|
||||
state = Math.max(state, metrics.getMetric(CLUSTER_METRIC_HEALTH_STATE_MIRROR_MAKER));
|
||||
|
||||
if (isKafkaClusterDown(clusterPhyId)) {
|
||||
state = Float.valueOf(HealthStateEnum.DEAD.getDimension());
|
||||
}
|
||||
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_PASSED, passed);
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_CHECK_TOTAL, total);
|
||||
metrics.getMetrics().put(CLUSTER_METRIC_HEALTH_STATE, state);
|
||||
@@ -531,4 +543,16 @@ public class HealthStateServiceImpl implements HealthStateService {
|
||||
|
||||
return existNotPassed? HealthStateEnum.MEDIUM: HealthStateEnum.GOOD;
|
||||
}
|
||||
|
||||
private boolean isKafkaClusterDown(Long clusterPhyId) {
|
||||
ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
|
||||
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
|
||||
if (kafkaController != null && !kafkaController.alive()) {
|
||||
return true;
|
||||
} else if ((System.currentTimeMillis() - clusterPhy.getCreateTime().getTime() >= 5 * 60 * 1000) && kafkaController == null) {
|
||||
// 集群接入时间是在近5分钟内,同时kafkaController信息不存在,则设置为down
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.xiaojukeji.know.streaming.km.biz.cluster.MultiClusterPhyManager;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.MultiClusterDashboardDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyBaseVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhysHealthStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhysStateVO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.ClusterPhyDashboardVO;
|
||||
@@ -37,10 +38,17 @@ public class MultiClusterPhyController {
|
||||
@ApiOperation(value = "多物理集群-大盘", notes = "")
|
||||
@PostMapping(value = "physical-clusters/dashboard")
|
||||
@ResponseBody
|
||||
public PaginationResult<ClusterPhyDashboardVO> getClusterPhyBasic(@RequestBody @Validated MultiClusterDashboardDTO dto) {
|
||||
public PaginationResult<ClusterPhyDashboardVO> getClusterPhyDashboard(@RequestBody @Validated MultiClusterDashboardDTO dto) {
|
||||
return multiClusterPhyManager.getClusterPhysDashboard(dto);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "多物理集群-基本信息", notes = "")
|
||||
@GetMapping(value = "physical-clusters/basic")
|
||||
@ResponseBody
|
||||
public Result<List<ClusterPhyBaseVO>> getClusterPhyBasic() {
|
||||
return multiClusterPhyManager.getClusterPhysBasic();
|
||||
}
|
||||
|
||||
@ApiOperation(value = "多物理集群-状态", notes = "")
|
||||
@GetMapping(value = "physical-clusters/state")
|
||||
@ResponseBody
|
||||
|
||||
Reference in New Issue
Block a user