diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java new file mode 100644 index 00000000..d316112f --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java @@ -0,0 +1,19 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +@Data +@NoArgsConstructor +public class BatchPartitionParam extends ClusterPhyParam { + private List tpList; + + public BatchPartitionParam(Long clusterPhyId, List tpList) { + super(clusterPhyId); + this.tpList = tpList; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java new file mode 100644 index 00000000..6a3611f8 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java @@ -0,0 +1,14 @@ +package com.xiaojukeji.know.streaming.km.core.service.partition; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +public interface OpPartitionService { + + /** + * 优先副本选举 + */ + Result preferredReplicaElection(Long clusterPhyId, List tpList); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java new file mode 100644 index 00000000..0f1186ef --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java @@ -0,0 +1,119 @@ +package com.xiaojukeji.know.streaming.km.core.service.partition.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; +import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ElectLeadersOptions; +import org.apache.kafka.clients.admin.ElectLeadersResult; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import scala.jdk.javaapi.CollectionConverters; + +import javax.annotation.PostConstruct; +import java.util.HashSet; +import java.util.List; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER; + + +/** + * @author didi + */ +@Service +public class OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService { + private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class); + + @Autowired + private KafkaAdminClient kafkaAdminClient; + + @Autowired + private KafkaAdminZKClient kafkaAdminZKClient; + + public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection"; + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return SERVICE_OP_PARTITION_LEADER; + } + + @PostConstruct + private void init() { + registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient); + registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient); + } + + @Override + public Result preferredReplicaElection(Long clusterPhyId, List tpList) { + try { + return (Result) doVCHandler( + clusterPhyId, + PREFERRED_REPLICA_ELECTION, + new BatchPartitionParam(clusterPhyId, tpList) + ); + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + /**************************************************** private method ****************************************************/ + + private Result preferredReplicaElectionByZKClient(VersionItemParam itemParam) { + BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam; + + try { + KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId()); + + kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet()); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception", + partitionParam.getClusterPhyId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } + + private Result preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) { + BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam; + + try { + AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId()); + + ElectLeadersResult electLeadersResult = adminClient.electLeaders( + ElectionType.PREFERRED, + new HashSet<>(partitionParam.getTpList()), + new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + electLeadersResult.all().get(); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception", + partitionParam.getClusterPhyId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } +}