增加副本变更任务结束后,进行优先副本选举的操作

This commit is contained in:
zengqiao
2022-09-08 13:52:51 +08:00
parent b71a34279e
commit c40ae3c455
2 changed files with 53 additions and 2 deletions

View File

@@ -60,4 +60,7 @@ public interface ReassignJobService {
* 依据任务状态或者其中一个任务ID
*/
Long getOneRunningJobId(Long clusterPhyId);
Result<Void> preferredReplicaElection(Long jobId);
}

View File

@@ -31,6 +31,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService;
import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService;
@@ -85,6 +86,9 @@ public class ReassignJobServiceImpl implements ReassignJobService {
@Autowired
private TopicConfigService topicConfigService;
@Autowired
private OpPartitionService opPartitionService;
@Override
@Transactional
public Result<Void> create(Long jobId, ReplaceReassignJob replaceReassignJob, String creator) {
@@ -343,6 +347,7 @@ public class ReassignJobServiceImpl implements ReassignJobService {
}
@Override
@Transactional
public Result<Void> verifyAndUpdateStatue(Long jobId) {
if (jobId == null) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull());
@@ -379,7 +384,18 @@ public class ReassignJobServiceImpl implements ReassignJobService {
}
// 更新任务状态
return this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
Result<Void> result = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
if (!result.hasData()){
return Result.buildFromIgnoreData(result);
}
//已完成
rv = this.preferredReplicaElection(jobId);
if (rv.failed()){
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
return Result.buildSuc();
}
@Override
@@ -466,6 +482,37 @@ public class ReassignJobServiceImpl implements ReassignJobService {
return subPOList.get(0).getJobId();
}
@Override
public Result<Void> preferredReplicaElection(Long jobId) {
// 获取任务
ReassignJobPO jobPO = reassignJobDAO.selectById(jobId);
if (jobPO == null) {
// 任务不存在
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId));
}
if (!JobStatusEnum.isFinished(jobPO.getStatus())){
return Result.buildSuc();
}
// 获取子任务
List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobId);
List<TopicPartition> topicPartitions = new ArrayList<>();
subJobPOList.stream().forEach(reassignPO -> {
Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0);
Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0);
//替换过leader的添加到优先副本选举任务列表
if (!originalLeader.equals(targetLeader)){
topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId()));
}
});
if (!topicPartitions.isEmpty()){
return opPartitionService.preferredReplicaElection(jobPO.getClusterPhyId(), topicPartitions);
}
return Result.buildSuc();
}
/**************************************************** private method ****************************************************/
@@ -510,7 +557,8 @@ public class ReassignJobServiceImpl implements ReassignJobService {
}
private Result<Void> checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) {
@Transactional
public Result<Void> checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) {
long now = System.currentTimeMillis();
boolean existNotFinished = false;