[fix]Repair that preferredReplicaElection is not called as expected

This commit is contained in:
zwen
2022-10-26 19:29:50 +08:00
committed by EricZeng
parent 80f001cdd5
commit 127b5be651
2 changed files with 20 additions and 5 deletions

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.reassign;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import lombok.Data;
import org.apache.kafka.common.TopicPartition;
@@ -19,4 +20,10 @@ public class ReassignResult {
return state.isDone();
}
public boolean checkPreferredReplicaElectionUnNeed(String reassignBrokerIds, String originalBrokerIds) {
Integer targetLeader = CommonUtils.string2IntList(reassignBrokerIds).get(0);
Integer originalLeader = CommonUtils.string2IntList(originalBrokerIds).get(0);
return originalLeader.equals(targetLeader);
}
}

View File

@@ -384,18 +384,20 @@ public class ReassignJobServiceImpl implements ReassignJobService {
}
// 更新任务状态
Result<Void> result = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
if (!result.hasData()){
return Result.buildFromIgnoreData(result);
rv = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
if (rv.successful()){
return Result.buildFromIgnoreData(rv);
}
//已完成
rv = this.preferredReplicaElection(jobId);
if (rv.failed()){
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
return Result.buildSuc();
return Result.buildFromIgnoreData(rv);
}
@Override
@@ -562,8 +564,14 @@ public class ReassignJobServiceImpl implements ReassignJobService {
long now = System.currentTimeMillis();
boolean existNotFinished = false;
boolean unNeedPreferredReplicaElection = true;
List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobPO.getId());
for (ReassignSubJobPO subJobPO: subJobPOList) {
if (!reassignmentResult.checkPreferredReplicaElectionUnNeed(subJobPO.getReassignBrokerIds(),subJobPO.getOriginalBrokerIds())) {
unNeedPreferredReplicaElection = false;
}
if (!reassignmentResult.checkPartitionFinished(subJobPO.getTopicName(), subJobPO.getPartitionId())) {
existNotFinished = true;
continue;
@@ -587,7 +595,7 @@ public class ReassignJobServiceImpl implements ReassignJobService {
reassignJobDAO.updateById(newJobPO);
}
return Result.buildSuc();
return Result.build(unNeedPreferredReplicaElection);
}
private Result<List<ReassignSubJobPO>> setJobInRunning(ReassignJobPO jobPO) {