Merge pull request #529 from didi/dev_v3.0.0

解决raft集群controller信息不断记录问题
This commit is contained in:
EricZeng
2022-08-30 19:42:06 +08:00
committed by GitHub
6 changed files with 130 additions and 45 deletions

View File

@@ -63,4 +63,5 @@ public class Constant {
public static final String COLLECT_METRICS_COST_TIME_METRICS_NAME = "CollectMetricsCostTimeUnitSec";
public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;
public static final Integer DEFAULT_RETRY_TIME = 3;
}

View File

@@ -44,6 +44,7 @@ public interface BrokerService {
* 获取具体Broker
*/
Broker getBroker(Long clusterPhyId, Integer brokerId);
Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId);
/**
* 获取BrokerLog-Dir信息

View File

@@ -178,11 +178,16 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
@Override
public Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, MetricDTO dto) {
Map<String/*metric*/, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(clusterPhyId, brokerId,
dto.getMetricsNames(), dto.getAggType(), dto.getStartTime(), dto.getEndTime());
Map<String/*metric*/, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(
clusterPhyId,
brokerId,
dto.getMetricsNames(),
dto.getAggType(),
dto.getStartTime(),
dto.getEndTime()
);
List<MetricPointVO> metricPoints = new ArrayList<>(metricPointMap.values());
return Result.buildSuc(metricPoints);
return Result.buildSuc(new ArrayList<>(metricPointMap.values()));
}
@Override
@@ -199,8 +204,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
brokerMetrics.add(ConvertUtil.obj2Obj(brokerMetricPO, BrokerMetrics.class));
} catch (Exception e) {
LOGGER.error("method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
clusterPhyId, brokerId, e);
LOGGER.error(
"method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
clusterPhyId, brokerId, e
);
}
}
@@ -219,6 +226,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
}
/**************************************************** private method ****************************************************/
private List<Long> listTopNBrokerIds(Long clusterId, Integer topN){
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
if(CollectionUtils.isEmpty(brokers)){return new ArrayList<>();}

View File

@@ -206,6 +206,22 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
}
@Override
public Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId) {
    // Look up the broker among the alive brokers of the cluster, preferring the cache.
    // Returns null when the cluster has no cached/alive broker list or the id is not found.
    List<Broker> aliveBrokers = this.listAliveBrokersFromCacheFirst(clusterPhyId);
    if (aliveBrokers == null) {
        return null;
    }
    return aliveBrokers.stream()
            .filter(item -> brokerId.equals(item.getBrokerId()))
            .findFirst()
            .orElse(null);
}
@Override
public Result<Map<String, LogDirDescription>> getBrokerLogDirDescFromKafka(Long clusterPhyId, Integer brokerId) {
try {

View File

@@ -56,7 +56,7 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
@Override
public int insertAndIgnoreDuplicateException(KafkaController kafkaController) {
try {
Broker broker = brokerService.getBroker(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
Broker broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
KafkaControllerPO kafkaControllerPO = new KafkaControllerPO();
kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId());
@@ -136,34 +136,56 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
/**************************************************** private method ****************************************************/
private Result<KafkaController> getControllerFromAdminClient(ClusterPhy clusterPhy) {
AdminClient adminClient = null;
try {
AdminClient adminClient = null;
try {
adminClient = kafkaAdminClient.getClient(clusterPhy.getId());
} catch (Exception e) {
log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
// 集群已经加载进来但是创建admin-client失败则设置无controller
return Result.buildSuc();
}
DescribeClusterResult describeClusterResult = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
Node controllerNode = describeClusterResult.controller().get();
if (controllerNode == null) {
return Result.buildSuc();
}
return Result.buildSuc(new KafkaController(
clusterPhy.getId(),
controllerNode.id(),
System.currentTimeMillis()
));
adminClient = kafkaAdminClient.getClient(clusterPhy.getId());
} catch (Exception e) {
log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
// 集群已经加载进来但是创建admin-client失败则设置无controller
return Result.buildSuc();
}
// 先从DB获取该集群controller
KafkaController dbKafkaController = null;
for (int i = 1; i <= Constant.DEFAULT_RETRY_TIME; ++i) {
try {
if (i == 1) {
// 获取DB中的controller信息
dbKafkaController = this.getKafkaControllerFromDB(clusterPhy.getId());
}
DescribeClusterResult describeClusterResult = adminClient.describeCluster(
new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);
Node controllerNode = describeClusterResult.controller().get();
if (controllerNode == null) {
return Result.buildSuc();
}
if (dbKafkaController != null && controllerNode.id() == dbKafkaController.getBrokerId()) {
// ID没有变化直接返回原先的
return Result.buildSuc(dbKafkaController);
}
// 发生了变化
return Result.buildSuc(new KafkaController(
clusterPhy.getId(),
controllerNode.id(),
System.currentTimeMillis()
));
} catch (Exception e) {
log.error(
"class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||tryTime={}||errMsg=exception",
clusterPhy.getId(), i, e
);
}
}
// 三次出错则直接返回无controller
return Result.buildSuc();
}
private Result<KafkaController> getControllerFromZKClient(ClusterPhy clusterPhy) {

View File

@@ -41,7 +41,11 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
DslsConstant.GET_BROKER_LATEST_METRICS, clusterId, brokerId, startTime, endTime);
BrokerMetricPO brokerMetricPO = esOpClient.performRequestAndTakeFirst(
brokerId.toString(), realIndex(startTime, endTime), dsl, BrokerMetricPO.class);
brokerId.toString(),
realIndex(startTime, endTime),
dsl,
BrokerMetricPO.class
);
return (null == brokerMetricPO) ? new BrokerMetricPO(clusterId, brokerId) : brokerMetricPO;
}
@@ -49,8 +53,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
/**
* 获取集群 clusterPhyId 中每个 metric 的指定 broker 在指定时间[startTime、endTime]区间内聚合计算(avg、max)之后的统计值
*/
public Map<String/*metric*/, MetricPointVO> getBrokerMetricsPoint(Long clusterPhyId, Integer brokerId, List<String> metrics,
String aggType, Long startTime, Long endTime){
public Map<String/*metric*/, MetricPointVO> getBrokerMetricsPoint(Long clusterPhyId,
Integer brokerId,
List<String> metrics,
String aggType,
Long startTime,
Long endTime) {
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
@@ -60,8 +68,13 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_BROKER_AGG_SINGLE_METRICS, clusterPhyId, brokerId, startTime, endTime, aggDsl);
return esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl,
s -> handleSingleESQueryResponse(s, metrics, aggType), 3);
return esOpClient.performRequestWithRouting(
String.valueOf(brokerId),
realIndex,
dsl,
s -> handleSingleESQueryResponse(s, metrics, aggType),
3
);
}
/**
@@ -75,10 +88,19 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
Map<String, List<Long>> metricBrokerIds = getTopNBrokerIds(clusterPhyId, metrics, aggType, topN, startTime, endTime);
Table<String, Long, List<MetricPointVO>> table = HashBasedTable.create();
//2、查询指标
for(String metric : metricBrokerIds.keySet()){
table.putAll(listBrokerMetricsByBrokerIds(clusterPhyId, Arrays.asList(metric),
aggType, metricBrokerIds.getOrDefault(metric, brokerIds), startTime, endTime));
table.putAll(
this.listBrokerMetricsByBrokerIds(
clusterPhyId,
Arrays.asList(metric),
aggType,
metricBrokerIds.getOrDefault(metric, brokerIds),
startTime,
endTime
)
);
}
return table;
@@ -87,9 +109,12 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
/**
* 获取集群 clusterPhyId 中每个 metric 的指定 brokers 在指定时间[startTime、endTime]区间内所有的指标
*/
public Table<String/*metric*/, Long/*brokerId*/, List<MetricPointVO>> listBrokerMetricsByBrokerIds(Long clusterPhyId, List<String> metrics,
String aggType, List<Long> brokerIds,
Long startTime, Long endTime){
public Table<String/*metric*/, Long/*brokerId*/, List<MetricPointVO>> listBrokerMetricsByBrokerIds(Long clusterPhyId,
List<String> metrics,
String aggType,
List<Long> brokerIds,
Long startTime,
Long endTime){
//1、获取需要查下的索引
String realIndex = realIndex(startTime, endTime);
@@ -105,22 +130,34 @@ public class BrokerMetricESDAO extends BaseMetricESDAO {
for(Long brokerId : brokerIds){
try {
String dsl = dslLoaderUtil.getFormatDslByFileName(
DslsConstant.GET_BROKER_AGG_LIST_METRICS, clusterPhyId, brokerId, startTime, endTime, interval, aggDsl);
DslsConstant.GET_BROKER_AGG_LIST_METRICS,
clusterPhyId,
brokerId,
startTime,
endTime,
interval,
aggDsl
);
queryFuture.runnableTask(
String.format("class=BrokerMetricESDAO||method=listBrokerMetricsByBrokerIds||ClusterPhyId=%d", clusterPhyId),
5000,
() -> {
Map<String, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(String.valueOf(brokerId), realIndex, dsl,
s -> handleListESQueryResponse(s, metrics, aggType), 3);
Map<String, List<MetricPointVO>> metricMap = esOpClient.performRequestWithRouting(
String.valueOf(brokerId),
realIndex,
dsl,
s -> handleListESQueryResponse(s, metrics, aggType),
3
);
synchronized (table){
synchronized (table) {
for(String metric : metricMap.keySet()){
table.put(metric, brokerId, metricMap.get(metric));
}
}
});
}catch (Exception e){
} catch (Exception e){
LOGGER.error("method=listBrokerMetricsByBrokerIds||clusterPhyId={}||brokerId{}||errMsg=exception!", clusterPhyId, brokerId, e);
}
}