From 4114777a4ea8adb6dd51049efe1cfe9295b9c604 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Wed, 31 Aug 2022 17:11:12 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85leader=E9=80=89=E4=B8=BE?= =?UTF-8?q?=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../param/partition/BatchPartitionParam.java | 19 +++ .../service/partition/OpPartitionService.java | 14 +++ .../impl/OpPartitionServiceImpl.java | 119 ++++++++++++++++++ 3 files changed, 152 insertions(+) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java create mode 100644 km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java new file mode 100644 index 00000000..d316112f --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/BatchPartitionParam.java @@ -0,0 +1,19 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +@Data +@NoArgsConstructor +public class BatchPartitionParam extends ClusterPhyParam { + private List tpList; + + public BatchPartitionParam(Long clusterPhyId, List tpList) { + super(clusterPhyId); + this.tpList = tpList; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java new file mode 100644 index 00000000..6a3611f8 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/OpPartitionService.java @@ -0,0 +1,14 @@ +package com.xiaojukeji.know.streaming.km.core.service.partition; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +public interface OpPartitionService { + + /** + * 优先副本选举 + */ + Result preferredReplicaElection(Long clusterPhyId, List tpList); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java new file mode 100644 index 00000000..0f1186ef --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/OpPartitionServiceImpl.java @@ -0,0 +1,119 @@ +package com.xiaojukeji.know.streaming.km.core.service.partition.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; +import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ElectLeadersOptions; +import org.apache.kafka.clients.admin.ElectLeadersResult; +import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import scala.jdk.javaapi.CollectionConverters; + +import javax.annotation.PostConstruct; +import java.util.HashSet; +import java.util.List; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER; + + +/** + * @author didi + */ +@Service +public class OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService { + private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class); + + @Autowired + private KafkaAdminClient kafkaAdminClient; + + @Autowired + private KafkaAdminZKClient kafkaAdminZKClient; + + public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection"; + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return SERVICE_OP_PARTITION_LEADER; + } + + @PostConstruct + private void init() { + registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient); + registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient); + } + + @Override + public Result preferredReplicaElection(Long clusterPhyId, List tpList) { + try { + return (Result) doVCHandler( + clusterPhyId, + PREFERRED_REPLICA_ELECTION, + new BatchPartitionParam(clusterPhyId, tpList) + ); + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + /**************************************************** private method ****************************************************/ + + private Result preferredReplicaElectionByZKClient(VersionItemParam itemParam) { + BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam; + + try { + KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId()); + + kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet()); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception", + partitionParam.getClusterPhyId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } + + private Result preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) { + BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam; + + try { + AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId()); + + ElectLeadersResult electLeadersResult = adminClient.electLeaders( + ElectionType.PREFERRED, + new HashSet<>(partitionParam.getTpList()), + new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + electLeadersResult.all().get(); + + return Result.buildSuc(); + } catch (Exception e) { + LOGGER.error( + "class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception", + partitionParam.getClusterPhyId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage()); + } + } +}