diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java index 92dbe77d..8a734b1e 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/reassign/ReassignResult.java @@ -1,6 +1,5 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.reassign; -import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import lombok.Data; import org.apache.kafka.common.TopicPartition; @@ -20,10 +19,4 @@ public class ReassignResult { return state.isDone(); } - - public boolean checkPreferredReplicaElectionUnNeed(String reassignBrokerIds, String originalBrokerIds) { - Integer targetLeader = CommonUtils.string2IntList(reassignBrokerIds).get(0); - Integer originalLeader = CommonUtils.string2IntList(originalBrokerIds).get(0); - return originalLeader.equals(targetLeader); - } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/CommonUtils.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/CommonUtils.java index f3d2b357..48601795 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/CommonUtils.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/CommonUtils.java @@ -261,4 +261,20 @@ public class CommonUtils { return null; } } + + + /** + * 校验两个list的第一个元素是否相等,以","分隔元素。 + * @param str1 + * @param str2 + * @return + */ + public static boolean checkFirstElementIsEquals(String str1, String str2) { + if (ValidateUtils.anyBlank(str1, str2)) { + return false; + } + Integer targetLeader = CommonUtils.string2IntList(str1).get(0); + Integer originalLeader = CommonUtils.string2IntList(str2).get(0); + return originalLeader.equals(targetLeader); + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java index 30c2f6e1..5f9b737d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/impl/ReassignJobServiceImpl.java @@ -28,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReassignUtil; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; @@ -385,11 +386,13 @@ public class ReassignJobServiceImpl implements ReassignJobService { // 更新任务状态 rv = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData()); - if (rv.successful()){ + + //如果任务还未完成,先返回,不必考虑优先副本的重新选举。 + if (!rv.successful()) { return Result.buildFromIgnoreData(rv); } - //已完成 + //任务已完成,检查是否需要重新选举,并进行选举。 rv = this.preferredReplicaElection(jobId); @@ -500,10 +503,8 @@ public class ReassignJobServiceImpl implements ReassignJobService { List subJobPOList = this.getSubJobsByJobId(jobId); List topicPartitions = new ArrayList<>(); subJobPOList.stream().forEach(reassignPO -> { - Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0); - Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0); //替换过leader的添加到优先副本选举任务列表 - if (!originalLeader.equals(targetLeader)){ + if (!CommonUtils.checkFirstElementIsEquals(reassignPO.getReassignBrokerIds(), reassignPO.getOriginalBrokerIds())) { topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId())); } }); @@ -534,8 +535,12 @@ public class ReassignJobServiceImpl implements ReassignJobService { if (dbSubPO == null) { // DB中不存在 reassignSubJobDAO.insert(elem); + return; } + //补全缺失信息 + this.completeInfo(elem,dbSubPO); + // 已存在则进行更新 elem.setId(dbSubPO.getId()); reassignSubJobDAO.updateById(elem); @@ -565,13 +570,10 @@ public class ReassignJobServiceImpl implements ReassignJobService { long now = System.currentTimeMillis(); boolean existNotFinished = false; - boolean unNeedPreferredReplicaElection = true; + boolean jobSucceed = false; List subJobPOList = this.getSubJobsByJobId(jobPO.getId()); for (ReassignSubJobPO subJobPO: subJobPOList) { - if (!reassignmentResult.checkPreferredReplicaElectionUnNeed(subJobPO.getReassignBrokerIds(),subJobPO.getOriginalBrokerIds())) { - unNeedPreferredReplicaElection = false; - } if (!reassignmentResult.checkPartitionFinished(subJobPO.getTopicName(), subJobPO.getPartitionId())) { existNotFinished = true; @@ -591,12 +593,13 @@ public class ReassignJobServiceImpl implements ReassignJobService { // 当前没有分区处于迁移中, 并且没有任务并不处于执行中 ReassignJobPO newJobPO = new ReassignJobPO(); newJobPO.setId(jobPO.getId()); + jobSucceed = true; newJobPO.setStatus(JobStatusEnum.SUCCESS.getStatus()); newJobPO.setFinishedTime(new Date(now)); reassignJobDAO.updateById(newJobPO); } - return Result.build(unNeedPreferredReplicaElection); + return Result.build(jobSucceed); } private Result> setJobInRunning(ReassignJobPO jobPO) { @@ -861,4 +864,25 @@ public class ReassignJobServiceImpl implements ReassignJobService { return returnRV; } + + private void completeInfo(ReassignSubJobPO newPO, ReassignSubJobPO dbPO) { + if (newPO.getJobId() == null) { + newPO.setJobId(dbPO.getJobId()); + } + if (newPO.getTopicName() == null) { + newPO.setTopicName(dbPO.getTopicName()); + } + if (newPO.getClusterPhyId() == null) { + newPO.setClusterPhyId(dbPO.getClusterPhyId()); + } + if (newPO.getPartitionId() == null) { + newPO.setPartitionId(dbPO.getPartitionId()); + } + if (newPO.getOriginalBrokerIds() == null || newPO.getOriginalBrokerIds().isEmpty()) { + newPO.setOriginalBrokerIds(dbPO.getOriginalBrokerIds()); + } + if (newPO.getReassignBrokerIds() == null || newPO.getReassignBrokerIds().isEmpty()) { + newPO.setReassignBrokerIds(dbPO.getReassignBrokerIds()); + } + } }