diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java index 838ac594..8fafdec5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.admin.ElectLeadersOptions; import org.apache.kafka.clients.admin.ElectLeadersResult; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ElectionNotNeededException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import scala.jdk.javaapi.CollectionConverters; @@ -108,12 +109,17 @@ public class OpPartitionServiceImpl extends BaseKafkaVersionControlService imple return Result.buildSuc(); } catch (Exception e) { + if(e.getCause() instanceof ElectionNotNeededException) { + // ignore ElectionNotNeededException + return Result.buildSuc(); + } + LOGGER.error( "method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception", partitionParam.getClusterPhyId(), e ); - return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); } } }