diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java index 83c4a7d2..eef02e6a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java @@ -60,4 +60,7 @@ public interface ReassignJobService { * 依据任务状态或者其中一个任务ID */ Long getOneRunningJobId(Long clusterPhyId); + + + Result preferredReplicaElection(Long jobId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java index 1cabf913..4fabb154 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java @@ -31,6 +31,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; +import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService; import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService; @@ -85,6 +86,9 @@ public class ReassignJobServiceImpl implements ReassignJobService { @Autowired private TopicConfigService topicConfigService; + @Autowired + private OpPartitionService opPartitionService; + @Override @Transactional public Result create(Long jobId, ReplaceReassignJob replaceReassignJob, String creator) { @@ -343,6 +347,7 @@ public class ReassignJobServiceImpl implements ReassignJobService { } @Override + @Transactional public Result verifyAndUpdateStatue(Long jobId) { if (jobId == null) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull()); @@ -379,7 +384,18 @@ public class ReassignJobServiceImpl implements ReassignJobService { } // 更新任务状态 - return this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); + Result result = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); + if (!result.hasData()){ + return Result.buildFromIgnoreData(result); + } + + //已完成 + rv = this.preferredReplicaElection(jobId); + if (rv.failed()){ + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + } + + return Result.buildSuc(); } @Override @@ -466,6 +482,37 @@ public class ReassignJobServiceImpl implements ReassignJobService { return subPOList.get(0).getJobId(); } + @Override + public Result preferredReplicaElection(Long jobId) { + // 获取任务 + ReassignJobPO jobPO = reassignJobDAO.selectById(jobId); + if (jobPO == null) { + // 任务不存在 + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId)); + } + if (!JobStatusEnum.isFinished(jobPO.getStatus())){ + return Result.buildSuc(); + } + + // 获取子任务 + List subJobPOList = this.getSubJobsByJobId(jobId); + List topicPartitions = new ArrayList<>(); + subJobPOList.stream().forEach(reassignPO -> { + Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0); + Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0); + //替换过leader的添加到优先副本选举任务列表 + if (!originalLeader.equals(targetLeader)){ + topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId())); + } + }); + + if (!topicPartitions.isEmpty()){ + return opPartitionService.preferredReplicaElection(jobPO.getClusterPhyId(), topicPartitions); + } + + return Result.buildSuc(); + } + /**************************************************** private method ****************************************************/ @@ -510,7 +557,8 @@ public class ReassignJobServiceImpl implements ReassignJobService { } - private Result checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) { + @Transactional + public Result checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) { long now = System.currentTimeMillis(); boolean existNotFinished = false;