From c40ae3c45573ee932d6e051aa5f1b0f7714a35a3 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 8 Sep 2022 13:52:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=89=AF=E6=9C=AC=E5=8F=98?= =?UTF-8?q?=E6=9B=B4=E4=BB=BB=E5=8A=A1=E7=BB=93=E6=9D=9F=E5=90=8E=EF=BC=8C?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E4=BC=98=E5=85=88=E5=89=AF=E6=9C=AC=E9=80=89?= =?UTF-8?q?=E4=B8=BE=E7=9A=84=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/reassign/ReassignJobService.java | 3 ++ .../reassign/impl/ReassignJobServiceImpl.java | 52 ++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java index 83c4a7d2..eef02e6a 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java @@ -60,4 +60,7 @@ public interface ReassignJobService { * 依据任务状态或者其中一个任务ID */ Long getOneRunningJobId(Long clusterPhyId); + + + Result preferredReplicaElection(Long jobId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java index 1cabf913..4fabb154 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java @@ -31,6 +31,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; +import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignJobService; import com.xiaojukeji.know.streaming.km.core.service.reassign.ReassignService; @@ -85,6 +86,9 @@ public class ReassignJobServiceImpl implements ReassignJobService { @Autowired private TopicConfigService topicConfigService; + @Autowired + private OpPartitionService opPartitionService; + @Override @Transactional public Result create(Long jobId, ReplaceReassignJob replaceReassignJob, String creator) { @@ -343,6 +347,7 @@ public class ReassignJobServiceImpl implements ReassignJobService { } @Override + @Transactional public Result verifyAndUpdateStatue(Long jobId) { if (jobId == null) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, MsgConstant.getJobIdCanNotNull()); @@ -379,7 +384,18 @@ public class ReassignJobServiceImpl implements ReassignJobService { } // 更新任务状态 - return this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); + Result result = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); + if (!result.hasData()){ + return Result.buildFromIgnoreData(result); + } + + //已完成 + rv = this.preferredReplicaElection(jobId); + if (rv.failed()){ + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + } + + return Result.buildSuc(); } @Override @@ -466,6 +482,37 @@ public class ReassignJobServiceImpl implements ReassignJobService { return subPOList.get(0).getJobId(); } + @Override + public Result preferredReplicaElection(Long jobId) { + // 获取任务 + ReassignJobPO jobPO = reassignJobDAO.selectById(jobId); + if (jobPO == null) { + // 任务不存在 + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getJobNotExist(jobId)); + } + if (!JobStatusEnum.isFinished(jobPO.getStatus())){ + return Result.buildSuc(); + } + + // 获取子任务 + List subJobPOList = this.getSubJobsByJobId(jobId); + List topicPartitions = new ArrayList<>(); + subJobPOList.stream().forEach(reassignPO -> { + Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0); + Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0); + //替换过leader的添加到优先副本选举任务列表 + if (!originalLeader.equals(targetLeader)){ + topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId())); + } + }); + + if (!topicPartitions.isEmpty()){ + return opPartitionService.preferredReplicaElection(jobPO.getClusterPhyId(), topicPartitions); + } + + return Result.buildSuc(); + } + /**************************************************** private method ****************************************************/ @@ -510,7 +557,8 @@ public class ReassignJobServiceImpl implements ReassignJobService { } - private Result checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) { + @Transactional + public Result checkAndSetSuccessIfFinished(ReassignJobPO jobPO, ReassignResult reassignmentResult) { long now = System.currentTimeMillis(); boolean existNotFinished = false;