diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java index 8a734b1e..92dbe77d 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java @@ -1,5 +1,6 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.reassign; +import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import lombok.Data; import org.apache.kafka.common.TopicPartition; @@ -19,4 +20,10 @@ public class ReassignResult { return state.isDone(); } + + public boolean checkPreferredReplicaElectionUnNeed(String reassignBrokerIds, String originalBrokerIds) { + Integer targetLeader = CommonUtils.string2IntList(reassignBrokerIds).get(0); + Integer originalLeader = CommonUtils.string2IntList(originalBrokerIds).get(0); + return originalLeader.equals(targetLeader); + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java index 4fabb154..2f70b2d9 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java @@ -384,18 +384,20 @@ public class ReassignJobServiceImpl implements ReassignJobService { } // 更新任务状态 - Result result = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); - if (!result.hasData()){ - return Result.buildFromIgnoreData(result); + rv = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); + if (rv.successful()){ + return Result.buildFromIgnoreData(rv); } //已完成 rv = this.preferredReplicaElection(jobId); + + if (rv.failed()){ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); } - return Result.buildSuc(); + return Result.buildFromIgnoreData(rv); } @Override @@ -562,8 +564,14 @@ public class ReassignJobServiceImpl implements ReassignJobService { long now = System.currentTimeMillis(); boolean existNotFinished = false; + boolean unNeedPreferredReplicaElection = true; List subJobPOList = this.getSubJobsByJobId(jobPO.getId()); + for (ReassignSubJobPO subJobPO: subJobPOList) { + if (!reassignmentResult.checkPreferredReplicaElectionUnNeed(subJobPO.getReassignBrokerIds(),subJobPO.getOriginalBrokerIds())) { + unNeedPreferredReplicaElection = false; + } + if (!reassignmentResult.checkPartitionFinished(subJobPO.getTopicName(), subJobPO.getPartitionId())) { existNotFinished = true; continue; @@ -587,7 +595,7 @@ public class ReassignJobServiceImpl implements ReassignJobService { reassignJobDAO.updateById(newJobPO); } - return Result.buildSuc(); + return Result.build(unNeedPreferredReplicaElection); } private Result> setJobInRunning(ReassignJobPO jobPO) {