解决raft集群controller信息不断记录问题

This commit is contained in:
zengqiao
2022-08-30 19:39:15 +08:00
parent 507abc1d84
commit 9e3c4dc06b
6 changed files with 130 additions and 45 deletions

View File

@@ -44,6 +44,7 @@ public interface BrokerService {
* 获取具体Broker
*/
Broker getBroker(Long clusterPhyId, Integer brokerId);
Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId);
/**
* 获取BrokerLog-Dir信息

View File

@@ -178,11 +178,16 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
@Override
public Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, MetricDTO dto) {
    // NOTE(review): the rendered diff interleaved the pre- and post-change lines here
    // (duplicate metricPointMap declaration and a double return); this is the merged
    // post-change version only.
    //
    // Query ES for the aggregated metric points of this broker — one point per
    // requested metric name, aggregated over [startTime, endTime] with the given
    // aggregation type.
    Map<String/*metric*/, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(
            clusterPhyId,
            brokerId,
            dto.getMetricsNames(),
            dto.getAggType(),
            dto.getStartTime(),
            dto.getEndTime()
    );

    // The caller only needs the points themselves, not the metric-name index.
    return Result.buildSuc(new ArrayList<>(metricPointMap.values()));
}
@Override
@@ -199,8 +204,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
brokerMetrics.add(ConvertUtil.obj2Obj(brokerMetricPO, BrokerMetrics.class));
} catch (Exception e) {
LOGGER.error("method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
clusterPhyId, brokerId, e);
LOGGER.error(
"method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
clusterPhyId, brokerId, e
);
}
}
@@ -219,6 +226,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
}
/**************************************************** private method ****************************************************/
private List<Long> listTopNBrokerIds(Long clusterId, Integer topN){
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
if(CollectionUtils.isEmpty(brokers)){return new ArrayList<>();}

View File

@@ -206,6 +206,22 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
}
@Override
public Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId) {
    // Resolve the broker from the cached alive-broker list before any slower source.
    List<Broker> aliveBrokers = this.listAliveBrokersFromCacheFirst(clusterPhyId);
    if (aliveBrokers == null) {
        return null;
    }

    // Linear scan is fine here: the list holds at most one entry per broker in the cluster.
    return aliveBrokers.stream()
            .filter(elem -> brokerId.equals(elem.getBrokerId()))
            .findFirst()
            .orElse(null);
}
@Override
public Result<Map<String, LogDirDescription>> getBrokerLogDirDescFromKafka(Long clusterPhyId, Integer brokerId) {
try {

View File

@@ -56,7 +56,7 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
@Override
public int insertAndIgnoreDuplicateException(KafkaController kafkaController) {
try {
Broker broker = brokerService.getBroker(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
Broker broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
KafkaControllerPO kafkaControllerPO = new KafkaControllerPO();
kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId());
@@ -136,34 +136,56 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
/**************************************************** private method ****************************************************/
private Result<KafkaController> getControllerFromAdminClient(ClusterPhy clusterPhy) {
    // NOTE(review): the rendered diff interleaved the pre- and post-change bodies
    // (two AdminClient declarations, two try blocks, two returns in one catch);
    // this is the reconstructed post-change version.
    AdminClient adminClient = null;
    try {
        adminClient = kafkaAdminClient.getClient(clusterPhy.getId());
    } catch (Exception e) {
        log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);

        // The cluster is loaded but the admin-client could not be created:
        // treat it as "no controller" rather than an operation failure.
        return Result.buildSuc();
    }

    // Read the controller currently recorded in the DB first. When the live controller
    // id matches the DB record, return the DB record itself so callers do not keep
    // re-inserting an identical controller row (the repeated-recording problem this
    // change addresses).
    KafkaController dbKafkaController = null;
    for (int i = 1; i <= Constant.DEFAULT_RETRY_TIME; ++i) {
        try {
            if (i == 1) {
                // Load the DB record only once, on the first attempt.
                dbKafkaController = this.getKafkaControllerFromDB(clusterPhy.getId());
            }

            DescribeClusterResult describeClusterResult = adminClient.describeCluster(
                    new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
            );

            Node controllerNode = describeClusterResult.controller().get();
            if (controllerNode == null) {
                // No controller elected right now.
                return Result.buildSuc();
            }

            if (dbKafkaController != null && controllerNode.id() == dbKafkaController.getBrokerId()) {
                // Controller id unchanged: return the existing record (keeps its timestamp).
                return Result.buildSuc(dbKafkaController);
            }

            // Controller changed: build a fresh record stamped with the current time.
            return Result.buildSuc(new KafkaController(
                    clusterPhy.getId(),
                    controllerNode.id(),
                    System.currentTimeMillis()
            ));
        } catch (Exception e) {
            log.error(
                    "class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||tryTime={}||errMsg=exception",
                    clusterPhy.getId(), i, e
            );
        }
    }

    // All retries failed: report "no controller".
    return Result.buildSuc();
}
private Result<KafkaController> getControllerFromZKClient(ClusterPhy clusterPhy) {