合并3.3.0企业版改动

This commit is contained in:
zengqiao
2023-02-24 17:49:26 +08:00
parent cca7246281
commit a82d7f594e
137 changed files with 591 additions and 1082 deletions

View File

@@ -1,355 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.job;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.BrokerSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.job.ClusterBalanceReassignDetail;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.job.detail.ClusterBalanceDetailDataGroupByTopic;
import com.xiaojukeji.know.streaming.km.common.bean.entity.job.Job;
import com.xiaojukeji.know.streaming.km.common.bean.entity.job.JobStatus;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.job.content.JobClusterBalanceContent;
import com.xiaojukeji.know.streaming.km.common.bean.entity.job.detail.JobDetail;
import com.xiaojukeji.know.streaming.km.common.bean.entity.job.detail.JobModifyDetail;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceReassignPO;
import com.xiaojukeji.know.streaming.km.common.bean.po.job.JobPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.job.sub.SubJobPartitionDetailVO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.constant.JobConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.converter.ClusterBalanceConverter;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.converter.ClusterBalanceReassignConverter;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobActionEnum;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerSpecService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceJobService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceReassignService;
import com.xiaojukeji.know.streaming.km.core.service.config.ConfigUtils;
import com.xiaojukeji.know.streaming.km.core.service.job.JobHandler;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.persistence.mysql.job.JobDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
import com.xiaojukeji.know.streaming.km.rebalance.executor.ExecutionRebalance;
import com.xiaojukeji.know.streaming.km.rebalance.executor.common.BalanceParameter;
import com.xiaojukeji.know.streaming.km.rebalance.executor.common.OptimizerResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@EnterpriseLoadReBalance
@Component(JobConstant.CLUSTER_BALANCE)
public class ClusterBalanceJobHandler implements JobHandler {

    private static final ILog logger = LogFactory.getLog(ClusterBalanceJobHandler.class);

    // ES address used by the rebalance optimizer to read cluster metrics; may be blank.
    @Value("${es.client.address:}")
    private String esAddress;

    @Autowired
    private ClusterBalanceJobService clusterBalanceJobService;

    @Autowired
    private ClusterPhyService clusterPhyService;

    @Autowired
    private ClusterBalanceReassignService clusterBalanceReassignService;

    @Autowired
    private KafkaZKDAO kafkaZKDAO;

    @Autowired
    private JobDAO jobDAO;

    @Autowired
    private BrokerService brokerService;

    @Autowired
    private BrokerSpecService brokerSpecService;

    @Autowired
    private TopicService topicService;

    @Autowired
    private ConfigUtils configUtils;

    @Override
    public JobTypeEnum type() {
        return JobTypeEnum.CLUSTER_BALANCE;
    }

    /**
     * Submit a new cluster-balance job: run the rebalance optimizer, persist the
     * balance job and its reassignment sub-tasks, then record the moved topics on
     * the generic job record. Any persistence failure marks the transaction for rollback.
     *
     * @param job      generic job record (jobData holds a JobClusterBalanceContent JSON)
     * @param operator user submitting the job
     * @return success, or the first failure encountered
     */
    @Override
    @Transactional
    public Result<Void> submit(Job job, String operator) {
        // Parse the balance-specific content out of the generic job payload
        JobClusterBalanceContent dto = ConvertUtil.str2ObjByJson(job.getJobData(), JobClusterBalanceContent.class);

        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(dto.getClusterId());
        if (clusterPhy == null) {
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }

        // Broker spec (capacity) information; every broker needs a spec for the optimizer
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterPhy.getId());

        List<Broker> brokers = brokerService.listAllBrokersFromDB(clusterPhy.getId());
        for (Broker broker : brokers) {
            if (brokerSpecMap.get(broker.getBrokerId()) == null) {
                return Result.buildFromRSAndMsg(ResultStatus.BROKER_SPEC_NOT_EXIST, String.format("Broker规格信息不存在:brokerId:%s", broker.getBrokerId()));
            }
        }

        // Build optimizer parameters; recently-updated topics are excluded from balancing
        List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(dto.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
        BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames);
        try {
            ExecutionRebalance executionRebalance = new ExecutionRebalance();
            OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);
            Result<Void> cRs = checkOptimizerResult(optimizerResult, job.getId());
            if (cRs.failed()) {
                return cRs;
            }

            Map<String, Topic> topicMap = topicService.listTopicsFromDB(clusterPhy.getId()).stream().collect(Collectors.toMap(Topic::getTopicName, Function.identity()));
            List<ClusterBalanceReassignPO> reassignPOS = ClusterBalanceConverter.convert2ListClusterBalanceReassignPO(
                    optimizerResult.resultTask(), topicMap, job.getId(), clusterPhy.getId());

            String generateReassignmentJson = optimizerResult.resultJsonTask();
            if (dto.getParallelNum() > 0) {
                // Regenerate the reassignment json according to the execution strategy.
                // NOTE(review): the interface declares (clusterId, parallelNum, jsonVersion, executionStrategy, ...);
                // here executionStrategy is passed in the jsonVersion position — confirm the intended order.
                Result<String> jResult = clusterBalanceJobService.generateReassignmentJson(job.getClusterId(), dto.getParallelNum(), dto.getExecutionStrategy(), Constant.NUM_ONE, reassignPOS);
                if (jResult.failed()) {
                    return Result.buildFromIgnoreData(jResult);
                }
                generateReassignmentJson = jResult.getData();
            }

            // Persist the balance job
            ClusterBalanceJobPO clusterBalanceJobPO = ClusterBalanceConverter.convert2ClusterBalanceJobPO(job.getId(), dto, optimizerResult, brokers, operator, generateReassignmentJson);
            Result<Void> result = clusterBalanceJobService.createClusterBalanceJob(clusterBalanceJobPO, operator);
            if (result.failed()) {
                logger.error("method=clusterBalanceJobHandler.submit||job={}||errMsg={}!",
                        job, result.getMessage());
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return result;
            }

            // Persist the reassignment details
            Result<Void> cbrResult = clusterBalanceReassignService.addBatchBalanceReassign(reassignPOS);
            if (cbrResult.failed()) {
                logger.error("method=clusterBalanceJobHandler.submit||job={}||errMsg={}!",
                        job, cbrResult.getMessage());
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return cbrResult;
            }

            // Record the moved topics as the job's execution target
            job.setTarget(optimizerResult.resultOverview().getMoveTopics());
            int count = jobDAO.updateById(ConvertUtil.obj2Obj(job, JobPO.class));
            if (count < 1) {
                // BUGFIX: updateById returns the affected row count (>= 0), so 0 signals
                // failure; the old check `count < 0` could never trigger. Also log an
                // explicit message instead of the earlier (successful) result's message.
                logger.error("method=clusterBalanceJobHandler.submit||job={}||errMsg=update job failed!", job);
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
            }
        } catch (Exception e) {
            logger.error("method=clusterBalanceJobHandler.submit||job={}||errMsg=exception", job, e);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return Result.buildFailure(e.getMessage());
        }
        return Result.buildSuc();
    }

    /**
     * Delete the balance job that belongs to the generic job record.
     *
     * @param job      generic job record
     * @param operator user performing the deletion
     */
    @Override
    @Transactional
    public Result<Void> delete(Job job, String operator) {
        Result<Void> balanceJobResult = clusterBalanceJobService.deleteByJobId(job.getId(), operator);
        if (balanceJobResult.failed()) {
            logger.error("method=clusterBalanceJobHandler.delete||job={}||operator:{}||errMsg={}", job, operator, balanceJobResult);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return balanceJobResult;
        }
        return Result.buildSuc();
    }

    /**
     * Re-run the optimizer with the job's (possibly edited) content and replace the
     * stored balance job and its reassignment details.
     *
     * @param job      generic job record carrying the new jobData
     * @param operator user performing the modification
     */
    @Override
    @Transactional // BUGFIX: was missing; the rollback calls below require an active transaction
    public Result<Void> modify(Job job, String operator) {
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(job.getClusterId());
        if (clusterPhy == null) {
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }

        Result<ClusterBalanceJobPO> balanceJobPOResult = clusterBalanceJobService.getClusterBalanceJobById(job.getId());
        if (!balanceJobPOResult.hasData()) {
            return Result.buildFrom(ResultStatus.NOT_EXIST);
        }

        List<Broker> brokers = brokerService.listAllBrokersFromDB(clusterPhy.getId());
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterPhy.getId());
        List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(job.getClusterId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
        JobClusterBalanceContent dto = ConvertUtil.str2ObjByJson(job.getJobData(), JobClusterBalanceContent.class);
        BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(dto, brokers, brokerSpecMap, clusterPhy, esAddress, topicNames);
        ExecutionRebalance executionRebalance = new ExecutionRebalance();
        try {
            OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);
            Result<Void> cRs = checkOptimizerResult(optimizerResult, job.getId());
            if (cRs.failed()) {
                return cRs;
            }

            // Unlike submit(), topic metadata is read from ZK here — presumably to get
            // the freshest metadata; TODO confirm this difference is intentional.
            Map<String, Topic> topicMap = kafkaZKDAO.getAllTopicMetadata(clusterPhy.getId(), false).stream().collect(Collectors.toMap(Topic::getTopicName, Function.identity()));
            List<ClusterBalanceReassignPO> reassignPOS = ClusterBalanceConverter.convert2ListClusterBalanceReassignPO(optimizerResult.resultTask(),
                    topicMap, job.getId(), clusterPhy.getId());

            String generateReassignmentJson = optimizerResult.resultJsonTask();
            if (dto.getParallelNum() > 0) {
                // Regenerate the reassignment json according to the execution strategy
                Result<String> jResult = clusterBalanceJobService.generateReassignmentJson(job.getClusterId(), dto.getParallelNum(), dto.getExecutionStrategy(), Constant.NUM_ONE, reassignPOS);
                if (jResult.failed()) {
                    return Result.buildFromIgnoreData(jResult);
                }
                generateReassignmentJson = jResult.getData();
            }

            // Replace the stored balance job
            ClusterBalanceJobPO clusterBalanceJobPO = ClusterBalanceConverter.convert2ClusterBalanceJobPO(job.getId(), dto, optimizerResult, brokers, operator, generateReassignmentJson);
            Result<Void> result = clusterBalanceJobService.modifyClusterBalanceJob(clusterBalanceJobPO, operator);
            if (result.failed()) {
                return result;
            }

            // Drop the old reassignment details, then insert the regenerated ones
            Result<Void> delReassignResult = clusterBalanceReassignService.delete(job.getId(), operator);
            if (delReassignResult.failed()) {
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return delReassignResult;
            }
            Result<Void> cbrResult = clusterBalanceReassignService.addBatchBalanceReassign(reassignPOS);
            if (cbrResult.failed()) {
                // BUGFIX: log tag said "submit" although this is modify()
                logger.error("method=clusterBalanceJobHandler.modify||job={}||errMsg={}!",
                        job, cbrResult.getMessage());
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                return cbrResult;
            }
        } catch (Exception e) {
            logger.error("method=clusterBalanceJobHandler.modify||job={}||operator:{}||errMsg=exception", job, operator, e);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
        }
        return Result.buildSuc();
    }

    /** Change the replication throttle of the job's reassignment. */
    @Override
    public Result<Void> updateLimit(Job job, Long limit, String operator) {
        return clusterBalanceReassignService.modifyThrottle(job.getId(), limit, operator);
    }

    /** Start or cancel the job's reassignment; other actions are rejected. */
    @Override
    public Result<Void> process(Job job, JobActionEnum action, String operator) {
        if (JobActionEnum.START.equals(action)) {
            return clusterBalanceReassignService.execute(job.getId());
        }
        if (JobActionEnum.CANCEL.equals(action)) {
            return clusterBalanceReassignService.cancel(job.getId());
        }
        // Not supported while the migration is in progress
        return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, String.format("不支持[%s]操作", action.getValue()));
    }

    /**
     * Aggregate the status of all reassignment sub-tasks: first per topic, then
     * across topics, into a single job status.
     */
    @Override
    public Result<JobStatus> status(Job job) {
        // Sub-tasks grouped by topic name
        Map<String, List<ClusterBalanceReassignPO>> topicJobsMap = new HashMap<>();
        List<ClusterBalanceReassignPO> allSubJobPOList = clusterBalanceReassignService.getBalanceReassignsByJobId(job.getId());
        allSubJobPOList.forEach(elem -> {
            topicJobsMap.putIfAbsent(elem.getTopicName(), new ArrayList<>());
            topicJobsMap.get(elem.getTopicName()).add(elem);
        });

        // Per-topic status
        List<Integer> topicStatusList = new ArrayList<>();
        for (List<ClusterBalanceReassignPO> topicJobPOList : topicJobsMap.values()) {
            topicStatusList.add(new JobStatus(
                    topicJobPOList.stream().map(elem -> elem.getStatus()).collect(Collectors.toList())
            ).getStatus());
        }

        // Overall status across topics
        return Result.buildSuc(new JobStatus(topicStatusList));
    }

    /** Detail of the job, aggregated per topic. */
    @Override
    public Result<JobDetail> getTaskDetail(Job job) {
        Result<ClusterBalanceReassignDetail> detailResult = clusterBalanceReassignService.getJobDetailsGroupByTopic(job.getId());
        if (detailResult.failed()) {
            return Result.buildFromIgnoreData(detailResult);
        }
        return Result.buildSuc(ClusterBalanceReassignConverter.convert2JobDetail(job, detailResult.getData()));
    }

    /** Editable view of the job; validates that jobData is parseable first. */
    @Override
    public Result<JobModifyDetail> getTaskModifyDetail(Job job) {
        JobClusterBalanceContent dto = ConvertUtil.str2ObjByJson(job.getJobData(), JobClusterBalanceContent.class);
        if (dto == null) {
            return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "jobData格式错误");
        }
        JobModifyDetail detail = ConvertUtil.obj2Obj(job, JobModifyDetail.class);
        detail.setJobData(ConvertUtil.obj2Json(dto));
        return Result.buildSuc(detail);
    }

    /** Partition-level detail for one topic of the job. */
    @Override
    public Result<List<SubJobPartitionDetailVO>> getSubJobPartitionDetail(Job job, String topic) {
        Result<ClusterBalanceReassignDetail> detailResult = clusterBalanceReassignService.getJobDetailsGroupByTopic(job.getId());
        if (detailResult.failed()) {
            return Result.buildFromIgnoreData(detailResult);
        }
        List<ClusterBalanceDetailDataGroupByTopic> detailDataGroupByTopicList = detailResult.getData().getReassignTopicDetailsList()
                .stream()
                .filter(elem -> elem.getTopicName().equals(topic))
                .collect(Collectors.toList());
        if (detailDataGroupByTopicList.isEmpty()) {
            return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(job.getClusterId(), topic));
        }
        return Result.buildSuc(ClusterBalanceReassignConverter.convert2SubJobPartitionDetailVOList(detailDataGroupByTopicList.get(0)));
    }

    /**
     * Validate the optimizer output: it must exist and must actually move replicas.
     *
     * @param optimizerResult optimizer output, may be null on failure
     * @param jobId           job id, used for logging only
     * @return success when there is balancing work to do; a failure otherwise
     */
    private Result<Void> checkOptimizerResult(OptimizerResult optimizerResult, Long jobId) {
        if (optimizerResult == null) {
            return Result.buildFrom(ResultStatus.KAFKA_OPERATE_FAILED);
        }
        if (optimizerResult.resultOverview().getMoveReplicas() == 0) {
            logger.info("method=checkOptimizerResult||jobId:{}||msg=the cluster has reached equilibrium", jobId);
            return Result.buildFailure("该集群已达到均衡要求,不需要再执行均衡任务。");
        }
        return Result.buildSuc();
    }
}

View File

@@ -1,9 +0,0 @@
/**
* Load-reBalance相关功能模块
* km-extends/km-rebalance 模块,是依据指标生成迁移 plan 的模块,是底层的一个基础功能
* 当前 package 模块是依据产品的要求,依赖 km-extends/km-rebalance 模块,构建产品实际使用功能
*/
@EnterpriseLoadReBalance
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;

View File

@@ -1,23 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobConfigPO;
/**
 * Service for per-cluster balance-job configuration.
 */
@EnterpriseLoadReBalance
public interface ClusterBalanceJobConfigService {

    /**
     * Insert the cluster's balance config, replacing any existing one for the same cluster.
     *
     * @param clusterBalanceJobConfigPO config to store; its clusterId selects the row
     * @return operation result
     */
    Result<Void> replaceClusterBalanceJobConfigByClusterId(ClusterBalanceJobConfigPO clusterBalanceJobConfigPO);

    /**
     * Fetch the balance config of a cluster.
     *
     * @param clusterId cluster id
     * @return the config wrapped in a result; data may be null when none exists
     */
    Result<ClusterBalanceJobConfigPO> getByClusterId(Long clusterId);
}

View File

@@ -1,93 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceReassignPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.vo.ClusterBalanceHistoryVO;
import java.util.List;
import java.util.Map;
/**
 * Service that manages cluster load-balance jobs: lifecycle (create / modify /
 * delete), querying, and generation of partition-reassignment plans.
 */
@EnterpriseLoadReBalance
public interface ClusterBalanceJobService {

    /**
     * Delete the balance job with the given id.
     *
     * @param jobId    job id
     * @param operator user performing the deletion
     * @return operation result
     */
    Result<Void> deleteByJobId(Long jobId, String operator);

    /**
     * Persist a new balance job.
     *
     * @param clusterBalanceJobPO job record to insert
     * @param operator            user creating the job
     * @return operation result
     */
    Result<Void> createClusterBalanceJob(ClusterBalanceJobPO clusterBalanceJobPO, String operator);

    /**
     * Update an existing balance job.
     *
     * @param clusterBalanceJobPO job record carrying the new values (id selects the row)
     * @param operator            user modifying the job
     * @return operation result
     */
    Result<Void> modifyClusterBalanceJob(ClusterBalanceJobPO clusterBalanceJobPO, String operator);

    /**
     * Look up a balance job by its id.
     *
     * @param id id
     * @return the job, wrapped in a result
     */
    Result<ClusterBalanceJobPO> getClusterBalanceJobById(Long id);

    /**
     * Latest balance job of the cluster.
     *
     * @param clusterPhyId physical cluster id
     * @return the most recent job, or null when the cluster has none
     */
    ClusterBalanceJobPO getLastOneByClusterId(Long clusterPhyId);

    /**
     * Balance interval thresholds of the cluster.
     * NOTE(review): key/value semantics are not visible here — confirm against the implementation.
     *
     * @param clusterPhyId physical cluster id
     * @return map of balance intervals
     */
    Map<String, Double> getBalanceInterval(Long clusterPhyId);

    /**
     * Page through the cluster's balance-job history.
     *
     * @param clusterPhyId physical cluster id
     * @param dto          pagination parameters
     * @return one page of history entries
     */
    PaginationResult<ClusterBalanceHistoryVO> page(Long clusterPhyId, PaginationBaseDTO dto);

    /**
     * Id of one currently running balance job of the cluster (selected by job status).
     */
    Long getOneRunningJob(Long clusterPhyId);

    /**
     * Verify the balance job's progress and update its status accordingly.
     * NOTE(review): "Statue" is a typo for "Status"; renaming would break existing
     * implementors and callers, so the name is kept.
     */
    Result<Void> verifyClusterBalanceAndUpdateStatue(Long jobId);

    /**
     * Generate the partition-reassignment json for a job.
     * NOTE(review): known callers (ClusterBalanceJobHandler) pass executionStrategy in the
     * jsonVersion position and Constant.NUM_ONE as executionStrategy — verify the argument order.
     *
     * @param clusterId         cluster id
     * @param parallelNum       number of parallel reassignments
     * @param jsonVersion       version of the reassignment json format
     * @param executionStrategy execution strategy
     * @param reassignPOList    reassignment task details
     * @return the generated json
     */
    Result<String> generateReassignmentJson(Long clusterId, Integer parallelNum, Integer jsonVersion, Integer executionStrategy, List<ClusterBalanceReassignPO> reassignPOList);

    /**
     * Regenerate/advance the reassignment tasks of a job according to its migration strategy.
     *
     * @param clusterPhyId cluster id
     * @param jobId        job id
     * @return operation result
     */
    Result<Void> generateReassignmentForStrategy(Long clusterPhyId, Long jobId);
}

View File

@@ -1,72 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.job.ClusterBalanceReassignDetail;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceReassignPO;
import java.util.List;
/**
 * Service for the partition-reassignment sub-tasks generated by a cluster
 * balance job: creation, execution, cancellation and status tracking.
 */
@EnterpriseLoadReBalance
public interface ClusterBalanceReassignService {

    /**
     * Add a single reassignment sub-task.
     *
     * @param clusterBalanceReassignPO sub-task to insert
     * @return operation result
     */
    Result<Void> addBalanceReassign(ClusterBalanceReassignPO clusterBalanceReassignPO);

    /**
     * Add a batch of reassignment sub-tasks.
     *
     * @param reassignPOList sub-tasks to insert
     * @return operation result
     */
    Result<Void> addBatchBalanceReassign(List<ClusterBalanceReassignPO> reassignPOList);

    /**
     * Delete all reassignment sub-tasks of a job.
     */
    Result<Void> delete(Long jobId, String operator);

    /**
     * Start executing the job's reassignment.
     */
    Result<Void> execute(Long jobId);

    /**
     * Cancel the job's reassignment.
     */
    Result<Void> cancel(Long jobId);

    /**
     * Verify the reassignment progress of the balance job and update its status.
     * NOTE(review): "Statue" is a typo for "Status"; the name is kept for compatibility.
     */
    Result<Boolean> verifyAndUpdateStatue(ClusterBalanceJobPO clusterBalanceJobPO);

    /**
     * Modify the replication throttle of the job.
     *
     * @param jobId         job id
     * @param throttleUnitB throttle value; the name suggests bytes — confirm the unit
     * @param operator      user applying the change
     */
    Result<Void> modifyThrottle(Long jobId, Long throttleUnitB, String operator);

    /**
     * Refresh the extended-data field of the job's sub-tasks.
     */
    Result<Void> getAndUpdateSubJobExtendData(Long jobId);

    /**
     * List the reassignment sub-tasks of a job.
     */
    List<ClusterBalanceReassignPO> getBalanceReassignsByJobId(Long jobId);

    /**
     * Get sub-task details aggregated per topic.
     */
    Result<ClusterBalanceReassignDetail> getJobDetailsGroupByTopic(Long jobId);

    /**
     * Trigger preferred-replica leader election for the job's partitions.
     */
    Result<Void> preferredReplicaElection(Long jobId);
}

View File

@@ -1,82 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.dto.ClusterBalanceOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.dto.ClusterBalancePreviewDTO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.dto.ClusterBalanceStrategyDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.ClusterBalanceItemState;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.vo.*;
/**
 * Facade for the cluster-balance feature: state and overview queries, plan
 * preview/scheduling, and strategy configuration.
 */
@EnterpriseLoadReBalance
public interface ClusterBalanceService {

    /**
     * Current balance state of the cluster.
     *
     * @param clusterPhyId physical cluster id
     */
    Result<ClusterBalanceStateVO> state(Long clusterPhyId);

    /**
     * Balance-job configuration of the cluster.
     *
     * @param clusterPhyId physical cluster id
     */
    Result<ClusterBalanceJobConfigVO> config(Long clusterPhyId);

    /**
     * Paged balance overview of the cluster.
     *
     * @param clusterPhyId physical cluster id
     * @param dto          filter and pagination parameters
     */
    PaginationResult<ClusterBalanceOverviewVO> overview(Long clusterPhyId, ClusterBalanceOverviewDTO dto);

    /**
     * Per-item balance state of the cluster.
     *
     * @param clusterPhyId physical cluster id
     */
    Result<ClusterBalanceItemState> getItemState(Long clusterPhyId);

    /**
     * Paged balance-job history of the cluster.
     *
     * @param clusterPhyId physical cluster id
     * @param dto          pagination parameters
     */
    PaginationResult<ClusterBalanceHistoryVO> history(Long clusterPhyId, PaginationBaseDTO dto);

    /**
     * Balance plan of an existing job.
     *
     * @param clusterPhyId physical cluster id
     * @param jobId        job id
     */
    Result<ClusterBalancePlanVO> plan(Long clusterPhyId, Long jobId);

    /**
     * Preview the plan that would result from the given parameters, without creating a job.
     *
     * @param clusterBalancePreviewDTO preview parameters
     */
    Result<ClusterBalancePlanVO> preview(Long clusterPhyId, ClusterBalancePreviewDTO clusterBalancePreviewDTO);

    /**
     * Plan of a scheduled balance job.
     *
     * @param jobId job id
     */
    Result<ClusterBalancePlanVO> schedule(Long clusterPhyId, Long jobId);

    /**
     * Save the cluster's balance strategy.
     *
     * @param clusterPhyId physical cluster id
     * @param dto          strategy parameters
     */
    Result<Void> strategy(Long clusterPhyId, ClusterBalanceStrategyDTO dto, String operator);

    /**
     * Create a scheduled balance job triggered at the given time.
     *
     * @param clusterPhyId      physical cluster id
     * @param triggerTimeUnitMs trigger time in epoch milliseconds (per the name — confirm)
     */
    Result<Void> createScheduleJob(Long clusterPhyId, long triggerTimeUnitMs);
}

View File

@@ -1,79 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.BrokerSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobConfigPO;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerSpecService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceJobConfigService;
import com.xiaojukeji.know.streaming.km.persistence.mysql.enterprise.rebalance.ClusterBalanceJobConfigDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Service
@EnterpriseLoadReBalance
public class ClusterBalanceJobConfigServiceImpl implements ClusterBalanceJobConfigService {

    private static final ILog logger = LogFactory.getLog(ClusterBalanceJobConfigServiceImpl.class);

    @Autowired
    private ClusterBalanceJobConfigDao clusterBalanceJobConfigDao;

    @Autowired
    private BrokerSpecService brokerSpecService;

    @Autowired
    private BrokerService brokerService;

    /**
     * Insert the cluster's balance-job config, or update it in place when one
     * already exists for the cluster. Every broker of the cluster must have a
     * spec (capacity) entry, otherwise the config is rejected.
     *
     * @param clusterBalanceJobConfigPO config to store; clusterId selects the row
     * @return success, or the reason the config was rejected / not persisted
     */
    @Override
    public Result<Void> replaceClusterBalanceJobConfigByClusterId(ClusterBalanceJobConfigPO clusterBalanceJobConfigPO) {
        List<Broker> brokers = brokerService.listAllBrokersFromDB(clusterBalanceJobConfigPO.getClusterId());
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterBalanceJobConfigPO.getClusterId());
        for (Broker broker : brokers) {
            if (brokerSpecMap.get(broker.getBrokerId()) == null) {
                return Result.buildFrom(ResultStatus.CLUSTER_SPEC_INCOMPLETE);
            }
        }

        try {
            // FIX: diamond operator instead of a raw LambdaQueryWrapper
            LambdaQueryWrapper<ClusterBalanceJobConfigPO> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(ClusterBalanceJobConfigPO::getClusterId, clusterBalanceJobConfigPO.getClusterId());
            ClusterBalanceJobConfigPO oldConfig = clusterBalanceJobConfigDao.selectOne(queryWrapper);

            int count;
            if (oldConfig == null) {
                count = clusterBalanceJobConfigDao.insert(clusterBalanceJobConfigPO);
            } else {
                // Keep the existing primary key so updateById targets the old row
                clusterBalanceJobConfigPO.setId(oldConfig.getId());
                count = clusterBalanceJobConfigDao.updateById(clusterBalanceJobConfigPO);
            }
            if (count < 1) {
                logger.error("replace cluster balance job config detail failed! clusterBalanceJobConfigPO:{}", clusterBalanceJobConfigPO);
                return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
            }
        } catch (Exception e) {
            logger.error("replace cluster balance job config failed! clusterBalanceJobConfigPO:{}", clusterBalanceJobConfigPO, e);
            return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
        }
        return Result.buildSuc();
    }

    /**
     * Fetch the balance-job config of a cluster.
     *
     * @param clusterId cluster id
     * @return the config wrapped in a success result; data is null when none exists
     */
    @Override
    public Result<ClusterBalanceJobConfigPO> getByClusterId(Long clusterId) {
        ClusterBalanceJobConfigPO queryParam = new ClusterBalanceJobConfigPO();
        queryParam.setClusterId(clusterId);
        return Result.buildSuc(clusterBalanceJobConfigDao.selectOne(new QueryWrapper<>(queryParam)));
    }
}

View File

@@ -1,458 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.google.common.collect.Lists;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.BrokerSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.reassign.ExecuteReassignParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.reassign.strategy.ReassignExecutionStrategy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.reassign.strategy.ReassignTask;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.ClusterBalanceInterval;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.job.detail.ClusterBalancePlanDetail;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobConfigPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceReassignPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.vo.ClusterBalanceHistorySubVO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.vo.ClusterBalanceHistoryVO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.converter.ClusterBalanceConverter;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.converter.ClusterBalanceReassignConverter;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobStatusEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceJobConfigService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceJobService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceReassignService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerSpecService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.config.ConfigUtils;
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService;
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignStrategyService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.persistence.mysql.enterprise.rebalance.ClusterBalanceJobDao;
import com.xiaojukeji.know.streaming.km.persistence.mysql.enterprise.rebalance.ClusterBalanceReassignDao;
import com.xiaojukeji.know.streaming.km.rebalance.executor.ExecutionRebalance;
import com.xiaojukeji.know.streaming.km.rebalance.executor.common.BrokerBalanceState;
import com.xiaojukeji.know.streaming.km.rebalance.model.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Cluster-balance job service (enterprise load-rebalance feature).
 *
 * <p>Manages the lifecycle of a cluster balance job: creation, modification,
 * deletion, paging over history, status verification, and incremental
 * generation of partition-reassignment JSON for the configured execution
 * strategy. Persistence goes through {@link ClusterBalanceJobDao} (parent job)
 * and {@link ClusterBalanceReassignDao} (per-partition sub tasks).
 */
@Service
@EnterpriseLoadReBalance
public class ClusterBalanceJobServiceImpl implements ClusterBalanceJobService {
    private static final ILog logger = LogFactory.getLog(ClusterBalanceJobServiceImpl.class);

    @Value("${es.client.address}")
    private String esAddress;

    @Autowired
    private ClusterBalanceJobDao clusterBalanceJobDao;

    @Autowired
    private ClusterBalanceReassignDao clusterBalanceReassignDao;

    @Autowired
    private ClusterBalanceReassignService clusterBalanceReassignService;

    @Autowired
    private ClusterBalanceJobConfigService clusterBalanceJobConfigService;

    @Autowired
    private BrokerSpecService brokerSpecService;

    @Autowired
    private BrokerService brokerService;

    @Autowired
    private ClusterPhyService clusterPhyService;

    @Autowired
    private TopicService topicService;

    @Autowired
    private ConfigUtils configUtils;

    @Autowired
    private ReassignService reassignService;

    @Autowired
    private ReassignStrategyService reassignStrategyService;

    @Autowired
    private OpPartitionService opPartitionService;

    /**
     * Deletes a balance job and all of its reassignment sub tasks.
     *
     * <p>Fix: annotated {@code @Transactional} — the catch block calls
     * {@code TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()},
     * which throws {@code NoTransactionException} unless an active transaction exists.
     *
     * @param jobId    id of the job to delete; must not be null
     * @param operator operator name (currently unused, kept for interface compatibility)
     */
    @Override
    @Transactional
    public Result<Void> deleteByJobId(Long jobId, String operator) {
        if (jobId == null) {
            return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "jobId不允许为空");
        }
        try {
            ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
            if (jobPO == null) {
                // job does not exist
                return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, String.format("jobId:[%d] not exist", jobId));
            }
            if (JobStatusEnum.canNotDeleteJob(jobPO.getStatus())) {
                // illegal status, deletion forbidden
                return this.buildActionForbidden(jobId, jobPO.getStatus());
            }
            clusterBalanceJobDao.deleteById(jobId);

            // also remove all sub tasks belonging to this job
            LambdaQueryWrapper<ClusterBalanceReassignPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
            lambdaQueryWrapper.eq(ClusterBalanceReassignPO::getJobId, jobId);
            clusterBalanceReassignDao.delete(lambdaQueryWrapper);
            return Result.buildSuc();
        } catch (Exception e) {
            logger.error("method=delete||jobId={}||errMsg=exception", jobId, e);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
        }
    }

    /**
     * Persists a new cluster balance job.
     */
    @Override
    public Result<Void> createClusterBalanceJob(ClusterBalanceJobPO clusterBalanceJobPO, String operator) {
        if (ValidateUtils.isNull(clusterBalanceJobPO)) {
            return Result.buildFrom(ResultStatus.NOT_EXIST);
        }
        try {
            clusterBalanceJobDao.addClusterBalanceJob(clusterBalanceJobPO);
        } catch (Exception e) {
            logger.error("method=createClusterBalanceJob||clusterBalanceJobPO:{}||errMsg=exception", clusterBalanceJobPO, e);
            return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
        }
        return Result.buildSuc();
    }

    /**
     * Updates an existing balance job by id; fails if the row does not exist
     * or the update touches no rows.
     */
    @Override
    public Result<Void> modifyClusterBalanceJob(ClusterBalanceJobPO clusterBalanceJobPO, String operator) {
        ClusterBalanceJobPO oldJobPo = clusterBalanceJobDao.selectById(clusterBalanceJobPO.getId());
        if (oldJobPo == null) {
            return Result.buildFrom(ResultStatus.NOT_EXIST);
        }
        try {
            int count = clusterBalanceJobDao.updateById(clusterBalanceJobPO);
            if (count < 1) {
                logger.error("method=modifyClusterBalanceJob||clusterBalanceJobPO:{}||errMsg=modify clusterBalanceJob failed", clusterBalanceJobPO);
                return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
            }
        } catch (Exception e) {
            logger.error("method=modifyClusterBalanceJob||clusterBalanceJobPO:{}||errMsg=exception", clusterBalanceJobPO, e);
            return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
        }
        return Result.buildSuc();
    }

    /**
     * Fetches a balance job by primary key (data may be null when not found).
     */
    @Override
    public Result<ClusterBalanceJobPO> getClusterBalanceJobById(Long id) {
        return Result.buildSuc(clusterBalanceJobDao.selectById(id));
    }

    /**
     * Returns the most recently created balance job of the cluster, or null
     * when the cluster has none.
     */
    @Override
    public ClusterBalanceJobPO getLastOneByClusterId(Long clusterPhyId) {
        ClusterBalanceJobPO clusterBalanceJobPO = new ClusterBalanceJobPO();
        clusterBalanceJobPO.setClusterId(clusterPhyId);

        QueryWrapper<ClusterBalanceJobPO> queryWrapper = new QueryWrapper<>();
        queryWrapper.setEntity(clusterBalanceJobPO);
        queryWrapper.orderByDesc("id");
        List<ClusterBalanceJobPO> clusterBalanceJobPOS = clusterBalanceJobDao.selectList(queryWrapper);
        if (clusterBalanceJobPOS.isEmpty()) {
            return null;
        }
        return clusterBalanceJobPOS.get(0);
    }

    /**
     * Returns the configured balance interval per resource type
     * (type -> interval percent); empty map when no config exists.
     */
    @Override
    public Map<String, Double> getBalanceInterval(Long clusterPhyId) {
        Result<ClusterBalanceJobConfigPO> configPOResult = clusterBalanceJobConfigService.getByClusterId(clusterPhyId);
        if (!configPOResult.hasData()) {
            // fix: was a raw-typed `new HashMap()`
            return new HashMap<>();
        }
        List<ClusterBalanceInterval> clusterBalanceIntervals = ConvertUtil.str2ObjArrayByJson(configPOResult.getData().getBalanceIntervalJson(), ClusterBalanceInterval.class);
        return clusterBalanceIntervals.stream().collect(Collectors.toMap(ClusterBalanceInterval::getType, ClusterBalanceInterval::getIntervalPercent));
    }

    /**
     * Pages over finished balance jobs (success / canceled / failed) of a
     * cluster and aggregates, per resource dimension (disk / cpu / bytes-in /
     * bytes-out), how many broker details balanced successfully vs. failed.
     */
    @Override
    public PaginationResult<ClusterBalanceHistoryVO> page(Long clusterPhyId, PaginationBaseDTO dto) {
        List<ClusterBalanceHistoryVO> historyVOS = new ArrayList<>();
        LambdaQueryWrapper<ClusterBalanceJobPO> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(ClusterBalanceJobPO::getClusterId, clusterPhyId);
        List<Integer> status = Lists.newArrayList(JobStatusEnum.SUCCESS.getStatus(), JobStatusEnum.CANCELED.getStatus(), JobStatusEnum.FAILED.getStatus());
        queryWrapper.in(ClusterBalanceJobPO::getStatus, status);
        queryWrapper.orderByDesc(ClusterBalanceJobPO::getStartTime);
        IPage<ClusterBalanceJobPO> page = clusterBalanceJobDao.selectPage(new Page<>(dto.getPageNo(), dto.getPageSize()), queryWrapper);
        page.setTotal(clusterBalanceJobDao.selectCount(queryWrapper));

        for (ClusterBalanceJobPO clusterBalanceJobPO : page.getRecords()) {
            ClusterBalanceHistoryVO clusterBalanceHistoryVO = new ClusterBalanceHistoryVO();
            clusterBalanceHistoryVO.setBegin(clusterBalanceJobPO.getStartTime());
            clusterBalanceHistoryVO.setEnd(clusterBalanceJobPO.getFinishedTime());
            clusterBalanceHistoryVO.setJobId(clusterBalanceJobPO.getId());

            List<ClusterBalancePlanDetail> detailVOS = ConvertUtil.str2ObjArrayByJson(clusterBalanceJobPO.getBrokerBalanceDetail(), ClusterBalancePlanDetail.class);
            Map<String, ClusterBalanceHistorySubVO> subMap = new HashMap<>();

            // status == 0 means the dimension balanced successfully; non-zero means failed
            ClusterBalanceHistorySubVO diskSubVO = new ClusterBalanceHistorySubVO();
            diskSubVO.setSuccessNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getDiskStatus() != null && clusterBalancePlanDetail.getDiskStatus() == 0).count());
            diskSubVO.setFailedNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getDiskStatus() != null && clusterBalancePlanDetail.getDiskStatus() != 0).count());
            subMap.put(Resource.DISK.resource(), diskSubVO);

            ClusterBalanceHistorySubVO cupSubVO = new ClusterBalanceHistorySubVO();
            cupSubVO.setSuccessNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getCpuStatus() != null && clusterBalancePlanDetail.getCpuStatus() == 0).count());
            cupSubVO.setFailedNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getCpuStatus() != null && clusterBalancePlanDetail.getCpuStatus() != 0).count());
            subMap.put(Resource.CPU.resource(), cupSubVO);

            ClusterBalanceHistorySubVO bytesInSubVO = new ClusterBalanceHistorySubVO();
            bytesInSubVO.setSuccessNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getByteInStatus() != null && clusterBalancePlanDetail.getByteInStatus() == 0).count());
            bytesInSubVO.setFailedNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getByteInStatus() != null && clusterBalancePlanDetail.getByteInStatus() != 0).count());
            subMap.put(Resource.NW_IN.resource(), bytesInSubVO);

            ClusterBalanceHistorySubVO bytesOutSubVO = new ClusterBalanceHistorySubVO();
            bytesOutSubVO.setSuccessNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getByteOutStatus() != null && clusterBalancePlanDetail.getByteOutStatus() == 0).count());
            bytesOutSubVO.setFailedNu(detailVOS.stream().filter(clusterBalancePlanDetail -> clusterBalancePlanDetail.getByteOutStatus() != null && clusterBalancePlanDetail.getByteOutStatus() != 0).count());
            subMap.put(Resource.NW_OUT.resource(), bytesOutSubVO);

            clusterBalanceHistoryVO.setSub(subMap);
            historyVOS.add(clusterBalanceHistoryVO);
        }
        return PaginationResult.buildSuc(historyVOS, page);
    }

    /**
     * Returns the id of one running balance job of the cluster, or null.
     * Falls back to the sub-task table in case the parent job status and the
     * sub-task statuses got out of sync.
     */
    @Override
    public Long getOneRunningJob(Long clusterPhyId) {
        LambdaQueryWrapper<ClusterBalanceJobPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.eq(ClusterBalanceJobPO::getClusterId, clusterPhyId);
        lambdaQueryWrapper.eq(ClusterBalanceJobPO::getStatus, JobStatusEnum.RUNNING.getStatus());
        List<ClusterBalanceJobPO> poList = clusterBalanceJobDao.selectList(lambdaQueryWrapper);
        if (!ValidateUtils.isEmptyList(poList)) {
            // default: take the first one
            return poList.get(0).getId();
        }

        // look at running sub tasks to avoid parent/sub status inconsistency
        LambdaQueryWrapper<ClusterBalanceReassignPO> subLambdaQueryWrapper = new LambdaQueryWrapper<>();
        subLambdaQueryWrapper.eq(ClusterBalanceReassignPO::getClusterId, clusterPhyId);
        subLambdaQueryWrapper.eq(ClusterBalanceReassignPO::getStatus, JobStatusEnum.RUNNING.getStatus());
        List<ClusterBalanceReassignPO> subPOList = clusterBalanceReassignDao.selectList(subLambdaQueryWrapper);
        if (ValidateUtils.isEmptyList(subPOList)) {
            return null;
        }
        return subPOList.get(0).getJobId();
    }

    /**
     * Re-applies the reassignment throttle, refreshes per-broker balance state
     * from the rebalance engine, persists the refreshed detail, then delegates
     * sub-task status verification and preferred-replica election.
     */
    @Override
    @Transactional
    public Result<Void> verifyClusterBalanceAndUpdateStatue(Long jobId) {
        ClusterBalanceJobPO clusterBalanceJobPO = clusterBalanceJobDao.selectById(jobId);
        if (clusterBalanceJobPO == null) {
            // job does not exist
            return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, String.format("jobId:[%d] not exist", jobId));
        }
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterBalanceJobPO.getClusterId());
        if (clusterPhy == null) {
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }

        Result<Void> rv = reassignService.changReassignmentThrottles(
                new ExecuteReassignParam(clusterBalanceJobPO.getClusterId(), clusterBalanceJobPO.getReassignmentJson(), clusterBalanceJobPO.getThrottleUnitB())
        );
        if (rv.failed()) {
            logger.error("method=verifyClusterBalanceAndUpdateStatue||jobId={}||result={}||msg=change throttle failed", jobId, rv);
            return rv;
        }

        // broker spec info
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterBalanceJobPO.getClusterId());
        // broker info
        Map<Integer, Broker> brokerMap = brokerService.listAllBrokersFromDB(clusterBalanceJobPO.getClusterId()).stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));

        // refresh the balance state of the job
        List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhy.getId(), configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
        Map<Integer, BrokerBalanceState> brokerBalanceStateMap = ExecutionRebalance
                .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(clusterBalanceJobPO, brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
        List<ClusterBalancePlanDetail> oldDetails = ConvertUtil.str2ObjArrayByJson(clusterBalanceJobPO.getBrokerBalanceDetail(), ClusterBalancePlanDetail.class);
        List<ClusterBalancePlanDetail> newDetails = ClusterBalanceConverter.convert2ClusterBalancePlanDetail(oldDetails, brokerBalanceStateMap);
        clusterBalanceJobPO.setBrokerBalanceDetail(ConvertUtil.obj2Json(newDetails));
        Result<Void> modifyResult = this.modifyClusterBalanceJob(clusterBalanceJobPO, Constant.SYSTEM);
        if (modifyResult.failed()) {
            logger.error("method=verifyClusterBalanceAndUpdateStatue||jobId:{}||errMsg={}", jobId, modifyResult);
            return modifyResult;
        }

        // refresh the status of the reassignment sub tasks
        Result<Boolean> result = clusterBalanceReassignService.verifyAndUpdateStatue(clusterBalanceJobPO);
        if (!result.hasData()) {
            return Result.buildFromIgnoreData(result);
        }

        rv = clusterBalanceReassignService.preferredReplicaElection(jobId);
        if (rv.failed()) {
            logger.error("method=verifyClusterBalanceAndUpdateStatue||jobId={}||result={}||msg=preferred replica election failed", jobId, rv);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return rv;
        }
        return Result.buildSuc();
    }

    /**
     * Builds the Kafka partition-reassignment JSON for the given sub tasks
     * according to the execution strategy, and validates the generated JSON.
     *
     * @return the JSON string on success
     */
    @Override
    public Result<String> generateReassignmentJson(Long clusterId,
                                                   Integer parallelNum,
                                                   Integer executionStrategy,
                                                   Integer jsonVersion,
                                                   List<ClusterBalanceReassignPO> reassignPOList) {
        Result<List<ReassignTask>> result = reassignStrategyService.generateReassignmentTask(
                new ReassignExecutionStrategy(
                        clusterId,
                        parallelNum,
                        executionStrategy,
                        ClusterBalanceReassignConverter.convert2ReplaceReassignSubList(reassignPOList)
                )
        );
        if (result.failed() || result.getData().isEmpty()) {
            return Result.buildFromIgnoreData(result);
        }
        Map<String, Object> reassign = new HashMap<>();
        reassign.put(KafkaConstant.PARTITIONS, result.getData());
        reassign.put(KafkaConstant.VERSION, jsonVersion);
        String generateReassignmentJson = ConvertUtil.obj2Json(reassign);

        // check that the generated reassignment JSON is legal
        Result<Void> rv = reassignService.parseExecuteAssignmentArgs(clusterId, generateReassignmentJson);
        if (rv.failed()) {
            return Result.buildFromIgnoreData(rv);
        }
        return Result.buildSuc(generateReassignmentJson);
    }

    /**
     * Generates the next batch of reassignments for a running job according to
     * its parallelism, persists the new JSON, flips the chosen sub tasks to
     * RUNNING and submits the reassignment to Kafka.
     */
    @Override
    @Transactional
    public Result<Void> generateReassignmentForStrategy(Long clusterPhyId, Long jobId) {
        ClusterBalanceJobPO job = clusterBalanceJobDao.selectById(jobId);
        if (job == null) {
            return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST,
                    MsgConstant.getReassignJobBizStr(jobId, clusterPhyId));
        }
        if (!JobStatusEnum.isRunning(job.getStatus()) || job.getParallelNum() < 1) {
            return Result.buildSuc();
        }
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
        if (clusterPhy == null) {
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }
        List<ClusterBalanceReassignPO> reassignPOS = clusterBalanceReassignService.getBalanceReassignsByJobId(jobId);

        // Kafka < 2.4 cannot add replica movements dynamically, so wait for the
        // current batch to finish before fetching the next batch by parallelism.
        // (fix: Double.parseDouble instead of the deprecated new Double(...))
        double version = Double.parseDouble(clusterPhy.getKafkaVersion().substring(0, 3));
        if (version < 2.4 && reassignPOS.stream()
                .anyMatch(reassignPO -> reassignPO.getStatus() == JobStatusEnum.RUNNING.getStatus())) {
            return Result.buildSuc();
        }

        // drop already-finished sub tasks
        reassignPOS = reassignPOS.stream()
                .filter(reassignPO -> reassignPO.getStatus() == JobStatusEnum.RUNNING.getStatus()
                        || reassignPO.getStatus() == JobStatusEnum.WAITING.getStatus())
                .collect(Collectors.toList());
        if (reassignPOS.isEmpty()) {
            return Result.buildSuc();
        }

        // NOTE(review): the version counter is read from balanceIntervalJson;
        // verify it should not come from reassignmentJson instead.
        Map<String, Object> reassign = JSON.parseObject(job.getBalanceIntervalJson());
        Result<String> r = this.generateReassignmentJson(job.getClusterId(),
                job.getParallelNum(),
                job.getExecutionStrategy(),
                reassign.get(KafkaConstant.VERSION) == null ? 1 : (Integer) reassign.get(KafkaConstant.VERSION) + 1,
                reassignPOS);
        if (!r.hasData()) {
            return Result.buildFromIgnoreData(r);
        }
        try {
            // persist the new reassignment JSON on the job
            job.setReassignmentJson(r.getData());
            job.setUpdateTime(new Date());
            clusterBalanceJobDao.updateById(job);

            // flip the selected sub tasks to RUNNING
            modifyReassignStatus(r.getData(), reassignPOS);
        } catch (Exception e) {
            logger.error("method=generateReassignmentForStrategy||jobId={}||errMsg=exception", jobId, e);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
        }
        return reassignService.executePartitionReassignments(
                new ExecuteReassignParam(job.getClusterId(), r.getData(), job.getThrottleUnitB()));
    }

    private Result<Void> buildActionForbidden(Long jobId, Integer jobStatus) {
        return Result.buildFromRSAndMsg(
                ResultStatus.OPERATION_FORBIDDEN,
                String.format("jobId:[%d] 当前 status:[%s], 不允许被执行", jobId, JobStatusEnum.valueOfStatus(jobStatus))
        );
    }

    /**
     * Marks every WAITING sub task that appears in the reassignment JSON as RUNNING.
     *
     * <p>Fix: the original cast {@code (List<ReassignTask>) reassign.get(...)} left
     * fastjson {@code JSONObject}s in the list, so {@code getTopic()} threw
     * {@code ClassCastException}; deserialize properly via {@code JSON.parseArray},
     * matching the sibling {@code ClusterBalanceReassignServiceImpl#setJobInRunning}.
     */
    private void modifyReassignStatus(String reassignmentJson, List<ClusterBalanceReassignPO> reassignPOS) {
        Map<String, Object> reassign = JSON.parseObject(reassignmentJson);
        List<ReassignTask> reassignTasks = JSON.parseArray(JSON.toJSONString(reassign.get(KafkaConstant.PARTITIONS)), ReassignTask.class);
        if (reassignTasks == null || reassignTasks.isEmpty()) {
            return;
        }
        // update the matching sub tasks
        reassignTasks.forEach(reassignTask -> {
            for (ClusterBalanceReassignPO reassignPO : reassignPOS) {
                if (reassignPO.getStatus().equals(JobStatusEnum.WAITING.getStatus())
                        && reassignTask.getTopic().equals(reassignPO.getTopicName())
                        && reassignTask.getPartition() == reassignPO.getPartitionId()) {
                    ClusterBalanceReassignPO newReassignPO = new ClusterBalanceReassignPO();
                    newReassignPO.setId(reassignPO.getId());
                    newReassignPO.setStatus(JobStatusEnum.RUNNING.getStatus());
                    newReassignPO.setUpdateTime(new Date());
                    clusterBalanceReassignDao.updateById(newReassignPO);
                    break;
                }
            }
        });
    }
}

View File

@@ -1,514 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.job.ClusterBalanceReassignDetail;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ReplicationMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.reassign.ExecuteReassignParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.reassign.ReassignResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.reassign.job.ReassignSubJobExtendData;
import com.xiaojukeji.know.streaming.km.common.bean.entity.reassign.strategy.ReassignTask;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceReassignPO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.converter.ClusterBalanceReassignConverter;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobStatusEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceReassignService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ReplicaMetricVersionItems;
import com.xiaojukeji.know.streaming.km.persistence.mysql.enterprise.rebalance.ClusterBalanceJobDao;
import com.xiaojukeji.know.streaming.km.persistence.mysql.enterprise.rebalance.ClusterBalanceReassignDao;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import java.util.*;
@Service
@EnterpriseLoadReBalance
public class ClusterBalanceReassignServiceImpl implements ClusterBalanceReassignService {
private static final ILog logger = LogFactory.getLog(ClusterBalanceReassignServiceImpl.class);
@Autowired
private ClusterBalanceReassignDao clusterBalanceReassignDao;
@Autowired
private ClusterBalanceJobDao clusterBalanceJobDao;
@Autowired
private ReassignService reassignService;
@Autowired
private ReplicaMetricService replicationMetricService;
@Autowired
private PartitionService partitionService;
@Autowired
private OpLogWrapService opLogWrapService;
@Autowired
private OpPartitionService opPartitionService;
@Override
public Result<Void> addBalanceReassign(ClusterBalanceReassignPO clusterBalanceReassignPO) {
if (clusterBalanceReassignPO == null) {
return Result.buildFrom(ResultStatus.NOT_EXIST);
}
try {
int count = clusterBalanceReassignDao.insert(clusterBalanceReassignPO);
if (count < 1) {
logger.error("create cluster balance reassign detail failed! clusterBalanceReassignPO:{}", clusterBalanceReassignPO);
return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
}
} catch (Exception e) {
logger.error("create cluster balance reassign detail failed! clusterBalanceReassignPO:{}", clusterBalanceReassignPO, e);
return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
}
return Result.buildSuc();
}
@Override
public Result<Void> addBatchBalanceReassign(List<ClusterBalanceReassignPO> reassignPOList) {
try {
int count = clusterBalanceReassignDao.addBatch(reassignPOList);
if (count < 1) {
logger.error("method=addBatchBalanceReassign||reassignPOList:{}||msg=create cluster balance reassign detail failed! ", reassignPOList);
return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
}
} catch (Exception e) {
logger.error("method=addBatchBalanceReassign||reassignPOList:{}||msg=create cluster balance reassign detail failed! ", reassignPOList, e);
return Result.buildFrom(ResultStatus.MYSQL_OPERATE_FAILED);
}
return Result.buildSuc();
}
@Override
public Result<Void> delete(Long jobId, String operator) {
if (jobId == null) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
}
try {
ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
LambdaQueryWrapper<ClusterBalanceReassignPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(ClusterBalanceReassignPO::getJobId, jobId);
clusterBalanceReassignDao.delete(lambdaQueryWrapper);
return Result.buildSuc();
} catch (Exception e) {
logger.error("method=delete||jobId={}||errMsg=exception", jobId, e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
}
}
@Override
public Result<Void> execute(Long jobId) {
if (jobId == null) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
}
ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
if (!JobStatusEnum.canExecuteJob(jobPO.getStatus())) {
// 状态错误,禁止执行
return this.buildActionForbidden(jobId, jobPO.getStatus());
}
// 修改DB状态
this.setJobInRunning(jobPO);
// 执行任务
Result<Void> rv = reassignService.executePartitionReassignments(new ExecuteReassignParam(jobPO.getClusterId(), jobPO.getReassignmentJson(), jobPO.getThrottleUnitB()));
if (rv.failed()) {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return rv;
}
return Result.buildSuc();
}
@Override
public Result<Void> cancel(Long jobId) {
if (jobId == null) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
}
try {
ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
if (!JobStatusEnum.canCancelJob(jobPO.getStatus())) {
// 状态错误,禁止执行
return this.buildActionForbidden(jobId, jobPO.getStatus());
}
this.setJobCanceled(jobPO);
return Result.buildSuc();
} catch (Exception e) {
logger.error("method=cancel||jobId={}||errMsg=exception", jobId, e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage());
}
}
@Override
@Transactional
public Result<Boolean> verifyAndUpdateStatue(ClusterBalanceJobPO jobPO) {
if (jobPO == null) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, "job not exist");
}
// 检查迁移的结果
Result<ReassignResult> reassignResult = reassignService.verifyPartitionReassignments(
new ExecuteReassignParam(jobPO.getClusterId(), jobPO.getReassignmentJson(), jobPO.getThrottleUnitB())
);
if (reassignResult.failed()) {
return Result.buildFromIgnoreData(reassignResult);
}
// 更新任务状态
return this.checkAndSetSuccessIfFinished(jobPO, reassignResult.getData());
}
@Override
public Result<Void> modifyThrottle(Long jobId, Long throttleUnitB, String operator) {
if (jobId == null) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
}
try {
ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
if (JobStatusEnum.isFinished(jobPO.getStatus())) {
// 状态错误,禁止执行
return this.buildActionForbidden(jobId, jobPO.getStatus());
}
// 修改限流值
jobPO.setThrottleUnitB(throttleUnitB);
clusterBalanceJobDao.updateById(jobPO);
// 记录操作
opLogWrapService.saveOplogAndIgnoreException(new OplogDTO(
operator,
OperationEnum.EDIT.getDesc(),
ModuleEnum.JOB_CLUSTER_BALANCE.getDesc(),
MsgConstant.getReassignJobBizStr(
jobId,
jobPO.getClusterId()
),
String.format("新的限流值:[%d]", throttleUnitB)
));
return Result.buildSuc();
} catch (Exception e) {
logger.error("method=modifyThrottle||jobId={}||throttleUnitB={}||errMsg=exception", jobId, throttleUnitB, e);
}
return Result.buildSuc();
}
@Override
public Result<Void> getAndUpdateSubJobExtendData(Long jobId) {
if (jobId == null) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
}
ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
List<ClusterBalanceReassignPO> reassigns = this.getBalanceReassignsByJobId(jobId);
for (ClusterBalanceReassignPO reassignPO: reassigns) {
Result<ReassignSubJobExtendData> extendDataResult = this.getReassignSubJobExtendData(reassignPO);
if (extendDataResult.failed()) {
continue;
}
reassignPO.setExtendData(ConvertUtil.obj2Json(extendDataResult.getData()));
clusterBalanceReassignDao.updateById(reassignPO);
}
return Result.buildSuc();
}
@Override
public List<ClusterBalanceReassignPO> getBalanceReassignsByJobId(Long jobId) {
if (jobId == null) {
return new ArrayList<>();
}
LambdaQueryWrapper<ClusterBalanceReassignPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(ClusterBalanceReassignPO::getJobId, jobId);
return clusterBalanceReassignDao.selectList(lambdaQueryWrapper);
}
@Override
public Result<ClusterBalanceReassignDetail> getJobDetailsGroupByTopic(Long jobId) {
if (jobId == null) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
}
// 获取任务
ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
// 获取子任务
List<ClusterBalanceReassignPO> subJobPOList = this.getBalanceReassignsByJobId(jobId);
// 数据组装并放回
return Result.buildSuc(ClusterBalanceReassignConverter.convert2ClusterBalanceReassignDetail(jobPO, subJobPOList));
}
@Override
public Result<Void> preferredReplicaElection(Long jobId) {
// 获取任务
ClusterBalanceJobPO jobPO = clusterBalanceJobDao.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
if (!JobStatusEnum.isFinished(jobPO.getStatus())){
return Result.buildSuc();
}
// 获取子任务
List<ClusterBalanceReassignPO> subJobPOList = this.getBalanceReassignsByJobId(jobId);
List<TopicPartition> topicPartitions = new ArrayList<>();
subJobPOList.stream().forEach(reassignPO -> {
Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0);
Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0);
//替换过leader的添加到优先副本选举任务列表
if (!originalLeader.equals(targetLeader)){
topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId()));
}
});
if (!topicPartitions.isEmpty()){
return opPartitionService.preferredReplicaElection(jobPO.getClusterId(), topicPartitions);
}
return Result.buildSuc();
}
private Result<Void> buildActionForbidden(Long jobId, Integer jobStatus) {
return Result.buildFromRSAndMsg(
ResultStatus.OPERATION_FORBIDDEN,
String.format("jobId:[%d] 当前 status:[%s], 不允许被执行", jobId, JobStatusEnum.valueOfStatus(jobStatus))
);
}
private Result<Void> setJobInRunning(ClusterBalanceJobPO jobPO) {
Map<String, Object> reassign = JSON.parseObject(jobPO.getReassignmentJson());
if (reassign.isEmpty()){
return Result.buildSuc();
}
List<ReassignTask> reassignTasks = JSON.parseArray(JSON.toJSONString(reassign.get(KafkaConstant.PARTITIONS)), ReassignTask.class);
if (reassignTasks == null || reassignTasks.isEmpty()){
return Result.buildSuc();
}
long now = System.currentTimeMillis();
// 更新子任务状态
List<ClusterBalanceReassignPO> reassignPOS = this.getBalanceReassignsByJobId(jobPO.getId());
reassignTasks.forEach(reassignTask -> {
for (ClusterBalanceReassignPO reassignPO: reassignPOS) {
if (reassignTask.getTopic().equals(reassignPO.getTopicName())
&& reassignTask.getPartition() == reassignPO.getPartitionId()) {
ClusterBalanceReassignPO newReassignPO = new ClusterBalanceReassignPO();
newReassignPO.setId(reassignPO.getId());
newReassignPO.setStatus(JobStatusEnum.RUNNING.getStatus());
newReassignPO.setStartTime(new Date(now));
newReassignPO.setUpdateTime(new Date(now));
clusterBalanceReassignDao.updateById(newReassignPO);
break;
}
}
});
// 更新父任务状态
ClusterBalanceJobPO newJobPO = new ClusterBalanceJobPO();
newJobPO.setId(jobPO.getId());
newJobPO.setStatus(JobStatusEnum.RUNNING.getStatus());
newJobPO.setStartTime(new Date(now));
newJobPO.setUpdateTime(new Date(now));
clusterBalanceJobDao.updateById(newJobPO);
return Result.buildSuc();
}
private Result<Void> setJobCanceled(ClusterBalanceJobPO jobPO) {
// 更新子任务状态
List<ClusterBalanceReassignPO> reassignPOS = this.getBalanceReassignsByJobId(jobPO.getId());
for (ClusterBalanceReassignPO reassignPO: reassignPOS) {
ClusterBalanceReassignPO newReassignPO = new ClusterBalanceReassignPO();
newReassignPO.setId(reassignPO.getId());
newReassignPO.setStatus(JobStatusEnum.CANCELED.getStatus());
clusterBalanceReassignDao.updateById(newReassignPO);
}
// 更新父任务状态
ClusterBalanceJobPO newJobPO = new ClusterBalanceJobPO();
newJobPO.setId(jobPO.getId());
newJobPO.setStatus(JobStatusEnum.CANCELED.getStatus());
clusterBalanceJobDao.updateById(newJobPO);
return Result.buildSuc();
}
private Result<Boolean> checkAndSetSuccessIfFinished(ClusterBalanceJobPO jobPO, ReassignResult reassignmentResult) {
long now = System.currentTimeMillis();
List<ClusterBalanceReassignPO> reassignPOS = this.getBalanceReassignsByJobId(jobPO.getId());
boolean existNotFinished = false;
for (ClusterBalanceReassignPO balanceReassignPO: reassignPOS) {
if (!reassignmentResult.checkPartitionFinished(balanceReassignPO.getTopicName(), balanceReassignPO.getPartitionId())) {
existNotFinished = true;
continue;
}
// 更新状态
ClusterBalanceReassignPO newReassignPO = new ClusterBalanceReassignPO();
newReassignPO.setId(balanceReassignPO.getId());
newReassignPO.setStatus(JobStatusEnum.SUCCESS.getStatus());
newReassignPO.setFinishedTime(new Date(now));
clusterBalanceReassignDao.updateById(newReassignPO);
}
// 更新任务状态
if (!existNotFinished && !reassignmentResult.isPartsOngoing()) {
ClusterBalanceJobPO newBalanceJobPO = new ClusterBalanceJobPO();
newBalanceJobPO.setId(jobPO.getId());
newBalanceJobPO.setStatus(JobStatusEnum.SUCCESS.getStatus());
newBalanceJobPO.setFinishedTime(new Date(now));
clusterBalanceJobDao.updateById(newBalanceJobPO);
}
return Result.buildSuc(reassignmentResult.isPartsOngoing());
}
/**
 * Builds the progress extend-data for one partition-reassignment sub-job:
 * how many bytes still need to move, how many have moved, and an ETA.
 *
 * @param subJobPO sub-job record holding cluster/topic/partition and the original/reassigned broker-id lists
 * @return success with the populated {@link ReassignSubJobExtendData}, or NOT_EXIST when the partition is gone
 */
private Result<ReassignSubJobExtendData> getReassignSubJobExtendData(ClusterBalanceReassignPO subJobPO) {
    Partition partition = partitionService.getPartitionByTopicAndPartitionId(
            subJobPO.getClusterId(),
            subJobPO.getTopicName(),
            subJobPO.getPartitionId()
    );
    if (partition == null) {
        // partition no longer exists
        return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getPartitionNotExist(subJobPO.getClusterId(),
                subJobPO.getTopicName(),
                subJobPO.getPartitionId())
        );
    }

    // log size of the leader replica — used as the per-replica amount that must be copied
    Float leaderLogSize = this.getReplicaLogSize(subJobPO.getClusterId(),
            partition.getLeaderBrokerId(),
            subJobPO.getTopicName(),
            subJobPO.getPartitionId()
    );

    // replicas that are newly added by the reassignment = reassigned set minus original set
    Set<Integer> newReplicas = new HashSet<>(CommonUtils.string2IntList(subJobPO.getReassignBrokerIds()));
    newReplicas.removeAll(CommonUtils.string2IntList(subJobPO.getOriginalBrokerIds()));

    // sum of bytes already materialized on the new replicas (metric may be missing per broker)
    Long finishedLogSizeUnitB = 0L;
    for (Integer brokerId : newReplicas) {
        Float replicaLogSize = this.getReplicaLogSize(subJobPO.getClusterId(),
                brokerId,
                subJobPO.getTopicName(),
                subJobPO.getPartitionId()
        );
        if (replicaLogSize == null) {
            continue;
        }
        finishedLogSizeUnitB += replicaLogSize.longValue();
    }

    ReassignSubJobExtendData extendData = ConvertUtil.str2ObjByJson(subJobPO.getExtendData(), ReassignSubJobExtendData.class);
    if (extendData == null) {
        // guard: stored extend-data JSON may be empty or corrupt; start from a fresh object
        // instead of NPE-ing on the setters below
        extendData = new ReassignSubJobExtendData();
    }

    extendData.setFinishedReassignLogSizeUnitB(finishedLogSizeUnitB);
    if (leaderLogSize != null) {
        // total bytes to move = leader log size * number of new replicas
        extendData.setNeedReassignLogSizeUnitB(leaderLogSize.longValue() * newReplicas.size());
    }

    // when the task is finished but partition metrics were not refreshed yet,
    // report finished == needed so the progress shows 100%
    if (JobStatusEnum.isFinished(subJobPO.getStatus()) && finishedLogSizeUnitB.equals(0L)) {
        extendData.setFinishedReassignLogSizeUnitB(extendData.getNeedReassignLogSizeUnitB());
    }

    // null-safe snapshot: needed size stays null when the leader metric was unavailable
    Long needLogSizeUnitB = extendData.getNeedReassignLogSizeUnitB();
    Long finishedB = extendData.getFinishedReassignLogSizeUnitB();
    if (needLogSizeUnitB == null || needLogSizeUnitB.equals(0L) || JobStatusEnum.isFinished(subJobPO.getStatus())) {
        extendData.setRemainTimeUnitMs(0L);
    } else if (finishedB == null || finishedB.equals(0L)) {
        // unknown — nothing moved yet, so no rate to extrapolate from
        extendData.setRemainTimeUnitMs(null);
    } else {
        Long usedTime = System.currentTimeMillis() - subJobPO.getStartTime().getTime();
        // (needed / finished) = (total time / elapsed time)  =>  remaining ≈ needed * elapsed / finished
        extendData.setRemainTimeUnitMs(needLogSizeUnitB * usedTime / finishedB);
    }

    return Result.buildSuc(extendData);
}
/**
 * Fetches the log-size metric of a single replica directly from Kafka.
 *
 * @param clusterPhyId physical cluster id
 * @param brokerId     broker hosting the replica
 * @param topicName    topic name
 * @param partitionId  partition id
 * @return the log-size metric value, or {@code null} when the metric could not be collected
 */
private Float getReplicaLogSize(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId) {
    Result<ReplicationMetrics> metricsResult = replicationMetricService.collectReplicaMetricsFromKafka(
            clusterPhyId,
            topicName,
            partitionId,
            brokerId,
            ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE
    );

    return metricsResult.hasData()
            ? metricsResult.getData().getMetric(ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE)
            : null;
}
}

View File

@@ -1,481 +0,0 @@
package com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.dto.ClusterBalanceIntervalDTO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.dto.ClusterBalanceOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.dto.ClusterBalancePreviewDTO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.dto.ClusterBalanceStrategyDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.job.JobDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.BrokerSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.ClusterBalanceItemState;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.job.content.JobClusterBalanceContent;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobConfigPO;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.po.ClusterBalanceJobPO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.vo.*;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.converter.ClusterBalanceConverter;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.enums.ClusterBalanceStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobHandleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobStatusEnum;
import com.xiaojukeji.know.streaming.km.common.enums.job.JobTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerSpecService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceJobConfigService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceJobService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceService;
import com.xiaojukeji.know.streaming.km.core.service.config.ConfigUtils;
import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.BrokerMetricVersionItems;
import com.xiaojukeji.know.streaming.km.rebalance.executor.ExecutionRebalance;
import com.xiaojukeji.know.streaming.km.rebalance.executor.common.BalanceParameter;
import com.xiaojukeji.know.streaming.km.rebalance.executor.common.BrokerBalanceState;
import com.xiaojukeji.know.streaming.km.rebalance.model.Resource;
import com.xiaojukeji.know.streaming.km.rebalance.executor.common.OptimizerResult;
import org.apache.logging.log4j.core.util.CronExpression;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.text.ParseException;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Enterprise cluster load-rebalance service: reports per-broker/per-resource balance state,
 * previews and schedules rebalance plans, and creates periodic balance jobs from a cron config.
 */
@Service
@EnterpriseLoadReBalance
public class ClusterBalanceServiceImpl implements ClusterBalanceService {

    private static final ILog logger = LogFactory.getLog(ClusterBalanceServiceImpl.class);

    // ES address handed to the rebalance executor so it can read broker metrics
    @Value("${es.client.address}")
    private String esAddress;

    @Autowired
    private JobService jobService;

    @Autowired
    private ClusterBalanceJobService clusterBalanceJobService;

    @Autowired
    private ClusterPhyService clusterPhyService;

    @Autowired
    private BrokerService brokerService;

    @Autowired
    private BrokerSpecService brokerSpecService;

    @Autowired
    private ClusterBalanceJobConfigService clusterBalanceJobConfigService;

    @Autowired
    private BrokerMetricService brokerMetricService;

    @Autowired
    private TopicService topicService;

    @Autowired
    private ConfigUtils configUtils;

    /**
     * Overall balance state of a cluster: next scheduled run, enabled flag,
     * BALANCE/UNBALANCED status, and per-resource sub-state.
     *
     * @param clusterPhyId physical cluster id
     * @return the state VO, or a failure when the cluster/config is missing or the executor throws
     */
    @Override
    public Result<ClusterBalanceStateVO> state(Long clusterPhyId) {
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
        if (clusterPhy == null){
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }
        Result<ClusterBalanceJobConfigPO> configPOResult = clusterBalanceJobConfigService.getByClusterId(clusterPhyId);
        if(!configPOResult.hasData()){
            return Result.buildFromIgnoreData(configPOResult);
        }
        Map<Integer, Broker> brokerMap = brokerService.listAllBrokersFromDB(clusterPhy.getId()).stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterPhy.getId());
        ClusterBalanceStateVO clusterBalanceStateVO = new ClusterBalanceStateVO();
        try {
            CronExpression cronExpression = new CronExpression(configPOResult.getData().getTaskCron());
            // next fire time of the periodic balance schedule
            clusterBalanceStateVO.setNext(cronExpression.getTimeAfter(new Date()));
        } catch (ParseException e) {
            logger.error("method=state||clusterId:{}||errMsg=exception", clusterPhyId, e);
        }
        // topics updated recently are excluded from balancing
        List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
        clusterBalanceStateVO.setEnable(configPOResult.getData().getStatus() == 1);
        Map<Resource, Double> resourceDoubleMap;
        Map<Integer, BrokerBalanceState> brokerBalanceStateMap;
        try {
            resourceDoubleMap = ExecutionRebalance.getClusterAvgResourcesState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
            brokerBalanceStateMap = ExecutionRebalance
                    .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
        }catch (Exception e){
            logger.error("method=state||clusterPhyId={}||errMsg=exception", clusterPhyId, e);
            return Result.buildFailure(e.getMessage());
        }
        // cluster-level status: UNBALANCED as soon as any broker is off-balance on any resource
        // NOTE(review): raw ArrayList constructor — should be new ArrayList<>(...)
        ArrayList<BrokerBalanceState> balanceStates = new ArrayList(brokerBalanceStateMap.values());
        clusterBalanceStateVO.setStatus(ClusterBalanceStateEnum.BALANCE.getState());
        balanceStates.forEach(brokerBalanceState ->{
            if ((brokerBalanceState.getDiskBalanceState() != null && !brokerBalanceState.getDiskBalanceState().equals(ClusterBalanceStateEnum.BALANCE.getState()))
                    || (brokerBalanceState.getCpuBalanceState() != null && !brokerBalanceState.getCpuBalanceState().equals(ClusterBalanceStateEnum.BALANCE.getState()))
                    || (brokerBalanceState.getBytesOutBalanceState() != null && !brokerBalanceState.getBytesOutBalanceState().equals(ClusterBalanceStateEnum.BALANCE.getState()))
                    || (brokerBalanceState.getBytesInBalanceState() != null && !brokerBalanceState.getBytesInBalanceState().equals(ClusterBalanceStateEnum.BALANCE.getState()))){
                clusterBalanceStateVO.setStatus(ClusterBalanceStateEnum.UNBALANCED.getState());
            }
        });
        clusterBalanceStateVO.setSub(getStateSubVOMap(resourceDoubleMap, balanceStates, clusterPhyId));
        return Result.buildSuc(clusterBalanceStateVO);
    }

    /**
     * Returns the stored periodic-balance configuration of a cluster as a VO.
     */
    @Override
    public Result<ClusterBalanceJobConfigVO> config(Long clusterPhyId) {
        Result<ClusterBalanceJobConfigPO> configPOResult = clusterBalanceJobConfigService.getByClusterId(clusterPhyId);
        if (!configPOResult.hasData()){
            return Result.buildFromIgnoreData(configPOResult);
        }
        return Result.buildSuc(ClusterBalanceConverter.convert2ClusterBalanceJobConfigVO(configPOResult.getData()));
    }

    /**
     * Per-broker balance overview page: broker host/rack, leader/replica counts from the
     * latest ES metrics, and per-resource balance sub-state; supports state filtering,
     * fuzzy host search and pagination.
     */
    @Override
    public PaginationResult<ClusterBalanceOverviewVO> overview(Long clusterPhyId, ClusterBalanceOverviewDTO dto) {
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
        if (clusterPhy == null){
            return PaginationResult.buildFailure(ResultStatus.CLUSTER_NOT_EXIST, dto);
        }
        Result<ClusterBalanceJobConfigPO> configPOResult = clusterBalanceJobConfigService.getByClusterId(clusterPhyId);
        if(configPOResult.failed()){
            return PaginationResult.buildFailure(configPOResult, dto);
        }
        // broker spec (hardware capacity) info
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterPhyId);
        List<ClusterBalanceOverviewVO> clusterBalanceOverviewVOS = new ArrayList<>();
        List<Broker> brokerList = brokerService.listAllBrokersFromDB(clusterPhyId);
        Map<Integer, Broker> brokerMap = brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
        Map<Integer, BrokerBalanceState> brokerBalanceStateMap = new HashMap<>();
        if (configPOResult.hasData()) {
            try {
                List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
                brokerBalanceStateMap = ExecutionRebalance
                        .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
            } catch (Exception e) {
                logger.error("method=overview||clusterBalanceOverviewDTO={}||errMsg=exception", dto, e);
                return PaginationResult.buildFailure(e.getMessage(), dto);
            }
        }
        // latest broker metrics (alive brokers only)
        Result<List<BrokerMetrics>> metricsResult = brokerMetricService.getLatestMetricsFromES(
                clusterPhyId,
                brokerList.stream().filter(elem1 -> elem1.alive()).map(elem2 -> elem2.getBrokerId()).collect(Collectors.toList())
        );
        if (metricsResult.failed()){
            return PaginationResult.buildFailure(metricsResult, dto);
        }
        Map<Integer, BrokerMetrics> brokerMetricsMap = new HashMap<>();
        if (metricsResult.hasData()){
            brokerMetricsMap = metricsResult.getData().stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity()));
        }
        for(Map.Entry<Integer, Broker> entry : brokerMap.entrySet()){
            Broker broker = entry.getValue();
            if (broker == null){
                continue;
            }
            ClusterBalanceOverviewVO clusterBalanceOverviewVO = new ClusterBalanceOverviewVO();
            clusterBalanceOverviewVO.setBrokerId(entry.getKey());
            clusterBalanceOverviewVO.setHost(broker.getHost());
            clusterBalanceOverviewVO.setRack(broker.getRack());
            BrokerMetrics brokerMetrics = brokerMetricsMap.get(entry.getKey());
            if (brokerMetrics != null){
                // leader / partition counts may be absent in the metric snapshot
                clusterBalanceOverviewVO.setLeader(brokerMetrics.getMetric( BrokerMetricVersionItems.BROKER_METRIC_LEADERS)!=null
                        ?brokerMetrics.getMetric( BrokerMetricVersionItems.BROKER_METRIC_LEADERS).intValue():null);
                clusterBalanceOverviewVO.setReplicas(brokerMetrics.getMetric( BrokerMetricVersionItems.BROKER_METRIC_PARTITIONS)!=null
                        ?brokerMetrics.getMetric( BrokerMetricVersionItems.BROKER_METRIC_PARTITIONS).intValue():null);
            }
            clusterBalanceOverviewVO.setSub(ClusterBalanceConverter.convert2MapClusterBalanceOverviewSubVO(brokerSpecMap.get(entry.getKey()), brokerBalanceStateMap.get(entry.getKey())));
            clusterBalanceOverviewVOS.add(clusterBalanceOverviewVO);
        }
        // optional filter on per-resource balance status
        if (dto.getStateParam()!= null && dto.getStateParam().size()>0){
            clusterBalanceOverviewVOS = filterState(dto.getStateParam(), clusterBalanceOverviewVOS);
        }
        clusterBalanceOverviewVOS = PaginationUtil.pageByFuzzyFilter(ConvertUtil.list2List(clusterBalanceOverviewVOS, ClusterBalanceOverviewVO.class), dto.getSearchKeywords(), Arrays.asList("host"));
        return PaginationResult.buildSuc(clusterBalanceOverviewVOS, clusterBalanceOverviewVOS.size(), dto.getPageNo(), dto.getPageSize());
    }

    /**
     * Per-resource (CPU / bytes-in / bytes-out / disk) balanced-or-not flags for a cluster,
     * computed only for resources present in the configured balance intervals.
     * A resource counts as balanced when EVERY broker is in the BALANCE state for it.
     */
    @Override
    public Result<ClusterBalanceItemState> getItemState(Long clusterPhyId) {
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
        if (clusterPhy == null){
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }
        ClusterBalanceItemState clusterBalanceState = new ClusterBalanceItemState();
        Result<ClusterBalanceJobConfigPO> configPOResult = clusterBalanceJobConfigService.getByClusterId(clusterPhyId);
        if(!configPOResult.hasData()){
            // no balance config at all: not configured, not enabled
            clusterBalanceState.setConfigureBalance(Boolean.FALSE);
            clusterBalanceState.setEnable(Boolean.FALSE);
            return Result.buildSuc(clusterBalanceState);
        }
        Map<Integer, Broker> brokerMap = brokerService.listAllBrokersFromDB(clusterPhy.getId()).stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterPhy.getId());
        List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
        clusterBalanceState.setConfigureBalance(Boolean.TRUE);
        clusterBalanceState.setEnable(configPOResult.getData().getStatus() == 1);
        Map<Integer, BrokerBalanceState> brokerBalanceStateMap;
        try {
            brokerBalanceStateMap = ExecutionRebalance
                    .getBrokerResourcesBalanceState(ClusterBalanceConverter.convert2BalanceParameter(configPOResult.getData(), brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames));
        }catch (Exception e){
            // NOTE(review): log tag says method=state but this is getItemState — copy-paste artifact
            logger.error("method=state||clusterPhyId={}||errMsg=exception", clusterPhyId, e);
            return Result.buildFailure(e.getMessage());
        }
        Map<String, Boolean> itemStateMap = new HashMap<>();
        // per-broker balance states for the whole cluster
        ArrayList<BrokerBalanceState> balanceStates = new ArrayList(brokerBalanceStateMap.values());
        List<ClusterBalanceIntervalDTO> intervalDTOS = ConvertUtil.str2ObjArrayByJson(configPOResult.getData().getBalanceIntervalJson(), ClusterBalanceIntervalDTO.class);
        intervalDTOS.forEach(intervalDTO->{
            if (Resource.CPU.resource().equals(intervalDTO.getType())){
                itemStateMap.put(Resource.CPU.resource(), balanceStates.stream()
                        .filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getCpuBalanceState())).count()==brokerMap.size());
            }else if (Resource.NW_IN.resource().equals(intervalDTO.getType())){
                itemStateMap.put(Resource.NW_IN.resource(), balanceStates.stream()
                        .filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getBytesInBalanceState())).count()==brokerMap.size());
            }else if (Resource.NW_OUT.resource().equals(intervalDTO.getType())){
                itemStateMap.put(Resource.NW_OUT.resource(), balanceStates.stream()
                        .filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getBytesOutBalanceState())).count()==brokerMap.size());
            }else if (Resource.DISK.resource().equals(intervalDTO.getType())){
                itemStateMap.put(Resource.DISK.resource(), balanceStates.stream()
                        .filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getDiskBalanceState())).count()==brokerMap.size());
            }
        });
        clusterBalanceState.setItemState(itemStateMap);
        return Result.buildSuc(clusterBalanceState);
    }

    /**
     * Keeps only the VOs whose sub-state matches EVERY requested (resource -> status) pair.
     * A VO lacking a requested resource entry is filtered out.
     */
    private List<ClusterBalanceOverviewVO> filterState(Map<String, Integer> stateParam, List<ClusterBalanceOverviewVO> oldVos){
        if (stateParam.isEmpty()){
            return oldVos;
        }
        List<ClusterBalanceOverviewVO> overviewVOS = new ArrayList<>();
        for(ClusterBalanceOverviewVO oldVo : oldVos){
            Boolean check = true;
            for(Map.Entry<String, Integer> paramEntry : stateParam.entrySet()){
                ClusterBalanceOverviewSubVO subVO = oldVo.getSub().get(paramEntry.getKey());
                if (subVO == null){
                    check = false;
                    continue;
                }
                if (subVO.getStatus()==null || !subVO.getStatus().equals(paramEntry.getValue())){
                    check = false;
                }
            }
            if (check){
                overviewVOS.add(oldVo);
            }
        }
        return overviewVOS;
    }

    /**
     * Paged history of past balance jobs for the cluster; delegates to the job service.
     */
    @Override
    public PaginationResult<ClusterBalanceHistoryVO> history(Long clusterPhyId, PaginationBaseDTO dto) {
        return clusterBalanceJobService.page(clusterPhyId, dto);
    }

    /**
     * Loads the persisted plan of an existing balance job (move sizes, brokers, topic lists,
     * per-broker detail, raw reassignment JSON, balance intervals).
     */
    @Override
    public Result<ClusterBalancePlanVO> plan(Long clusterPhyId, Long jobId) {
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
        if (clusterPhy == null){
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }
        Result<ClusterBalanceJobPO> jobPOResult = clusterBalanceJobService.getClusterBalanceJobById(jobId);
        if (jobPOResult.failed()){
            return Result.buildFrom(ResultStatus.NOT_EXIST);
        }
        List<Broker> allBrokers= brokerService.listAllBrokersFromDB(clusterPhyId);
        ClusterBalancePlanVO planVO = new ClusterBalancePlanVO();
        ClusterBalanceJobPO jobPO = jobPOResult.getData();
        planVO.setMoveSize(jobPO.getTotalReassignSize());
        planVO.setBrokers(ClusterBalanceConverter.convert2HostList(allBrokers, jobPO.getBrokers()));
        planVO.setBlackTopics(CommonUtils.string2StrList(jobPO.getTopicBlackList()));
        planVO.setReplicas(jobPO.getTotalReassignReplicaNum());
        planVO.setType(jobPO.getType());
        planVO.setTopics(CommonUtils.string2StrList(jobPO.getMoveInTopicList()));
        planVO.setDetail(ConvertUtil.str2ObjArrayByJson(jobPO.getBrokerBalanceDetail(), ClusterBalancePlanDetailVO.class));
        planVO.setReassignmentJson(jobPO.getReassignmentJson());
        planVO.setClusterBalanceIntervalList(ConvertUtil.str2ObjArrayByJson(jobPO.getBalanceIntervalJson(), ClusterBalanceIntervalVO.class));
        return Result.buildSuc(planVO);
    }

    /**
     * Runs the rebalance optimizer in dry-run mode to produce a plan preview.
     * Requires a spec for every broker; defaults to all brokers when none are specified.
     */
    @Override
    public Result<ClusterBalancePlanVO> preview(Long clusterPhyId, ClusterBalancePreviewDTO clusterBalancePreviewDTO) {
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
        if (clusterPhy == null){
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }
        List<Broker> allBrokers = brokerService.listAllBrokersFromDB(clusterPhy.getId());
        Map<Integer, BrokerSpec> brokerSpecMap = brokerSpecService.getBrokerSpecMap(clusterPhy.getId());
        for(Broker broker:allBrokers){
            if (brokerSpecMap.get(broker.getBrokerId()) == null){
                return Result.buildFromRSAndMsg(ResultStatus.BROKER_SPEC_NOT_EXIST,String.format("Broker规格信息不存在:brokerId:%s", broker.getBrokerId()));
            }
        }
        if (clusterBalancePreviewDTO.getBrokers() == null || clusterBalancePreviewDTO.getBrokers().isEmpty()){
            // no explicit broker list: balance across all brokers
            clusterBalancePreviewDTO.setBrokers(
                    allBrokers.stream().map(Broker::getBrokerId).collect(Collectors.toList()));
        }
        // build the optimizer input and compute the plan
        Map<Integer, Broker> brokerMap = allBrokers.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity()));
        List<String> topicNames = topicService.listRecentUpdateTopicNamesFromDB(clusterPhyId, configUtils.getClusterBalanceIgnoredTopicsTimeSecond());
        BalanceParameter balanceParameter = ClusterBalanceConverter.convert2BalanceParameter(clusterBalancePreviewDTO, brokerMap, brokerSpecMap, clusterPhy, esAddress, topicNames);
        ExecutionRebalance executionRebalance = new ExecutionRebalance();
        try {
            OptimizerResult optimizerResult = executionRebalance.optimizations(balanceParameter);
            if (optimizerResult == null) {
                return Result.buildFrom(ResultStatus.KAFKA_OPERATE_FAILED);
            }
            // convert the optimizer output into the plan VO
            return Result.buildSuc(ClusterBalanceConverter.convert2ClusterBalancePlanVO(clusterBalancePreviewDTO, optimizerResult, allBrokers));
        } catch (Exception e){
            logger.error("method=preview||clusterBalancePreviewDTO:{}||errMsg=exception", clusterBalancePreviewDTO, e);
            return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
        }
    }

    /**
     * Re-previews the plan of an existing balance job using its stored parameters.
     */
    @Override
    public Result<ClusterBalancePlanVO> schedule(Long clusterPhyId, Long jobId) {
        Result<ClusterBalanceJobPO> rbr= clusterBalanceJobService.getClusterBalanceJobById(jobId);
        if (!rbr.hasData()){
            return Result.buildFromIgnoreData(rbr);
        }
        return preview(clusterPhyId, ClusterBalanceConverter.convert2ClusterBalancePreviewDTO(rbr.getData()));
    }

    /**
     * Saves a balance strategy: a one-off strategy becomes an immediate job in jobService,
     * a periodic one replaces the cluster's stored cron config.
     *
     * @param operator user performing the change (recorded as job creator)
     */
    @Override
    public Result<Void> strategy(Long clusterPhyId, ClusterBalanceStrategyDTO dto, String operator) {
        ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
        if (clusterPhy == null){
            return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
        }
        // not a periodic job: submit a single task to jobService right away
        if(!dto.isScheduleJob()){
            JobDTO jobDTO = new JobDTO();
            jobDTO.setPlanTime(new Date());
            jobDTO.setJobStatus(JobStatusEnum.WAITING.getStatus());
            jobDTO.setCreator(operator);
            jobDTO.setJobType(JobHandleEnum.CLUSTER_BALANCE.getType());
            jobDTO.setTarget(JobHandleEnum.CLUSTER_BALANCE.getMessage());
            jobDTO.setJobData(ConvertUtil.obj2Json(dto));
            return jobService.addTask(clusterPhyId, jobDTO, operator);
        }else {
            return clusterBalanceJobConfigService.replaceClusterBalanceJobConfigByClusterId(ClusterBalanceConverter.convert2ClusterBalanceJobConfigPO(dto, operator));
        }
    }

    /**
     * Cron-driven entry point: if the cluster's periodic-balance config is enabled and its
     * cron expression fires at {@code triggerTimeUnitMs}, creates a SYSTEM balance job.
     * No-op (success) when disabled or the cron does not match.
     */
    @Override
    public Result<Void> createScheduleJob(Long clusterPhyId, long triggerTimeUnitMs){
        // load the periodic-balance strategy for this cluster
        Result<ClusterBalanceJobConfigPO> configPOResult = clusterBalanceJobConfigService.getByClusterId(clusterPhyId);
        if (!configPOResult.hasData() || configPOResult.getData().getStatus().equals(Constant.DOWN)){
            return Result.buildSuc();
        }
        try {
            CronExpression cronExpression = new CronExpression(configPOResult.getData().getTaskCron());
            // only proceed when the trigger time satisfies the cron schedule
            if (!cronExpression.isSatisfiedBy(new Date(triggerTimeUnitMs))){
                return Result.buildSuc();
            }
        } catch (ParseException e) {
            logger.error("method=createScheduleJob||clusterId:{}||errMsg=exception", clusterPhyId, e);
            // NOTE(review): printStackTrace is redundant with the log call above
            e.printStackTrace();
        }
        // cron matched: enqueue a new balance job owned by SYSTEM
        JobDTO jobDTO = new JobDTO();
        jobDTO.setPlanTime(new Date());
        jobDTO.setJobStatus(JobStatusEnum.WAITING.getStatus());
        jobDTO.setCreator(Constant.SYSTEM);
        jobDTO.setJobType(JobTypeEnum.CLUSTER_BALANCE.getType());
        jobDTO.setTarget(JobHandleEnum.CLUSTER_BALANCE.getMessage());
        JobClusterBalanceContent content = ClusterBalanceConverter.convert2JobClusterBalanceContent(configPOResult.getData());
        jobDTO.setJobData(ConvertUtil.obj2Json(content));
        return jobService.addTask(clusterPhyId, jobDTO, Constant.SYSTEM);
    }

    /**
     * Builds, per resource, a sub-VO holding the cluster average, the configured balance
     * interval, and the count of brokers below / within / above that interval.
     */
    private Map<String, ClusterBalanceStateSubVO> getStateSubVOMap(Map<Resource, Double> resourceDoubleMap, ArrayList<BrokerBalanceState> balanceStates, Long clusterId){
        Map<String, ClusterBalanceStateSubVO> subVOMap = new HashMap<>();
        Map<String, Double> balanceInterval = clusterBalanceJobService.getBalanceInterval(clusterId);
        for (Map.Entry<Resource, Double> entry : resourceDoubleMap.entrySet()){
            Resource resource = entry.getKey();
            if (Resource.CPU.resource().equals(resource.resource())){
                ClusterBalanceStateSubVO cpuSubVo = new ClusterBalanceStateSubVO();
                cpuSubVo.setAvg(entry.getValue());
                cpuSubVo.setInterval(balanceInterval.get(Resource.CPU.resource()));
                cpuSubVo.setBetweenNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getCpuBalanceState())).count());
                cpuSubVo.setBigNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.ABOVE_BALANCE.getState().equals(status.getCpuBalanceState())).count());
                cpuSubVo.setSmallNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BELOW_BALANCE.getState().equals(status.getCpuBalanceState())).count());
                subVOMap.put(Resource.CPU.resource(), cpuSubVo);
            }else if (Resource.NW_IN.resource().equals(resource.resource())){
                ClusterBalanceStateSubVO cpuSubVo = new ClusterBalanceStateSubVO();
                cpuSubVo.setAvg(entry.getValue());
                cpuSubVo.setInterval(balanceInterval.get(Resource.NW_IN.resource()));
                cpuSubVo.setBetweenNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getBytesInBalanceState())).count());
                cpuSubVo.setBigNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.ABOVE_BALANCE.getState().equals(status.getBytesInBalanceState())).count());
                cpuSubVo.setSmallNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BELOW_BALANCE.getState().equals(status.getBytesInBalanceState())).count());
                subVOMap.put(Resource.NW_IN.resource(), cpuSubVo);
            }else if (Resource.NW_OUT.resource().equals(resource.resource())){
                ClusterBalanceStateSubVO cpuSubVo = new ClusterBalanceStateSubVO();
                cpuSubVo.setAvg(entry.getValue());
                cpuSubVo.setInterval(balanceInterval.get(Resource.NW_OUT.resource()));
                cpuSubVo.setBetweenNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getBytesOutBalanceState())).count());
                cpuSubVo.setBigNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.ABOVE_BALANCE.getState().equals(status.getBytesOutBalanceState())).count());
                cpuSubVo.setSmallNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BELOW_BALANCE.getState().equals(status.getBytesOutBalanceState())).count());
                subVOMap.put(Resource.NW_OUT.resource(), cpuSubVo);
            }else if (Resource.DISK.resource().equals(resource.resource())){
                ClusterBalanceStateSubVO cpuSubVo = new ClusterBalanceStateSubVO();
                cpuSubVo.setAvg(entry.getValue());
                cpuSubVo.setInterval(balanceInterval.get(Resource.DISK.resource()));
                cpuSubVo.setBetweenNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BALANCE.getState().equals(status.getDiskBalanceState())).count());
                cpuSubVo.setBigNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.ABOVE_BALANCE.getState().equals(status.getDiskBalanceState())).count());
                cpuSubVo.setSmallNu(balanceStates.stream().filter(status->ClusterBalanceStateEnum.BELOW_BALANCE.getState().equals(status.getDiskBalanceState())).count());
                subVOMap.put(Resource.DISK.resource(), cpuSubVo);
            }
        }
        return subVOMap;
    }
}

View File

@@ -3,12 +3,10 @@ package com.xiaojukeji.know.streaming.km.core.service.cluster.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.google.common.collect.Table;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricsClusterPhyDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.enterprise.rebalance.bean.entity.ClusterBalanceItemState;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics;
@@ -40,8 +38,6 @@ import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.enterprise.rebalance.service.ClusterBalanceService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.core.service.health.state.HealthStateService;
import com.xiaojukeji.know.streaming.km.core.service.job.JobService;
@@ -54,7 +50,6 @@ import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.ClusterMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
import com.xiaojukeji.know.streaming.km.rebalance.model.Resource;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.resource.ResourceType;
import org.springframework.beans.factory.annotation.Autowired;
@@ -72,13 +67,11 @@ import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics.initWithMetrics;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.*;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.ClusterMetricVersionItems.*;
/**
* @author didi
*/
@Service("clusterMetricService")
@EnterpriseLoadReBalance(all = false)
@Service
public class ClusterMetricServiceImpl extends BaseMetricService implements ClusterMetricService {
private static final ILog LOGGER = LogFactory.getLog(ClusterMetricServiceImpl.class);
@@ -117,9 +110,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
public static final String CLUSTER_METHOD_GET_JOBS_SUCCESS = "getJobsSuccess";
public static final String CLUSTER_METHOD_GET_JOBS_FAILED = "getJobsFailed";
@EnterpriseLoadReBalance(all = false)
public static final String CLUSTER_METHOD_GET_CLUSTER_LOAD_RE_BALANCE_INFO = "getClusterLoadReBalanceInfo";
@Autowired
private HealthStateService healthStateService;
@@ -159,12 +149,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
@Autowired
private JobService jobService;
@Autowired
private ClusterBalanceService clusterBalanceService;
@Autowired
private ClusterPhyService clusterPhyService;
@Override
protected VersionItemTypeEnum getVersionItemType() {
return VersionItemTypeEnum.METRIC_CLUSTER;
@@ -176,7 +160,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
}
@Override
@EnterpriseLoadReBalance(all = false)
protected void initRegisterVCHandler(){
registerVCHandler( CLUSTER_METHOD_DO_NOTHING, this::doNothing);
registerVCHandler( CLUSTER_METHOD_GET_TOPIC_SIZE, this::getTopicSize);
@@ -216,8 +199,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
registerVCHandler( CLUSTER_METHOD_GET_JOBS_WAITING, this::getJobsWaiting);
registerVCHandler( CLUSTER_METHOD_GET_JOBS_SUCCESS, this::getJobsSuccess);
registerVCHandler( CLUSTER_METHOD_GET_JOBS_FAILED, this::getJobsFailed);
registerVCHandler( CLUSTER_METHOD_GET_CLUSTER_LOAD_RE_BALANCE_INFO, this::getClusterLoadReBalanceInfo);
}
@Override
@@ -680,26 +661,6 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
return Result.buildSuc(initWithMetrics(clusterId, metric, count));
}
/**
 * Version-compat handler: exposes the load-rebalance enable flag and the per-resource
 * (CPU / bytes-in / bytes-out / disk) balance item states as cluster metrics.
 *
 * @param metricParam expected to be a {@link ClusterMetricParam} carrying the cluster id
 * @return success with the populated metrics, or the failure propagated from the balance service
 */
@EnterpriseLoadReBalance
private Result<ClusterMetrics> getClusterLoadReBalanceInfo(VersionItemParam metricParam) {
    ClusterMetricParam clusterMetricParam = (ClusterMetricParam) metricParam;

    Result<ClusterBalanceItemState> stateResult = clusterBalanceService.getItemState(clusterMetricParam.getClusterId());
    if (stateResult.failed()) {
        return Result.buildFromIgnoreData(stateResult);
    }
    ClusterBalanceItemState state = stateResult.getData();

    // enable flag first, then one metric per balanced resource dimension
    ClusterMetrics metric = ClusterMetrics.initWithMetrics(
            clusterMetricParam.getClusterId(),
            CLUSTER_METRIC_LOAD_RE_BALANCE_ENABLE,
            state.getEnable() ? Constant.YES : Constant.NO
    );
    metric.putMetric(CLUSTER_METRIC_LOAD_RE_BALANCE_CPU, state.getResItemState(Resource.CPU).floatValue());
    metric.putMetric(CLUSTER_METRIC_LOAD_RE_BALANCE_NW_IN, state.getResItemState(Resource.NW_IN).floatValue());
    metric.putMetric(CLUSTER_METRIC_LOAD_RE_BALANCE_NW_OUT, state.getResItemState(Resource.NW_OUT).floatValue());
    metric.putMetric(CLUSTER_METRIC_LOAD_RE_BALANCE_DISK, state.getResItemState(Resource.DISK).floatValue());

    return Result.buildSuc(metric);
}
/**
* 从某一个 controller 的 JMX 中获取指标再聚合得到集群的指标
* @param metricParam

View File

@@ -16,5 +16,5 @@ public class ConfigUtils {
}
@Value("${cluster-balance.ignored-topics.time-second:300}")
private Integer clusterBalanceIgnoredTopicsTimeSecond;
private Integer clusterBalanceIgnoredTopicsTimeSecond;
}

View File

@@ -143,7 +143,7 @@ public class HealthStateServiceImpl implements HealthStateService {
// DB中不存在则默认是存活的
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.GOOD.getDimension());
} else if (!broker.alive()) {
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)HealthStateEnum.DEAD.getDimension());
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float) HealthStateEnum.DEAD.getDimension());
} else {
metrics.getMetrics().put(BROKER_METRIC_HEALTH_STATE, (float)this.calHealthState(aggResultList).getDimension());
}

View File

@@ -1,6 +1,5 @@
package com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseLoadReBalance;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum;
@@ -20,7 +19,6 @@ import static com.xiaojukeji.know.streaming.km.core.service.cluster.impl.Cluster
* @author didi
*/
@Component
@EnterpriseLoadReBalance(all = false)
public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
/**
* 整体的健康指标
@@ -121,13 +119,6 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
public static final String CLUSTER_METRIC_JOB_SUCCESS = "JobsSuccess";
public static final String CLUSTER_METRIC_JOB_FAILED = "JobsFailed";
@EnterpriseLoadReBalance
public static final String CLUSTER_METRIC_LOAD_RE_BALANCE_ENABLE = "LoadReBalanceEnable";
public static final String CLUSTER_METRIC_LOAD_RE_BALANCE_CPU = "LoadReBalanceCpu";
public static final String CLUSTER_METRIC_LOAD_RE_BALANCE_NW_IN = "LoadReBalanceNwIn";
public static final String CLUSTER_METRIC_LOAD_RE_BALANCE_NW_OUT = "LoadReBalanceNwOut";
public static final String CLUSTER_METRIC_LOAD_RE_BALANCE_DISK = "LoadReBalanceDisk";
public static final String CLUSTER_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME;
public ClusterMetricVersionItems(){}
@@ -138,7 +129,6 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
}
@Override
@EnterpriseLoadReBalance(all = false)
public List<VersionMetricControlItem> init(){
List<VersionMetricControlItem> itemList = new ArrayList<>();
@@ -418,27 +408,6 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
.name(CLUSTER_METRIC_JOB_FAILED).unit("").desc("集群failed任务总数").category(CATEGORY_JOB)
.extend( buildMethodExtend( CLUSTER_METHOD_GET_JOBS_FAILED )));
// 集群维度-均衡相关
itemList.add( buildAllVersionsItem()
.name(CLUSTER_METRIC_LOAD_RE_BALANCE_ENABLE).unit("是/否").desc("是否开启均衡, 10").category(CATEGORY_CLUSTER)
.extend( buildMethodExtend( CLUSTER_METHOD_GET_CLUSTER_LOAD_RE_BALANCE_INFO )));
itemList.add( buildAllVersionsItem()
.name(CLUSTER_METRIC_LOAD_RE_BALANCE_CPU).unit("是/否").desc("CPU是否均衡, 10").category(CATEGORY_CLUSTER)
.extend( buildMethodExtend( CLUSTER_METHOD_GET_CLUSTER_LOAD_RE_BALANCE_INFO )));
itemList.add( buildAllVersionsItem()
.name(CLUSTER_METRIC_LOAD_RE_BALANCE_NW_IN).unit("是/否").desc("BytesIn是否均衡, 10").category(CATEGORY_CLUSTER)
.extend( buildMethodExtend( CLUSTER_METHOD_GET_CLUSTER_LOAD_RE_BALANCE_INFO )));
itemList.add( buildAllVersionsItem()
.name(CLUSTER_METRIC_LOAD_RE_BALANCE_NW_OUT).unit("是/否").desc("BytesOut是否均衡, 10").category(CATEGORY_CLUSTER)
.extend( buildMethodExtend( CLUSTER_METHOD_GET_CLUSTER_LOAD_RE_BALANCE_INFO )));
itemList.add( buildAllVersionsItem()
.name(CLUSTER_METRIC_LOAD_RE_BALANCE_DISK).unit("是/否").desc("Disk是否均衡, 10").category(CATEGORY_CLUSTER)
.extend( buildMethodExtend( CLUSTER_METHOD_GET_CLUSTER_LOAD_RE_BALANCE_INFO )));
itemList.add(buildAllVersionsItem()
.name(CLUSTER_METRIC_COLLECT_COST_TIME).unit("").desc("采集Cluster指标的耗时").category(CATEGORY_PERFORMANCE)
.extendMethod(CLUSTER_METHOD_DO_NOTHING));